forked from TrueCloudLab/frostfs-node
[#1094] shard: unify collection of expired objects
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
fbb95cff14
commit
ed7c732676
1 changed files with 21 additions and 56 deletions
|
@ -216,39 +216,12 @@ func (s *Shard) removeGarbage() {
|
|||
}
|
||||
|
||||
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
||||
epoch := e.(newEpoch).epoch
|
||||
|
||||
var expired []*object.Address
|
||||
|
||||
// collect expired non-tombstone object
|
||||
err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return meta.ErrInterruptIterator
|
||||
default:
|
||||
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, false)
|
||||
if err != nil || len(expired) == 0 {
|
||||
if err != nil {
|
||||
s.log.Warn("iterator over expired objects failed", zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
if expiredObject.Type() != object.TypeTombstone {
|
||||
expired = append(expired, expiredObject.Address())
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
s.log.Warn("iterator over expired objects failed",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
} else if len(expired) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// check if context canceled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// inhume the collected objects
|
||||
|
@ -265,44 +238,36 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: can be unified with Shard.collectExpiredObjects.
|
||||
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||
epoch := e.(newEpoch).epoch
|
||||
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, true)
|
||||
if err != nil || len(expired) == 0 {
|
||||
if err != nil {
|
||||
s.log.Warn("iterator over expired tombstones failes", zap.String("error", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
s.expiredTombstonesCallback(ctx, expired)
|
||||
}
|
||||
|
||||
func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, collectTombstones bool) ([]*object.Address, error) {
|
||||
var expired []*object.Address
|
||||
|
||||
// collect expired tombstone objects
|
||||
err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return meta.ErrInterruptIterator
|
||||
default:
|
||||
if (expiredObject.Type() == object.TypeTombstone) == collectTombstones {
|
||||
expired = append(expired, expiredObject.Address())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if expiredObject.Type() == object.TypeTombstone {
|
||||
expired = append(expired, expiredObject.Address())
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
s.log.Warn("iterator over expired tombstones failed",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
} else if len(expired) == 0 {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// check if context canceled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
s.expiredTombstonesCallback(ctx, expired)
|
||||
return expired, ctx.Err()
|
||||
}
|
||||
|
||||
// HandleExpiredTombstones marks to be removed all objects that are
|
||||
|
|
Loading…
Reference in a new issue