From a74a402a7d0f8da0085852de840a235b60d85a57 Mon Sep 17 00:00:00 2001
From: Alex Vanin
Date: Wed, 10 Nov 2021 11:34:00 +0300
Subject: [PATCH] [#965] policer: Implement continuous replication

Signed-off-by: Alex Vanin
---
 cmd/neofs-node/config.go        |  22 ++++-
 cmd/neofs-node/object.go        |  19 +---
 pkg/services/policer/policer.go |  87 +++++++++++-------
 pkg/services/policer/process.go | 151 +++++++++++++-------------------
 pkg/services/policer/queue.go   |  25 ++----
 5 files changed, 145 insertions(+), 159 deletions(-)

diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go
index 4d2811f4..53259919 100644
--- a/cmd/neofs-node/config.go
+++ b/cmd/neofs-node/config.go
@@ -197,6 +197,10 @@ type cfgLocalStorage struct {
 
 type cfgObjectRoutines struct {
     putRemote *ants.Pool
+
+    putRemoteCapacity int
+
+    replication *ants.Pool
 }
 
 type cfgControlService struct {
@@ -449,10 +453,13 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
 
     optNonBlocking := ants.WithNonblocking(true)
 
-    pool.putRemote, err = ants.NewPool(objectconfig.Put(cfg).PoolSizeRemote(), optNonBlocking)
-    if err != nil {
-        fatalOnErr(err)
-    }
+    pool.putRemoteCapacity = objectconfig.Put(cfg).PoolSizeRemote()
+
+    pool.putRemote, err = ants.NewPool(pool.putRemoteCapacity, optNonBlocking)
+    fatalOnErr(err)
+
+    pool.replication, err = ants.NewPool(pool.putRemoteCapacity)
+    fatalOnErr(err)
 
     return pool
 }
@@ -486,3 +493,10 @@ func (c *cfg) bootstrap() error {
 func (c *cfg) needBootstrap() bool {
     return c.cfgNetmap.needBootstrap
 }
+
+// ObjectServiceLoad implements the system loader interface for the policer
+// component. It is calculated as the size/capacity ratio of the
+// "remote object put" worker pool. Returns a float value between 0.0 and 1.0.
+func (c *cfg) ObjectServiceLoad() float64 {
+    return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity)
+}
diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go
index 7f82fe5e..c4181c4c 100644
--- a/cmd/neofs-node/object.go
+++ b/cmd/neofs-node/object.go
@@ -17,7 +17,6 @@ import (
     morphClient "github.com/nspcc-dev/neofs-node/pkg/morph/client"
     cntrwrp "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
     nmwrp "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
-    "github.com/nspcc-dev/neofs-node/pkg/morph/event"
     objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc"
     objectService "github.com/nspcc-dev/neofs-node/pkg/services/object"
     "github.com/nspcc-dev/neofs-node/pkg/services/object/acl"
@@ -232,8 +231,6 @@ func initObjectService(c *cfg) {
 
     c.workers = append(c.workers, repl)
 
-    ch := make(chan *policer.Task, 1)
-
     pol := policer.New(
         policer.WithLogger(c.log),
         policer.WithLocalStorage(ls),
@@ -241,9 +238,6 @@ func initObjectService(c *cfg) {
         policer.WithPlacementBuilder(
             placement.NewNetworkMapSourceBuilder(c.cfgObject.netMapSource),
         ),
-        policer.WithWorkScope(100),
-        policer.WithExpansionRate(10),
-        policer.WithTrigger(ch),
         policer.WithRemoteHeader(
             headsvc.NewRemoteHeader(keyStorage, clientConstructor),
         ),
@@ -260,18 +254,11 @@ func initObjectService(c *cfg) {
                 )
             }
         }),
+        policer.WithMaxCapacity(c.cfgObject.pool.putRemoteCapacity),
+        policer.WithPool(c.cfgObject.pool.replication),
+        policer.WithNodeLoader(c),
     )
 
-    addNewEpochNotificationHandler(c, func(ev event.Event) {
-        select {
-        case ch <- new(policer.Task):
-        case <-c.ctx.Done():
-            close(ch)
-        default:
-            c.log.Info("policer is busy")
-        }
-    })
-
     traverseGen := util.NewTraverserGenerator(c.cfgObject.netMapSource, c.cfgObject.cnrSource, c)
 
     c.workers = append(c.workers, pol)
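Note: the wiring above makes replication share capacity with the "remote object put" pool instead of owning a fixed worker count, with ObjectServiceLoad as the feedback signal. A minimal sketch of the same capacity formula, runnable on its own; the fixedLoader type and the constants are illustrative and not part of this patch:

    package main

    import "fmt"

    // fixedLoader is a hypothetical stand-in for *cfg: it reports a
    // constant object service load in the [0:1] range.
    type fixedLoader struct{ load float64 }

    func (l fixedLoader) ObjectServiceLoad() float64 { return l.load }

    func main() {
        const maxCapacity = 100 // stands in for pool.putRemoteCapacity

        for _, l := range []fixedLoader{{0.0}, {0.5}, {0.99}} {
            // The idle share of the "remote put" pool becomes replication
            // capacity, clamped to at least one worker.
            capacity := int((1.0 - l.ObjectServiceLoad()) * float64(maxCapacity))
            if capacity == 0 {
                capacity++
            }
            fmt.Printf("load=%.2f -> capacity=%d\n", l.ObjectServiceLoad(), capacity)
        }
    }

Even at 0.99 load the clamp leaves one replication worker, so background replication slows down under pressure rather than stopping.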
diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go
index 69348a37..eff7b265 100644
--- a/pkg/services/policer/policer.go
+++ b/pkg/services/policer/policer.go
@@ -1,9 +1,9 @@
 package policer
 
 import (
-    "sync"
     "time"
 
+    lru "github.com/hashicorp/golang-lru"
     "github.com/nspcc-dev/neofs-node/pkg/core/container"
     "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
     "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
@@ -12,15 +12,22 @@ import (
     "github.com/nspcc-dev/neofs-node/pkg/services/replicator"
     "github.com/nspcc-dev/neofs-node/pkg/util/logger"
     "github.com/nspcc-dev/neofs-sdk-go/object"
+    "github.com/panjf2000/ants/v2"
     "go.uber.org/zap"
 )
 
+// nodeLoader provides application load statistics.
+type nodeLoader interface {
+    // ObjectServiceLoad returns an object service load value in the [0:1] range.
+    ObjectServiceLoad() float64
+}
+
 // Policer represents the utility that verifies
 // compliance with the object storage policy.
 type Policer struct {
     *cfg
 
-    prevTask prevTask
+    cache *lru.Cache
 }
 
 // Option is an option for Policer constructor.
@@ -33,12 +40,8 @@ type RedundantCopyCallback func(*object.Address)
 type cfg struct {
     headTimeout time.Duration
 
-    workScope workScope
-
     log *logger.Logger
 
-    trigger <-chan *Task
-
     jobQueue jobQueue
 
     cnrSrc container.Source
@@ -52,11 +55,25 @@ type cfg struct {
     replicator *replicator.Replicator
 
     cbRedundantCopy RedundantCopyCallback
+
+    taskPool *ants.Pool
+
+    loader nodeLoader
+
+    maxCapacity int
+
+    batchSize, cacheSize uint32
+
+    rebalanceFreq, evictDuration time.Duration
 }
 
 func defaultCfg() *cfg {
     return &cfg{
-        log: zap.L(),
+        log:           zap.L(),
+        batchSize:     10,
+        cacheSize:     200_000, // should not allocate more than 200 MiB
+        rebalanceFreq: 1 * time.Second,
+        evictDuration: 30 * time.Second,
     }
 }
 
@@ -70,12 +87,14 @@ func New(opts ...Option) *Policer {
 
     c.log = c.log.With(zap.String("component", "Object Policer"))
 
+    cache, err := lru.New(int(c.cacheSize))
+    if err != nil {
+        panic(err)
+    }
+
     return &Policer{
-        cfg: c,
-        prevTask: prevTask{
-            cancel: func() {},
-            wait:   new(sync.WaitGroup),
-        },
+        cfg:   c,
+        cache: cache,
     }
 }
 
@@ -86,27 +105,6 @@ func WithHeadTimeout(v time.Duration) Option {
     }
 }
 
-// WithWorkScope returns option to set job work scope value of Policer.
-func WithWorkScope(v int) Option {
-    return func(c *cfg) {
-        c.workScope.val = v
-    }
-}
-
-// WithExpansionRate returns option to set expansion rate of Policer's works scope (in %).
-func WithExpansionRate(v int) Option {
-    return func(c *cfg) {
-        c.workScope.expRate = v
-    }
-}
-
-// WithTrigger returns option to set triggering channel of Policer.
-func WithTrigger(v <-chan *Task) Option {
-    return func(c *cfg) {
-        c.trigger = v
-    }
-}
-
 // WithLogger returns option to set Logger of Policer.
 func WithLogger(v *logger.Logger) Option {
     return func(c *cfg) {
@@ -164,3 +162,26 @@ func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
         c.cbRedundantCopy = cb
     }
 }
+
+// WithMaxCapacity returns option to set the maximum capacity
+// that can be set on the pool.
+func WithMaxCapacity(cap int) Option {
+    return func(c *cfg) {
+        c.maxCapacity = cap
+    }
+}
+
+// WithPool returns option to set the pool used for
+// policy and replication operations.
+func WithPool(p *ants.Pool) Option {
+    return func(c *cfg) {
+        c.taskPool = p
+    }
+}
+
+// WithNodeLoader returns option to set the NeoFS node load source.
+func WithNodeLoader(l nodeLoader) Option {
+    return func(c *cfg) {
+        c.loader = l
+    }
+}
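The lru.Cache above is what makes replication continuous rather than epoch-driven: every pass over the local objects sees the same addresses again, and the cache suppresses re-processing within evictDuration. A small self-contained sketch of that check, using the same github.com/hashicorp/golang-lru API; the address string and cache size are made up for the example:

    package main

    import (
        "fmt"
        "time"

        lru "github.com/hashicorp/golang-lru"
    )

    func main() {
        // Maps object address -> time of last processing, as in Policer.
        cache, err := lru.New(4) // the Policer default is 200_000 entries
        if err != nil {
            panic(err)
        }

        evictDuration := 30 * time.Second
        addr := "container/object"

        cache.Add(addr, time.Now())

        // The same test the replication worker performs: skip addresses
        // processed within the last evictDuration.
        if v, ok := cache.Get(addr); ok && time.Since(v.(time.Time)) < evictDuration {
            fmt.Println("skip: processed recently")
        }
    }

The 200_000-entry default bounds memory (the defaultCfg comment estimates under 200 MiB) while still covering a large object space between cycles.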
diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go
index eb9fbd6b..1f35630f 100644
--- a/pkg/services/policer/process.go
+++ b/pkg/services/policer/process.go
@@ -2,113 +2,84 @@ package policer
 
 import (
     "context"
-    "sync"
+    "errors"
+    "time"
 
+    "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
+    "github.com/nspcc-dev/neofs-sdk-go/object"
     "go.uber.org/zap"
 )
 
-// Task represents group of Policer tact parameters.
-type Task struct{}
-
-type prevTask struct {
-    undone int
-
-    cancel context.CancelFunc
-
-    wait *sync.WaitGroup
-}
-
-type workScope struct {
-    val int
-
-    expRate int // in %
-}
-
 func (p *Policer) Run(ctx context.Context) {
     defer func() {
         p.log.Info("routine stopped")
     }()
 
-    p.log.Info("process routine",
-        zap.Int("work scope value", p.workScope.val),
-        zap.Int("expansion rate (%)", p.workScope.val),
-        zap.Duration("head timeout", p.headTimeout),
+    go p.poolCapacityWorker(ctx)
+    p.shardPolicyWorker(ctx)
+}
+
+func (p *Policer) shardPolicyWorker(ctx context.Context) {
+    var (
+        addrs  []*object.Address
+        cursor *engine.Cursor
+        err    error
     )
 
+    for {
+        addrs, cursor, err = p.jobQueue.Select(cursor, p.batchSize)
+        if err != nil {
+            if errors.Is(err, engine.ErrEndOfListing) {
+                time.Sleep(time.Second) // finished whole cycle, sleep a bit
+                continue
+            }
+            p.log.Warn("failure at object select for replication", zap.Error(err))
+        }
+
+        for i := range addrs {
+            select {
+            case <-ctx.Done():
+                return
+            default:
+                addr := addrs[i]
+                addrStr := addr.String()
+                err = p.taskPool.Submit(func() {
+                    v, ok := p.cache.Get(addrStr)
+                    if ok && time.Since(v.(time.Time)) < p.evictDuration {
+                        return
+                    }
+
+                    p.processObject(ctx, addr)
+                    p.cache.Add(addrStr, time.Now())
+                })
+                if err != nil {
+                    p.log.Warn("pool submission", zap.Error(err))
+                }
+            }
+        }
+    }
+}
+
+func (p *Policer) poolCapacityWorker(ctx context.Context) {
+    ticker := time.NewTicker(p.rebalanceFreq)
     for {
         select {
         case <-ctx.Done():
-            p.prevTask.cancel()
-
-            p.log.Warn("context is done",
-                zap.String("error", ctx.Err().Error()),
-            )
-
+            ticker.Stop()
             return
-        case task, ok := <-p.trigger:
-            if !ok {
-                p.log.Warn("trigger channel is closed")
-
-                return
+        case <-ticker.C:
+            neofsSysLoad := p.loader.ObjectServiceLoad()
+            newCapacity := int((1.0 - neofsSysLoad) * float64(p.maxCapacity))
+            if newCapacity == 0 {
+                newCapacity++
             }
 
-            p.prevTask.cancel()
-            p.prevTask.wait.Wait()
-
-            var taskCtx context.Context
-
-            taskCtx, p.prevTask.cancel = context.WithCancel(ctx)
-
-            go p.handleTask(taskCtx, task)
+            if p.taskPool.Cap() != newCapacity {
+                p.taskPool.Tune(newCapacity)
+                p.log.Debug("tune replication capacity",
+                    zap.Float64("system_load", neofsSysLoad),
+                    zap.Int("new_capacity", newCapacity))
+            }
         }
     }
 }
-
-func (p *Policer) handleTask(ctx context.Context, task *Task) {
-    p.prevTask.wait.Add(1)
-
-    defer func() {
-        p.prevTask.wait.Done()
-        p.log.Info("finish work",
-            zap.Int("amount of unfinished objects", p.prevTask.undone),
-        )
-    }()
-
-    var delta int
-
-    // undone - amount of objects we couldn't process in last epoch
-    if p.prevTask.undone > 0 {
-        // if there are unprocessed objects, then lower your estimation
-        delta = -p.prevTask.undone
-    } else {
-        // otherwise try to expand
-        delta = p.workScope.val * p.workScope.expRate / 100
-    }
-
-    addrs, err := p.jobQueue.Select(p.workScope.val + delta)
-    if err != nil {
-        p.log.Warn("could not select objects",
-            zap.String("error", err.Error()),
-        )
-    }
-
-    // if there are NOT enough objects to fill the pool, do not change it
-    // otherwise expand or shrink it with the delta value
-    if len(addrs) >= p.workScope.val+delta {
-        p.workScope.val += delta
-    }
-
-    p.prevTask.undone = len(addrs)
-
-    for i := range addrs {
-        select {
-        case <-ctx.Done():
-            return
-        default:
-        }
-
-        p.processObject(ctx, addrs[i])
-
-        p.prevTask.undone--
-    }
-}
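The two goroutines started by Run communicate only through the ants pool: shardPolicyWorker submits tasks, poolCapacityWorker retunes the worker limit once per rebalanceFreq tick. A runnable sketch of the pool operations relied on here, per the github.com/panjf2000/ants/v2 API (the sizes are arbitrary):

    package main

    import (
        "fmt"
        "time"

        "github.com/panjf2000/ants/v2"
    )

    func main() {
        pool, err := ants.NewPool(8)
        if err != nil {
            panic(err)
        }
        defer pool.Release()

        // Submit schedules a task, as shardPolicyWorker does per address.
        _ = pool.Submit(func() {
            time.Sleep(10 * time.Millisecond)
        })

        // Tune grows or shrinks the worker limit at runtime; this is the
        // call poolCapacityWorker makes when the load ratio changes.
        pool.Tune(2)
        fmt.Println("capacity:", pool.Cap(), "running:", pool.Running())
    }

Tune only changes the admission limit; tasks already running are left alone, so shrinking the pool merely slows the intake of new replication tasks.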
diff --git a/pkg/services/policer/queue.go b/pkg/services/policer/queue.go
index f3864a6c..44bc5706 100644
--- a/pkg/services/policer/queue.go
+++ b/pkg/services/policer/queue.go
@@ -1,8 +1,9 @@
 package policer
 
 import (
+    "fmt"
+
     "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
-    "github.com/nspcc-dev/neofs-node/pkg/util/rand"
     "github.com/nspcc-dev/neofs-sdk-go/object"
 )
 
@@ -10,23 +11,15 @@ type jobQueue struct {
     localStorage *engine.StorageEngine
 }
 
-func (q *jobQueue) Select(limit int) ([]*object.Address, error) {
-    // TODO: optimize the logic for selecting objects
-    // We can prioritize objects for migration, newly arrived objects, etc.
-    // It is recommended to make changes after updating the metabase
+func (q *jobQueue) Select(cursor *engine.Cursor, count uint32) ([]*object.Address, *engine.Cursor, error) {
+    prm := new(engine.ListWithCursorPrm)
+    prm.WithCursor(cursor)
+    prm.WithCount(count)
 
-    res, err := engine.List(q.localStorage, 0) // consider some limit
+    res, err := q.localStorage.ListWithCursor(prm)
     if err != nil {
-        return nil, err
+        return nil, nil, fmt.Errorf("cannot list objects in engine: %w", err)
     }
 
-    rand.New().Shuffle(len(res), func(i, j int) {
-        res[i], res[j] = res[j], res[i]
-    })
-
-    if len(res) < limit {
-        return res, nil
-    }
-
-    return res[:limit], nil
+    return res.AddressList(), res.Cursor(), nil
 }
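Select now pages through the engine with a cursor instead of listing and shuffling the whole object space, so one full pass becomes the unit of work. A hypothetical same-package helper, not part of this patch, showing how the cursor threads through successive calls until engine.ErrEndOfListing:

    package policer

    import (
        "errors"
        "fmt"

        "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
        "github.com/nspcc-dev/neofs-sdk-go/object"
    )

    // collectAll drains the queue batch by batch. The policer itself never
    // stops: shardPolicyWorker sleeps on ErrEndOfListing and starts the
    // next cycle with a nil cursor.
    func collectAll(q *jobQueue, batch uint32) ([]*object.Address, error) {
        var (
            all    []*object.Address
            cursor *engine.Cursor
        )

        for {
            addrs, next, err := q.Select(cursor, batch)
            if err != nil {
                if errors.Is(err, engine.ErrEndOfListing) {
                    return all, nil // whole local object space traversed
                }
                return nil, fmt.Errorf("select batch: %w", err)
            }

            all = append(all, addrs...)
            cursor = next
        }
    }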