forked from TrueCloudLab/frostfs-node
[#493] node: Get rid of outdated object GC worker
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
1f817d1cd2
commit
80ef4492c1
4 changed files with 0 additions and 217 deletions
|
@ -92,10 +92,6 @@ const (
|
||||||
// config keys for cfgReputation
|
// config keys for cfgReputation
|
||||||
cfgReputationContract = "reputation.scripthash"
|
cfgReputationContract = "reputation.scripthash"
|
||||||
|
|
||||||
cfgGCQueueSize = "gc.queuesize"
|
|
||||||
cfgGCQueueTick = "gc.duration.sleep"
|
|
||||||
cfgGCTimeout = "gc.duration.timeout"
|
|
||||||
|
|
||||||
cfgPolicerWorkScope = "policer.work_scope"
|
cfgPolicerWorkScope = "policer.work_scope"
|
||||||
cfgPolicerExpRate = "policer.expansion_rate"
|
cfgPolicerExpRate = "policer.expansion_rate"
|
||||||
cfgPolicerHeadTimeout = "policer.head_timeout"
|
cfgPolicerHeadTimeout = "policer.head_timeout"
|
||||||
|
@ -466,10 +462,6 @@ func defaultConfiguration(v *viper.Viper) {
|
||||||
|
|
||||||
v.SetDefault(cfgMetricsShutdownTimeout, "30s")
|
v.SetDefault(cfgMetricsShutdownTimeout, "30s")
|
||||||
|
|
||||||
v.SetDefault(cfgGCQueueSize, 1000)
|
|
||||||
v.SetDefault(cfgGCQueueTick, "5s")
|
|
||||||
v.SetDefault(cfgGCTimeout, "5s")
|
|
||||||
|
|
||||||
v.SetDefault(cfgPolicerWorkScope, 100)
|
v.SetDefault(cfgPolicerWorkScope, 100)
|
||||||
v.SetDefault(cfgPolicerExpRate, 10) // in %
|
v.SetDefault(cfgPolicerExpRate, 10) // in %
|
||||||
v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second)
|
v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second)
|
||||||
|
|
|
@ -34,7 +34,6 @@ import (
|
||||||
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
|
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
|
||||||
searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2"
|
searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/gc"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/policer"
|
"github.com/nspcc-dev/neofs-node/pkg/services/policer"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
|
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
|
||||||
|
@ -94,26 +93,12 @@ func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRe
|
||||||
return s.get.GetRangeHash(ctx, req)
|
return s.get.GetRangeHash(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
type localObjectRemover struct {
|
|
||||||
storage *engine.StorageEngine
|
|
||||||
|
|
||||||
log *logger.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
type localObjectInhumer struct {
|
type localObjectInhumer struct {
|
||||||
storage *engine.StorageEngine
|
storage *engine.StorageEngine
|
||||||
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *localObjectRemover) Delete(addr ...*objectSDK.Address) error {
|
|
||||||
_, err := r.storage.Delete(new(engine.DeletePrm).
|
|
||||||
WithAddresses(addr...),
|
|
||||||
)
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *localObjectInhumer) DeleteObjects(ts *objectSDK.Address, addr ...*objectSDK.Address) {
|
func (r *localObjectInhumer) DeleteObjects(ts *objectSDK.Address, addr ...*objectSDK.Address) {
|
||||||
prm := new(engine.InhumePrm)
|
prm := new(engine.InhumePrm)
|
||||||
|
|
||||||
|
@ -182,26 +167,11 @@ func initObjectService(c *cfg) {
|
||||||
sidechain: c.cfgMorph.client,
|
sidechain: c.cfgMorph.client,
|
||||||
}
|
}
|
||||||
|
|
||||||
objRemover := &localObjectRemover{
|
|
||||||
storage: ls,
|
|
||||||
log: c.log,
|
|
||||||
}
|
|
||||||
|
|
||||||
objInhumer := &localObjectInhumer{
|
objInhumer := &localObjectInhumer{
|
||||||
storage: ls,
|
storage: ls,
|
||||||
log: c.log,
|
log: c.log,
|
||||||
}
|
}
|
||||||
|
|
||||||
objGC := gc.New(
|
|
||||||
gc.WithLogger(c.log),
|
|
||||||
gc.WithRemover(objRemover),
|
|
||||||
gc.WithQueueCapacity(c.viper.GetUint32(cfgGCQueueSize)),
|
|
||||||
gc.WithSleepInterval(c.viper.GetDuration(cfgGCQueueTick)),
|
|
||||||
gc.WithWorkingInterval(c.viper.GetDuration(cfgGCTimeout)),
|
|
||||||
)
|
|
||||||
|
|
||||||
c.workers = append(c.workers, objGC)
|
|
||||||
|
|
||||||
repl := replicator.New(
|
repl := replicator.New(
|
||||||
replicator.WithLogger(c.log),
|
replicator.WithLogger(c.log),
|
||||||
replicator.WithPutTimeout(
|
replicator.WithPutTimeout(
|
||||||
|
|
|
@ -1,137 +0,0 @@
|
||||||
package gc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/mr-tron/base58"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
// GC represents an object garbage collector.
|
|
||||||
type GC struct {
|
|
||||||
*cfg
|
|
||||||
|
|
||||||
timer *time.Timer
|
|
||||||
|
|
||||||
queue chan *object.Address
|
|
||||||
}
|
|
||||||
|
|
||||||
// Option represents GC constructor option.
|
|
||||||
type Option func(*cfg)
|
|
||||||
|
|
||||||
type cfg struct {
|
|
||||||
sleepInterval, workInterval time.Duration
|
|
||||||
|
|
||||||
queueCap uint32
|
|
||||||
|
|
||||||
log *logger.Logger
|
|
||||||
|
|
||||||
remover Remover
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remover is an interface of the component that stores objects.
|
|
||||||
type Remover interface {
|
|
||||||
// Delete removes object from physical storage.
|
|
||||||
Delete(...*object.Address) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
|
||||||
return &cfg{
|
|
||||||
sleepInterval: 5 * time.Second,
|
|
||||||
workInterval: 5 * time.Second,
|
|
||||||
queueCap: 10,
|
|
||||||
log: zap.L(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates, initializes and returns GC instance.
|
|
||||||
func New(opts ...Option) *GC {
|
|
||||||
cfg := defaultCfg()
|
|
||||||
|
|
||||||
for i := range opts {
|
|
||||||
opts[i](cfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg.log = cfg.log.With(zap.String("component", "Object GC"))
|
|
||||||
|
|
||||||
return &GC{
|
|
||||||
cfg: cfg,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (gc *GC) Run(ctx context.Context) {
|
|
||||||
defer func() {
|
|
||||||
close(gc.queue)
|
|
||||||
gc.timer.Stop()
|
|
||||||
gc.log.Info("routine stopped")
|
|
||||||
}()
|
|
||||||
|
|
||||||
gc.log.Info("process routine",
|
|
||||||
zap.Uint32("queue capacity", gc.queueCap),
|
|
||||||
zap.Duration("sleep interval", gc.sleepInterval),
|
|
||||||
zap.Duration("working interval", gc.workInterval),
|
|
||||||
)
|
|
||||||
|
|
||||||
gc.queue = make(chan *object.Address, gc.queueCap)
|
|
||||||
gc.timer = time.NewTimer(gc.sleepInterval)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
gc.log.Warn("context is done",
|
|
||||||
zap.String("error", ctx.Err().Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return
|
|
||||||
case _, ok := <-gc.timer.C:
|
|
||||||
if !ok {
|
|
||||||
gc.log.Warn("timer is stopped")
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
abort := time.After(gc.workInterval)
|
|
||||||
|
|
||||||
loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
gc.log.Warn("context is done",
|
|
||||||
zap.String("error", ctx.Err().Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return
|
|
||||||
case <-abort:
|
|
||||||
break loop
|
|
||||||
case addr, ok := <-gc.queue:
|
|
||||||
if !ok {
|
|
||||||
gc.log.Warn("queue channel is closed")
|
|
||||||
} else if err := gc.remover.Delete(addr); err != nil {
|
|
||||||
gc.log.Error("could not remove object",
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
gc.log.Info("object removed",
|
|
||||||
zap.String("CID", stringifyCID(addr.ContainerID())),
|
|
||||||
zap.String("ID", stringifyID(addr.ObjectID())),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
gc.timer.Reset(gc.sleepInterval)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func stringifyID(addr *object.ID) string {
|
|
||||||
return base58.Encode(addr.ToV2().GetValue())
|
|
||||||
}
|
|
||||||
|
|
||||||
func stringifyCID(addr *container.ID) string {
|
|
||||||
return base58.Encode(addr.ToV2().GetValue())
|
|
||||||
}
|
|
|
@ -1,42 +0,0 @@
|
||||||
package gc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
|
||||||
)
|
|
||||||
|
|
||||||
// WithRemover returns option to set object remover.
|
|
||||||
func WithRemover(v Remover) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.remover = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithLogger returns option to set logging component.
|
|
||||||
func WithLogger(v *logger.Logger) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.log = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithQueueCapacity returns option to set delete queue capacity.
|
|
||||||
func WithQueueCapacity(v uint32) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.queueCap = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithWorkingInterval returns option to set working interval of GC.
|
|
||||||
func WithWorkingInterval(v time.Duration) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.workInterval = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithSleepInteval returns option to set sleep interval of GC.
|
|
||||||
func WithSleepInterval(v time.Duration) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.sleepInterval = v
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue