forked from TrueCloudLab/frostfs-node
[#70] object manager: Implement an example object garbage collector
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
798fca9354
commit
0d5495e997
2 changed files with 190 additions and 0 deletions
148
pkg/services/object_manager/gc/gc.go
Normal file
148
pkg/services/object_manager/gc/gc.go
Normal file
|
@ -0,0 +1,148 @@
|
||||||
|
package gc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/mr-tron/base58"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GC represents an object garbage collector.
|
||||||
|
type GC struct {
|
||||||
|
*cfg
|
||||||
|
|
||||||
|
timer *time.Timer
|
||||||
|
|
||||||
|
queue chan *object.Address
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option represents GC constructor option.
|
||||||
|
type Option func(*cfg)
|
||||||
|
|
||||||
|
type cfg struct {
|
||||||
|
sleepInterval, workInterval time.Duration
|
||||||
|
|
||||||
|
queueCap uint32
|
||||||
|
|
||||||
|
log *logger.Logger
|
||||||
|
|
||||||
|
remover Remover
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remover is an interface of the component that stores objects.
|
||||||
|
type Remover interface {
|
||||||
|
// Delete removes (or marks to remove) object from physical storage.
|
||||||
|
Delete(*object.Address) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultCfg() *cfg {
|
||||||
|
return &cfg{
|
||||||
|
sleepInterval: 5 * time.Second,
|
||||||
|
workInterval: 5 * time.Second,
|
||||||
|
queueCap: 10,
|
||||||
|
log: zap.L(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates, initializes and returns GC instance.
|
||||||
|
func New(opts ...Option) *GC {
|
||||||
|
cfg := defaultCfg()
|
||||||
|
|
||||||
|
for i := range opts {
|
||||||
|
opts[i](cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.log = cfg.log.With(zap.String("component", "Object GC"))
|
||||||
|
|
||||||
|
return &GC{
|
||||||
|
cfg: cfg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gc *GC) Run(ctx context.Context) {
|
||||||
|
defer func() {
|
||||||
|
close(gc.queue)
|
||||||
|
gc.timer.Stop()
|
||||||
|
gc.log.Info("routine stopped")
|
||||||
|
}()
|
||||||
|
|
||||||
|
gc.log.Info("process routine",
|
||||||
|
zap.Uint32("queue capacity", gc.queueCap),
|
||||||
|
zap.Duration("sleep interval", gc.sleepInterval),
|
||||||
|
zap.Duration("working interval", gc.workInterval),
|
||||||
|
)
|
||||||
|
|
||||||
|
gc.queue = make(chan *object.Address, gc.queueCap)
|
||||||
|
gc.timer = time.NewTimer(gc.sleepInterval)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
gc.log.Warn("context is done",
|
||||||
|
zap.String("error", ctx.Err().Error()),
|
||||||
|
)
|
||||||
|
|
||||||
|
return
|
||||||
|
case _, ok := <-gc.timer.C:
|
||||||
|
if !ok {
|
||||||
|
gc.log.Warn("timer is stopped")
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
abort := time.After(gc.workInterval)
|
||||||
|
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
gc.log.Warn("context is done",
|
||||||
|
zap.String("error", ctx.Err().Error()),
|
||||||
|
)
|
||||||
|
|
||||||
|
return
|
||||||
|
case <-abort:
|
||||||
|
break loop
|
||||||
|
case addr, ok := <-gc.queue:
|
||||||
|
if !ok {
|
||||||
|
gc.log.Warn("queue channel is closed")
|
||||||
|
} else if err := gc.remover.Delete(addr); err != nil {
|
||||||
|
gc.log.Error("could not remove object",
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
gc.log.Info("object removed",
|
||||||
|
zap.String("CID", stringifyCID(addr.GetContainerID())),
|
||||||
|
zap.String("ID", stringifyID(addr.GetObjectID())),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
gc.timer.Reset(gc.sleepInterval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func stringifyID(addr *object.ID) string {
|
||||||
|
return base58.Encode(addr.ToV2().GetValue())
|
||||||
|
}
|
||||||
|
|
||||||
|
func stringifyCID(addr *container.ID) string {
|
||||||
|
return base58.Encode(addr.ToV2().GetValue())
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteObjects adds list of adresses to delete queue.
|
||||||
|
func (gc *GC) DeleteObjects(list ...*object.Address) {
|
||||||
|
for i := range list {
|
||||||
|
select {
|
||||||
|
case gc.queue <- list[i]:
|
||||||
|
default:
|
||||||
|
gc.log.Info("queue for deletion is full")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
42
pkg/services/object_manager/gc/opts.go
Normal file
42
pkg/services/object_manager/gc/opts.go
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
package gc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
// WithRemover returns option to set object remover.
|
||||||
|
func WithRemover(v Remover) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.remover = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithLogger returns option to set logging component.
|
||||||
|
func WithLogger(v *logger.Logger) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.log = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithQueueCapacity returns option to set delete queue capacity.
|
||||||
|
func WithQueueCapacity(v uint32) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.queueCap = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithWorkingInterval returns option to set working interval of GC.
|
||||||
|
func WithWorkingInterval(v time.Duration) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.workInterval = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithSleepInteval returns option to set sleep interval of GC.
|
||||||
|
func WithSleepInterval(v time.Duration) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.sleepInterval = v
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue