diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index d82f0e64a..b40154c3e 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -1,6 +1,7 @@ package engine import ( + "context" "errors" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" @@ -80,3 +81,22 @@ func (e *StorageEngine) inhume(addr *objectSDK.Address, prm *shard.InhumePrm, ch return } + +func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []*objectSDK.Address) { + tss := make(map[string]struct{}, len(addrs)) + + for i := range addrs { + tss[addrs[i].String()] = struct{}{} + } + + e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) { + sh.HandleExpiredTombstones(tss) + + select { + case <-ctx.Done(): + return true + default: + return false + } + }) +} diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index d125562d1..efb2652a1 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -29,7 +29,10 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { return nil, errors.Wrap(err, "could not generate shard ID") } - e.shards[id.String()] = shard.New(append(opts, shard.WithID(id))...) + e.shards[id.String()] = shard.New(append(opts, + shard.WithID(id), + shard.WithExpiredObjectsCallback(e.processExpiredTombstones), + )...) return id, nil } diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index b1d79747b..8c29c3d6c 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -47,6 +47,7 @@ func (s *Shard) Init() error { cancelFunc: func() {}, handlers: []eventHandler{ s.collectExpiredObjects, + s.collectExpiredTombstones, }, }, }, diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 163967593..fc39a9b0b 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -236,3 +236,77 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { return } } + +// TODO: can be unified with Shard.collectExpiredObjects. +func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { + epoch := e.(newEpoch).epoch + + var expired []*object.Address + + // collect expired tombstone objects + err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error { + select { + case <-ctx.Done(): + return meta.ErrInterruptIterator + default: + } + + if expiredObject.Type() == object.TypeTombstone { + expired = append(expired, expiredObject.Address()) + } + + return nil + }) + if err != nil { + s.log.Warn("iterator over expired tombstones failed", + zap.String("error", err.Error()), + ) + + return + } else if len(expired) == 0 { + return + } + + // check if context canceled + select { + case <-ctx.Done(): + return + default: + } + + s.expiredTombstonesCallback(ctx, expired) +} + +// HandleExpiredTombstones mark to be removed all objects that are expired in epoch +// and protected by tombstone with string address from tss. +// +// Does not modify tss. +func (s *Shard) HandleExpiredTombstones(tss map[string]struct{}) { + inhume := make([]*object.Address, 0, len(tss)) + + err := s.metaBase.IterateCoveredByTombstones(tss, func(addr *object.Address) error { + inhume = append(inhume, addr) + return nil + }) + if err != nil { + s.log.Warn("iterator over expired objects failed", + zap.String("error", err.Error()), + ) + + return + } else if len(inhume) == 0 { + return + } + + _, err = s.metaBase.Inhume(new(meta.InhumePrm). + WithAddresses(inhume...). + WithGCMark(), + ) + if err != nil { + s.log.Warn("could not inhume objects under the expired tombstone", + zap.String("error", err.Error()), + ) + + return + } +} diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 79eeaed3d..039b117e8 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -1,8 +1,10 @@ package shard import ( + "context" "time" + "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/util" @@ -27,6 +29,9 @@ type Shard struct { // Option represents Shard's constructor option. type Option func(*cfg) +// ExpiredObjectsCallback is a callback handling list of expired objects. +type ExpiredObjectsCallback func(context.Context, []*object.Address) + type cfg struct { rmBatchSize int @@ -43,6 +48,8 @@ type cfg struct { log *logger.Logger gcCfg *gcCfg + + expiredTombstonesCallback ExpiredObjectsCallback } func defaultCfg() *cfg { @@ -157,3 +164,11 @@ func WithGCRemoverSleepInterval(dur time.Duration) Option { c.gcCfg.removerInterval = dur } } + +// WithExpiredObjectsCallback returns option to specify callback +// of the expired tombstones handler. +func WithExpiredObjectsCallback(cb ExpiredObjectsCallback) Option { + return func(c *cfg) { + c.expiredTombstonesCallback = cb + } +}