From 7799f8e4cfe9af592c51a13e20fedf65ede8bc86 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 19 Apr 2022 21:00:22 +0300 Subject: [PATCH] [#1318] engine: Change tombstone clear process - Delete objects physically on tombstone's arrival; - Store information about tombstones in the Graveyard; - Clear Graveyard every epoch based on the information about TS in the network. Signed-off-by: Pavel Karpy --- pkg/local_object_storage/engine/inhume.go | 11 +- .../metabase/graveyard.go | 2 +- pkg/local_object_storage/shard/gc.go | 127 ++++++++++-------- pkg/local_object_storage/shard/shard.go | 19 ++- 4 files changed, 93 insertions(+), 66 deletions(-) diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 39c29aac..c42e83c0 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -4,6 +4,7 @@ import ( "context" "errors" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" @@ -153,15 +154,9 @@ func (e *StorageEngine) inhumeAddr(addr *addressSDK.Address, prm *shard.InhumePr return } -func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []*addressSDK.Address) { - tss := make(map[string]*addressSDK.Address, len(addrs)) - - for i := range addrs { - tss[addrs[i].String()] = addrs[i] - } - +func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { - sh.HandleExpiredTombstones(tss) + sh.HandleExpiredTombstones(addrs) select { case <-ctx.Done(): diff --git a/pkg/local_object_storage/metabase/graveyard.go b/pkg/local_object_storage/metabase/graveyard.go index 127f9ba6..a8b04cc5 100644 --- a/pkg/local_object_storage/metabase/graveyard.go +++ b/pkg/local_object_storage/metabase/graveyard.go @@ -2,10 +2,10 @@ package meta import ( "bytes" + "errors" "fmt" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" - "github.com/pkg/errors" "go.etcd.io/bbolt" ) diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index eeb6ec8c..548316e3 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -13,6 +13,15 @@ import ( "go.uber.org/zap" ) +// TombstoneSource is an interface that checks +// tombstone status in the NeoFS network. +type TombstoneSource interface { + // IsTombstoneAvailable must return boolean value that means + // provided tombstone's presence in the NeoFS network at the + // time of the passed epoch. + IsTombstoneAvailable(ctx context.Context, addr *addressSDK.Address, epoch uint64) bool +} + // Event represents class of external events. type Event interface { typ() eventType @@ -238,17 +247,54 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { } func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { - expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool { - return typ == object.TypeTombstone - }) - if err != nil || len(expired) == 0 { - if err != nil { - s.log.Warn("iterator over expired tombstones failed", zap.String("error", err.Error())) + epoch := e.(newEpoch).epoch + log := s.log.With(zap.Uint64("epoch", epoch)) + + log.Debug("started expired tombstones handling") + + const tssDeleteBatch = 50 + tss := make([]meta.TombstonedObject, 0, tssDeleteBatch) + tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch) + + iterPrm := new(meta.GraveyardIterationPrm).SetHandler(func(deletedObject meta.TombstonedObject) error { + tss = append(tss, deletedObject) + + if len(tss) == tssDeleteBatch { + return meta.ErrInterruptIterator } - return + + return nil + }) + + for { + log.Debug("iterating tombstones") + + err := s.metaBase.IterateOverGraveyard(iterPrm) + if err != nil { + log.Error("iterator over graveyard failed", zap.Error(err)) + return + } + + tssLen := len(tss) + if tssLen == 0 { + break + } + + for _, ts := range tss { + if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) { + tssExp = append(tssExp, ts) + } + } + + log.Debug("handling expired tombstones batch", zap.Int("number", tssLen)) + s.expiredTombstonesCallback(ctx, tss) + + iterPrm.SetOffset(tss[tssLen-1].Address()) + tss = tss[:0] + tssExp = tssExp[:0] } - s.expiredTombstonesCallback(ctx, expired) + log.Debug("finished expired tombstones handling") } func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { @@ -285,60 +331,24 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond fu return expired, ctx.Err() } -// HandleExpiredTombstones marks to be removed all objects that are -// protected by tombstones with string addresses from tss. -// If successful, marks tombstones themselves as garbage. +// HandleExpiredTombstones marks tombstones themselves as garbage +// and clears up corresponding graveyard records. // // Does not modify tss. -func (s *Shard) HandleExpiredTombstones(tss map[string]*addressSDK.Address) { - inhume := make([]*addressSDK.Address, 0, len(tss)) - - // Collect all objects covered by the tombstones. - - err := s.metaBase.IterateCoveredByTombstones(tss, func(addr *addressSDK.Address) error { - inhume = append(inhume, addr) - return nil - }) - if err != nil { - s.log.Warn("iterator over expired objects failed", - zap.String("error", err.Error()), - ) - - return - } - - // Mark collected objects as garbage. - +func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) { + // Mark tombstones as garbage. var pInhume meta.InhumePrm + tsAddrs := make([]*addressSDK.Address, 0, len(tss)) + for _, ts := range tss { + tsAddrs = append(tsAddrs, ts.Tombstone()) + } + pInhume.WithGCMark() - - if len(inhume) > 0 { - // inhume objects - pInhume.WithAddresses(inhume...) - - _, err = s.metaBase.Inhume(&pInhume) - if err != nil { - s.log.Warn("could not inhume objects under the expired tombstone", - zap.String("error", err.Error()), - ) - - return - } - } - - // Mark the tombstones as garbage. - - inhume = inhume[:0] - - for _, addr := range tss { - inhume = append(inhume, addr) - } - - pInhume.WithAddresses(inhume...) // GC mark is already set above + pInhume.WithAddresses(tsAddrs...) // inhume tombstones - _, err = s.metaBase.Inhume(&pInhume) + _, err := s.metaBase.Inhume(&pInhume) if err != nil { s.log.Warn("could not mark tombstones as garbage", zap.String("error", err.Error()), @@ -346,6 +356,13 @@ func (s *Shard) HandleExpiredTombstones(tss map[string]*addressSDK.Address) { return } + + // drop just processed expired tombstones + // from graveyard + err = s.metaBase.DropGraves(tss) + if err != nil { + s.log.Warn("could not drop expired grave records", zap.Error(err)) + } } // HandleExpiredLocks unlocks all objects which were locked by lockers. diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 63c8d552..60ab31c8 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -25,11 +25,16 @@ type Shard struct { blobStor *blobstor.BlobStor metaBase *meta.DB + + tsSource TombstoneSource } // Option represents Shard's constructor option. type Option func(*cfg) +// ExpiredTombstonesCallback is a callback handling list of expired tombstones. +type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject) + // ExpiredObjectsCallback is a callback handling list of expired objects. type ExpiredObjectsCallback func(context.Context, []*addressSDK.Address) @@ -54,9 +59,11 @@ type cfg struct { gcCfg *gcCfg - expiredTombstonesCallback ExpiredObjectsCallback + expiredTombstonesCallback ExpiredTombstonesCallback expiredLocksCallback ExpiredObjectsCallback + + tsSource TombstoneSource } func defaultCfg() *cfg { @@ -91,6 +98,7 @@ func New(opts ...Option) *Shard { blobStor: bs, metaBase: mb, writeCache: writeCache, + tsSource: c.tsSource, } s.fillInfo() @@ -184,7 +192,7 @@ func WithGCRemoverSleepInterval(dur time.Duration) Option { // WithExpiredTombstonesCallback returns option to specify callback // of the expired tombstones handler. -func WithExpiredTombstonesCallback(cb ExpiredObjectsCallback) Option { +func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option { return func(c *cfg) { c.expiredTombstonesCallback = cb } @@ -214,6 +222,13 @@ func WithMode(v Mode) Option { } } +// WithTombstoneSource returns option to set TombstoneSource. +func WithTombstoneSource(v TombstoneSource) Option { + return func(c *cfg) { + c.tsSource = v + } +} + func (s *Shard) fillInfo() { s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()