From 717f2beb475e70780575ffc66bfa7047f034e045 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 17 Feb 2021 15:27:40 +0300 Subject: [PATCH] [#378] shard: Collect expired tombstones in GC every epoch Add new epoch event handler to GC that finds all expired tombstones and marks them and underlying objects to be removed. Shard uses callbacks provided by the storage engine to mark underlying objects. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/engine/inhume.go | 20 ++++++ pkg/local_object_storage/engine/shards.go | 5 +- pkg/local_object_storage/shard/control.go | 1 + pkg/local_object_storage/shard/gc.go | 74 +++++++++++++++++++++++ pkg/local_object_storage/shard/shard.go | 15 +++++ 5 files changed, 114 insertions(+), 1 deletion(-) diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index d82f0e64..b40154c3 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -1,6 +1,7 @@ package engine import ( + "context" "errors" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" @@ -80,3 +81,22 @@ func (e *StorageEngine) inhume(addr *objectSDK.Address, prm *shard.InhumePrm, ch return } + +func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []*objectSDK.Address) { + tss := make(map[string]struct{}, len(addrs)) + + for i := range addrs { + tss[addrs[i].String()] = struct{}{} + } + + e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) { + sh.HandleExpiredTombstones(tss) + + select { + case <-ctx.Done(): + return true + default: + return false + } + }) +} diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index d125562d..efb2652a 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -29,7 +29,10 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { return nil, errors.Wrap(err, "could not generate shard ID") } - e.shards[id.String()] = shard.New(append(opts, shard.WithID(id))...) + e.shards[id.String()] = shard.New(append(opts, + shard.WithID(id), + shard.WithExpiredObjectsCallback(e.processExpiredTombstones), + )...) return id, nil } diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index b1d79747..8c29c3d6 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -47,6 +47,7 @@ func (s *Shard) Init() error { cancelFunc: func() {}, handlers: []eventHandler{ s.collectExpiredObjects, + s.collectExpiredTombstones, }, }, }, diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 16396759..fc39a9b0 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -236,3 +236,77 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { return } } + +// TODO: can be unified with Shard.collectExpiredObjects. +func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { + epoch := e.(newEpoch).epoch + + var expired []*object.Address + + // collect expired tombstone objects + err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error { + select { + case <-ctx.Done(): + return meta.ErrInterruptIterator + default: + } + + if expiredObject.Type() == object.TypeTombstone { + expired = append(expired, expiredObject.Address()) + } + + return nil + }) + if err != nil { + s.log.Warn("iterator over expired tombstones failed", + zap.String("error", err.Error()), + ) + + return + } else if len(expired) == 0 { + return + } + + // check if context canceled + select { + case <-ctx.Done(): + return + default: + } + + s.expiredTombstonesCallback(ctx, expired) +} + +// HandleExpiredTombstones mark to be removed all objects that are expired in epoch +// and protected by tombstone with string address from tss. +// +// Does not modify tss. +func (s *Shard) HandleExpiredTombstones(tss map[string]struct{}) { + inhume := make([]*object.Address, 0, len(tss)) + + err := s.metaBase.IterateCoveredByTombstones(tss, func(addr *object.Address) error { + inhume = append(inhume, addr) + return nil + }) + if err != nil { + s.log.Warn("iterator over expired objects failed", + zap.String("error", err.Error()), + ) + + return + } else if len(inhume) == 0 { + return + } + + _, err = s.metaBase.Inhume(new(meta.InhumePrm). + WithAddresses(inhume...). + WithGCMark(), + ) + if err != nil { + s.log.Warn("could not inhume objects under the expired tombstone", + zap.String("error", err.Error()), + ) + + return + } +} diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 79eeaed3..039b117e 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -1,8 +1,10 @@ package shard import ( + "context" "time" + "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/util" @@ -27,6 +29,9 @@ type Shard struct { // Option represents Shard's constructor option. type Option func(*cfg) +// ExpiredObjectsCallback is a callback handling list of expired objects. +type ExpiredObjectsCallback func(context.Context, []*object.Address) + type cfg struct { rmBatchSize int @@ -43,6 +48,8 @@ type cfg struct { log *logger.Logger gcCfg *gcCfg + + expiredTombstonesCallback ExpiredObjectsCallback } func defaultCfg() *cfg { @@ -157,3 +164,11 @@ func WithGCRemoverSleepInterval(dur time.Duration) Option { c.gcCfg.removerInterval = dur } } + +// WithExpiredObjectsCallback returns option to specify callback +// of the expired tombstones handler. +func WithExpiredObjectsCallback(cb ExpiredObjectsCallback) Option { + return func(c *cfg) { + c.expiredTombstonesCallback = cb + } +}