diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go new file mode 100644 index 00000000..50474847 --- /dev/null +++ b/pkg/local_object_storage/shard/gc.go @@ -0,0 +1,163 @@ +package shard + +import ( + "context" + "time" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Event represents class of external events. +type Event interface { + typ() eventType +} + +type eventType int + +const ( + _ eventType = iota +) + +type eventHandler func(context.Context, Event) + +type eventHandlers struct { + cancelFunc context.CancelFunc + + handlers []eventHandler +} + +type gc struct { + *gcCfg + + workerPool util.WorkerPool + + remover func() + + mEventHandler map[eventType]*eventHandlers +} + +type gcCfg struct { + eventChanInit func() <-chan Event + + removerInterval time.Duration + + log *logger.Logger + + workerPoolInit func(int) util.WorkerPool +} + +func defaultGCCfg() *gcCfg { + ch := make(chan Event) + close(ch) + + return &gcCfg{ + eventChanInit: func() <-chan Event { + return ch + }, + removerInterval: 10 * time.Second, + log: zap.L(), + workerPoolInit: func(int) util.WorkerPool { + return nil + }, + } +} + +func (gc *gc) init() { + sz := 0 + + for _, v := range gc.mEventHandler { + sz += len(v.handlers) + } + + if sz > 0 { + gc.workerPool = gc.workerPoolInit(sz) + } + + go gc.tickRemover() + go gc.listenEvents() +} + +func (gc *gc) listenEvents() { + eventChan := gc.eventChanInit() + + for { + event, ok := <-eventChan + if !ok { + gc.log.Warn("stop event listener by closed channel") + return + } + + v, ok := gc.mEventHandler[event.typ()] + if !ok { + continue + } + + v.cancelFunc() + + var ctx context.Context + ctx, v.cancelFunc = context.WithCancel(context.Background()) + + for _, h := range v.handlers { + err := gc.workerPool.Submit(func() { + h(ctx, event) + }) + if err != nil { + gc.log.Warn("could not submit GC job to worker pool", + zap.String("error", err.Error()), + ) + } + } + } +} + +func (gc *gc) tickRemover() { + timer := time.NewTimer(gc.removerInterval) + defer timer.Stop() + + for { + <-timer.C + gc.remover() + timer.Reset(gc.removerInterval) + } +} + +// iterates over metabase graveyard and deletes objects +// with GC-marked graves. +func (s *Shard) removeGarbage() { + buf := make([]*object.Address, 0, s.rmBatchSize) + + // iterate over metabase graveyard and accumulate + // objects with GC mark + err := s.metaBase.IterateOverGraveyard(func(g *meta.Grave) error { + if g.WithGCMark() { + buf = append(buf, g.Address()) + } + + return nil + }) + if err != nil { + s.log.Warn("iterator over metabase graveyard failed", + zap.String("error", err.Error()), + ) + + return + } else if len(buf) == 0 { + return + } + + // delete accumulated objects + _, err = s.Delete(new(DeletePrm). + WithAddresses(buf...), + ) + if err != nil { + s.log.Warn("could not delete the objects", + zap.String("error", err.Error()), + ) + + return + } +} diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 96b0d088..79eeaed3 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -1,10 +1,14 @@ package shard import ( + "time" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/atomic" + "go.uber.org/zap" ) // Shard represents single shard of NeoFS Local Storage Engine. @@ -24,6 +28,8 @@ type Shard struct { type Option func(*cfg) type cfg struct { + rmBatchSize int + useWriteCache bool info Info @@ -35,10 +41,16 @@ type cfg struct { writeCacheOpts []blobstor.Option log *logger.Logger + + gcCfg *gcCfg } func defaultCfg() *cfg { - return new(cfg) + return &cfg{ + rmBatchSize: 100, + log: zap.L(), + gcCfg: defaultGCCfg(), + } } // New creates, initializes and returns new Shard instance. @@ -98,6 +110,7 @@ func WithWriteCacheOptions(opts ...blobstor.Option) Option { func WithLogger(l *logger.Logger) Option { return func(c *cfg) { c.log = l + c.gcCfg.log = l } } @@ -112,3 +125,35 @@ func WithWriteCache(use bool) Option { func (s Shard) hasWriteCache() bool { return s.cfg.useWriteCache } + +// WithRemoverBatchSize returns option to set batch size +// of single removal operation. +func WithRemoverBatchSize(sz int) Option { + return func(c *cfg) { + c.rmBatchSize = sz + } +} + +// WithGCWorkerPoolInitializer returns option to set initializer of +// worker pool with specified worker number. +func WithGCWorkerPoolInitializer(wpInit func(int) util.WorkerPool) Option { + return func(c *cfg) { + c.gcCfg.workerPoolInit = wpInit + } +} + +// WithGCEventChannelInitializer returns option to set set initializer of +// GC event channel. +func WithGCEventChannelInitializer(chInit func() <-chan Event) Option { + return func(c *cfg) { + c.gcCfg.eventChanInit = chInit + } +} + +// WithGCRemoverSleepInterval returns option to specify sleep +// interval between object remover executions. +func WithGCRemoverSleepInterval(dur time.Duration) Option { + return func(c *cfg) { + c.gcCfg.removerInterval = dur + } +}