From 0d5495e997ecdb56f6a6d5a2407200376d3f828e Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Sat, 3 Oct 2020 13:25:56 +0300 Subject: [PATCH] [#70] object manager: Implement an example object garbage collector Signed-off-by: Leonard Lyubich --- pkg/services/object_manager/gc/gc.go | 148 +++++++++++++++++++++++++ pkg/services/object_manager/gc/opts.go | 42 +++++++ 2 files changed, 190 insertions(+) create mode 100644 pkg/services/object_manager/gc/gc.go create mode 100644 pkg/services/object_manager/gc/opts.go diff --git a/pkg/services/object_manager/gc/gc.go b/pkg/services/object_manager/gc/gc.go new file mode 100644 index 00000000..69290a96 --- /dev/null +++ b/pkg/services/object_manager/gc/gc.go @@ -0,0 +1,148 @@ +package gc + +import ( + "context" + "time" + + "github.com/mr-tron/base58" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// GC represents an object garbage collector. +type GC struct { + *cfg + + timer *time.Timer + + queue chan *object.Address +} + +// Option represents GC constructor option. +type Option func(*cfg) + +type cfg struct { + sleepInterval, workInterval time.Duration + + queueCap uint32 + + log *logger.Logger + + remover Remover +} + +// Remover is an interface of the component that stores objects. +type Remover interface { + // Delete removes (or marks to remove) object from physical storage. + Delete(*object.Address) error +} + +func defaultCfg() *cfg { + return &cfg{ + sleepInterval: 5 * time.Second, + workInterval: 5 * time.Second, + queueCap: 10, + log: zap.L(), + } +} + +// New creates, initializes and returns GC instance. +func New(opts ...Option) *GC { + cfg := defaultCfg() + + for i := range opts { + opts[i](cfg) + } + + cfg.log = cfg.log.With(zap.String("component", "Object GC")) + + return &GC{ + cfg: cfg, + } +} + +func (gc *GC) Run(ctx context.Context) { + defer func() { + close(gc.queue) + gc.timer.Stop() + gc.log.Info("routine stopped") + }() + + gc.log.Info("process routine", + zap.Uint32("queue capacity", gc.queueCap), + zap.Duration("sleep interval", gc.sleepInterval), + zap.Duration("working interval", gc.workInterval), + ) + + gc.queue = make(chan *object.Address, gc.queueCap) + gc.timer = time.NewTimer(gc.sleepInterval) + + for { + select { + case <-ctx.Done(): + gc.log.Warn("context is done", + zap.String("error", ctx.Err().Error()), + ) + + return + case _, ok := <-gc.timer.C: + if !ok { + gc.log.Warn("timer is stopped") + + return + } + + abort := time.After(gc.workInterval) + + loop: + for { + select { + case <-ctx.Done(): + gc.log.Warn("context is done", + zap.String("error", ctx.Err().Error()), + ) + + return + case <-abort: + break loop + case addr, ok := <-gc.queue: + if !ok { + gc.log.Warn("queue channel is closed") + } else if err := gc.remover.Delete(addr); err != nil { + gc.log.Error("could not remove object", + zap.String("error", err.Error()), + ) + } else { + gc.log.Info("object removed", + zap.String("CID", stringifyCID(addr.GetContainerID())), + zap.String("ID", stringifyID(addr.GetObjectID())), + ) + } + } + } + + gc.timer.Reset(gc.sleepInterval) + } + } +} + +func stringifyID(addr *object.ID) string { + return base58.Encode(addr.ToV2().GetValue()) +} + +func stringifyCID(addr *container.ID) string { + return base58.Encode(addr.ToV2().GetValue()) +} + +// DeleteObjects adds list of adresses to delete queue. +func (gc *GC) DeleteObjects(list ...*object.Address) { + for i := range list { + select { + case gc.queue <- list[i]: + default: + gc.log.Info("queue for deletion is full") + } + } +} diff --git a/pkg/services/object_manager/gc/opts.go b/pkg/services/object_manager/gc/opts.go new file mode 100644 index 00000000..fa159cf7 --- /dev/null +++ b/pkg/services/object_manager/gc/opts.go @@ -0,0 +1,42 @@ +package gc + +import ( + "time" + + "github.com/nspcc-dev/neofs-node/pkg/util/logger" +) + +// WithRemover returns option to set object remover. +func WithRemover(v Remover) Option { + return func(c *cfg) { + c.remover = v + } +} + +// WithLogger returns option to set logging component. +func WithLogger(v *logger.Logger) Option { + return func(c *cfg) { + c.log = v + } +} + +// WithQueueCapacity returns option to set delete queue capacity. +func WithQueueCapacity(v uint32) Option { + return func(c *cfg) { + c.queueCap = v + } +} + +// WithWorkingInterval returns option to set working interval of GC. +func WithWorkingInterval(v time.Duration) Option { + return func(c *cfg) { + c.workInterval = v + } +} + +// WithSleepInteval returns option to set sleep interval of GC. +func WithSleepInterval(v time.Duration) Option { + return func(c *cfg) { + c.sleepInterval = v + } +}