From 13087dc3dd2d1847f4904509b889d62c5804f12d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 16 Feb 2021 14:34:48 +0300 Subject: [PATCH] [#378] shard: Implement skeleton of internal GC Shard's GC component consists of: * an asynchronous remover that periodically wakes up, removes all garbage objects from the shard, and goes to sleep for a particular time interval; * an external event listener that distributes jobs between workers; * a group of workers, each of which can handle a single job related to a particular external event. The remover and the event listener are goroutines which are started by the `init` method (called from `Shard.Init`). In the initial version all event handlers are interruptible: this means that the next event of the same type will interrupt the previous handling and start a new one. GC is fully encapsulated in Shard. All GC configuration is reflected in Shard's configuration. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/shard/gc.go | 163 ++++++++++++++++++++++++ pkg/local_object_storage/shard/shard.go | 47 ++++++- 2 files changed, 209 insertions(+), 1 deletion(-) create mode 100644 pkg/local_object_storage/shard/gc.go diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go new file mode 100644 index 000000000..504748473 --- /dev/null +++ b/pkg/local_object_storage/shard/gc.go @@ -0,0 +1,163 @@ +package shard + +import ( + "context" + "time" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Event represents class of external events. 
+type Event interface {
+	typ() eventType
+}
+
+// eventType identifies the kind of an Event; it is the key under which
+// handlers are looked up in gc.mEventHandler.
+type eventType int
+
+// No concrete event types are declared yet: only the zero value is
+// consumed by the blank identifier.
+const (
+	_ eventType = iota
+)
+
+// eventHandler processes a single Event. The context it receives is
+// cancelled when the next event of the same type is dispatched.
+type eventHandler func(context.Context, Event)
+
+// eventHandlers is the set of handlers registered for one event type.
+type eventHandlers struct {
+	// cancelFunc cancels the context handed to the handlers of the most
+	// recently dispatched event. It may be nil before the first dispatch.
+	cancelFunc context.CancelFunc
+
+	handlers []eventHandler
+}
+
+// gc is the shard's garbage collector: a periodic remover plus a listener
+// that dispatches external events to the registered handlers.
+type gc struct {
+	*gcCfg
+
+	// workerPool runs event handlers. init creates it only when at least
+	// one handler is registered.
+	workerPool util.WorkerPool
+
+	// remover is invoked by tickRemover on every timer tick.
+	remover func()
+
+	// mEventHandler maps an event type to its handlers.
+	mEventHandler map[eventType]*eventHandlers
+}
+
+// gcCfg holds the configurable parts of gc.
+type gcCfg struct {
+	// eventChanInit returns the channel that listenEvents reads from.
+	eventChanInit func() <-chan Event
+
+	// removerInterval is the pause between two remover runs.
+	removerInterval time.Duration
+
+	log *logger.Logger
+
+	// workerPoolInit builds the worker pool; init passes it the total
+	// number of registered handlers.
+	workerPoolInit func(int) util.WorkerPool
+}
+
+// defaultGCCfg returns the default GC configuration: an already closed
+// event channel (so the listener stops as soon as it starts), a 10 second
+// remover interval, the global zap logger and a worker pool initializer
+// that returns nil.
+func defaultGCCfg() *gcCfg {
+	ch := make(chan Event)
+	close(ch)
+
+	return &gcCfg{
+		eventChanInit: func() <-chan Event {
+			return ch
+		},
+		removerInterval: 10 * time.Second,
+		log:             zap.L(),
+		workerPoolInit: func(int) util.WorkerPool {
+			return nil
+		},
+	}
+}
+
+// init creates the worker pool (only if some handlers are registered) and
+// starts the remover and the event listener goroutines.
+func (gc *gc) init() {
+	sz := 0
+
+	for _, v := range gc.mEventHandler {
+		sz += len(v.handlers)
+	}
+
+	if sz > 0 {
+		gc.workerPool = gc.workerPoolInit(sz)
+	}
+
+	go gc.tickRemover()
+	go gc.listenEvents()
+}
+
+// listenEvents reads events from the channel returned by eventChanInit
+// until that channel is closed. For every event that has registered
+// handlers it cancels the context of the previously dispatched event of
+// the same type and submits each handler to the worker pool with a fresh
+// context. Submission errors are logged and do not stop the listener.
+func (gc *gc) listenEvents() {
+	eventChan := gc.eventChanInit()
+
+	for {
+		event, ok := <-eventChan
+		if !ok {
+			gc.log.Warn("stop event listener by closed channel")
+			return
+		}
+
+		v, ok := gc.mEventHandler[event.typ()]
+		if !ok {
+			continue
+		}
+
+		// cancelFunc may still be nil if no event of this type has been
+		// dispatched yet; calling a nil func would panic.
+		if v.cancelFunc != nil {
+			v.cancelFunc()
+		}
+
+		var ctx context.Context
+		ctx, v.cancelFunc = context.WithCancel(context.Background())
+
+		for i := range v.handlers {
+			// Take a per-iteration copy of the handler: before Go 1.22 a
+			// range variable is shared by all iterations, so closures
+			// executed later by the pool could all see the last handler.
+			h := v.handlers[i]
+
+			err := gc.workerPool.Submit(func() {
+				h(ctx, event)
+			})
+			if err != nil {
+				gc.log.Warn("could not submit GC job to worker pool",
+					zap.String("error", err.Error()),
+				)
+			}
+		}
+	}
+}
+
+// tickRemover calls gc.remover each time the timer fires and re-arms the
+// timer afterwards, so removerInterval is counted from the end of the
+// previous run. The loop has no exit condition.
+func (gc *gc) tickRemover() {
+	timer := time.NewTimer(gc.removerInterval)
+	defer timer.Stop()
+
+	for {
+		<-timer.C
+		gc.remover()
+		timer.Reset(gc.removerInterval)
+	}
+}
+
+// iterates over metabase graveyard and deletes objects
+// with GC-marked graves.
+func (s *Shard) removeGarbage() {
+	// A negative batch size would make the allocation below panic;
+	// treat any non-positive value as "no batching".
+	batch := s.rmBatchSize
+	if batch < 0 {
+		batch = 0
+	}
+
+	buf := make([]*object.Address, 0, batch)
+
+	// iterate over metabase graveyard and accumulate
+	// objects with GC mark
+	err := s.metaBase.IterateOverGraveyard(func(g *meta.Grave) error {
+		if g.WithGCMark() {
+			buf = append(buf, g.Address())
+		}
+
+		return nil
+	})
+	if err != nil {
+		s.log.Warn("iterator over metabase graveyard failed",
+			zap.String("error", err.Error()),
+		)
+
+		return
+	}
+
+	// delete accumulated objects, passing at most batch addresses to
+	// a single removal operation; nothing is called when buf is empty
+	for len(buf) > 0 {
+		n := len(buf)
+		if batch > 0 && n > batch {
+			n = batch
+		}
+
+		_, err = s.Delete(new(DeletePrm).
+			WithAddresses(buf[:n]...),
+		)
+		if err != nil {
+			s.log.Warn("could not delete the objects",
+				zap.String("error", err.Error()),
+			)
+
+			return
+		}
+
+		buf = buf[n:]
+	}
+}
diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go
index 96b0d0881..79eeaed3d 100644
--- a/pkg/local_object_storage/shard/shard.go
+++ b/pkg/local_object_storage/shard/shard.go
@@ -1,10 +1,14 @@
 package shard
 
 import (
+	"time"
+
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
 	meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
+	"github.com/nspcc-dev/neofs-node/pkg/util"
 	"github.com/nspcc-dev/neofs-node/pkg/util/logger"
 	"go.uber.org/atomic"
+	"go.uber.org/zap"
 )
 
 // Shard represents single shard of NeoFS Local Storage Engine.
@@ -24,6 +28,8 @@ type Shard struct {
 type Option func(*cfg)
 
 type cfg struct {
+	rmBatchSize int
+
 	useWriteCache bool
 
 	info Info
@@ -35,10 +41,16 @@ type cfg struct {
 	writeCacheOpts []blobstor.Option
 
 	log *logger.Logger
+
+	gcCfg *gcCfg
 }
 
 func defaultCfg() *cfg {
-	return new(cfg)
+	return &cfg{
+		rmBatchSize: 100,
+		log:         zap.L(),
+		gcCfg:       defaultGCCfg(),
+	}
 }
 
 // New creates, initializes and returns new Shard instance.
@@ -98,6 +110,7 @@ func WithWriteCacheOptions(opts ...blobstor.Option) Option {
 func WithLogger(l *logger.Logger) Option {
 	return func(c *cfg) {
 		c.log = l
+		c.gcCfg.log = l
 	}
 }
 
@@ -112,3 +125,35 @@ func WithWriteCache(use bool) Option {
 func (s Shard) hasWriteCache() bool {
 	return s.cfg.useWriteCache
 }
+
+// WithRemoverBatchSize returns option to set batch size
+// of single removal operation.
+func WithRemoverBatchSize(sz int) Option {
+	return func(c *cfg) {
+		c.rmBatchSize = sz
+	}
+}
+
+// WithGCWorkerPoolInitializer returns option to set initializer of
+// worker pool with specified worker number.
+func WithGCWorkerPoolInitializer(wpInit func(int) util.WorkerPool) Option {
+	return func(c *cfg) {
+		c.gcCfg.workerPoolInit = wpInit
+	}
+}
+
+// WithGCEventChannelInitializer returns option to set initializer of
+// GC event channel.
+func WithGCEventChannelInitializer(chInit func() <-chan Event) Option {
+	return func(c *cfg) {
+		c.gcCfg.eventChanInit = chInit
+	}
+}
+
+// WithGCRemoverSleepInterval returns option to specify sleep
+// interval between object remover executions.
+func WithGCRemoverSleepInterval(dur time.Duration) Option {
+	return func(c *cfg) {
+		c.gcCfg.removerInterval = dur
+	}
+}