[#965] policer: Implement continuous replication

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
Authored by Alex Vanin on 2021-11-10 11:34:00 +03:00, committed by Alex Vanin
parent 20f0b29a6e
commit a74a402a7d
5 changed files with 145 additions and 159 deletions

@@ -2,113 +2,84 @@ package policer

 import (
 	"context"
-	"sync"
+	"errors"
 	"time"

+	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
 	"go.uber.org/zap"
 )
-// Task represents group of Policer tact parameters.
-type Task struct{}
-
-type prevTask struct {
-	undone int
-	cancel context.CancelFunc
-	wait   *sync.WaitGroup
-}
-
-type workScope struct {
-	val     int
-	expRate int // in %
-}
 func (p *Policer) Run(ctx context.Context) {
 	defer func() {
 		p.log.Info("routine stopped")
 	}()

-	p.log.Info("process routine",
-		zap.Int("work scope value", p.workScope.val),
-		zap.Int("expansion rate (%)", p.workScope.expRate),
-		zap.Duration("head timeout", p.headTimeout),
-	)
+	go p.poolCapacityWorker(ctx)
+	p.shardPolicyWorker(ctx)
 }
+func (p *Policer) shardPolicyWorker(ctx context.Context) {
+	var (
+		addrs  []*object.Address
+		cursor *engine.Cursor
+		err    error
+	)
+
+	for {
+		addrs, cursor, err = p.jobQueue.Select(cursor, p.batchSize)
+		if err != nil {
+			if errors.Is(err, engine.ErrEndOfListing) {
+				time.Sleep(time.Second) // finished whole cycle, sleep a bit
+				continue
+			}
+			p.log.Warn("failure at object select for replication", zap.Error(err))
+		}
+
+		for i := range addrs {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				addr := addrs[i]
+				addrStr := addr.String()
+				err = p.taskPool.Submit(func() {
+					v, ok := p.cache.Get(addrStr)
+					if ok && time.Since(v.(time.Time)) < p.evictDuration {
+						return
+					}
+					p.processObject(ctx, addr)
+					p.cache.Add(addrStr, time.Now())
+				})
+				if err != nil {
+					p.log.Warn("pool submission", zap.Error(err))
+				}
+			}
+		}
+	}
+}
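
The worker added above walks the entire object space in batches: jobQueue.Select advances an engine.Cursor until engine.ErrEndOfListing marks a completed pass, and the per-address cache keeps an object from being re-examined until evictDuration has elapsed. A minimal sketch of that recency check, with a plain map standing in for the node's cache (names here are illustrative):

package main

import (
	"fmt"
	"time"
)

// shouldProcess reports whether addr is due for another policy check:
// it is due if it was never seen, or was last seen at least evict ago.
func shouldProcess(seen map[string]time.Time, addr string, evict time.Duration) bool {
	t, ok := seen[addr]
	return !ok || time.Since(t) >= evict
}

func main() {
	seen := make(map[string]time.Time)
	addr := "cid/oid-1" // hypothetical address string

	fmt.Println(shouldProcess(seen, addr, 30*time.Second)) // true: never seen
	seen[addr] = time.Now()
	fmt.Println(shouldProcess(seen, addr, 30*time.Second)) // false: checked just now
}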
-	for {
-		select {
-		case <-ctx.Done():
-			p.prevTask.cancel()
-
-			p.log.Warn("context is done",
-				zap.String("error", ctx.Err().Error()),
-			)
-
-			return
-		case task, ok := <-p.trigger:
-			if !ok {
-				p.log.Warn("trigger channel is closed")
-				return
-			}
-
-			p.prevTask.cancel()
-			p.prevTask.wait.Wait()
-
-			var taskCtx context.Context
-			taskCtx, p.prevTask.cancel = context.WithCancel(ctx)
-
-			go p.handleTask(taskCtx, task)
-		}
-	}
+func (p *Policer) poolCapacityWorker(ctx context.Context) {
+	ticker := time.NewTicker(p.rebalanceFreq)
+	for {
+		select {
+		case <-ctx.Done():
+			ticker.Stop()
+			return
+		case <-ticker.C:
+			neofsSysLoad := p.loader.ObjectServiceLoad()
+			newCapacity := int((1.0 - neofsSysLoad) * float64(p.maxCapacity))
+			if newCapacity == 0 {
+				newCapacity++
+			}
+
+			if p.taskPool.Cap() != newCapacity {
+				p.taskPool.Tune(newCapacity)
+				p.log.Debug("tune replication capacity",
+					zap.Float64("system_load", neofsSysLoad),
+					zap.Int("new_capacity", newCapacity))
+			}
+		}
+	}
+}
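
The tuning rule above is linear in the reported load: capacity = (1 - load) * maxCapacity, clamped to at least one worker so replication never stops entirely. A worked sketch, assuming the load value stays within [0, 1]:

package main

import "fmt"

// newCapacity mirrors the formula used by poolCapacityWorker: scale the
// pool inversely with load and never let it drop to zero.
func newCapacity(load float64, maxCapacity int) int {
	c := int((1.0 - load) * float64(maxCapacity))
	if c == 0 {
		c++
	}
	return c
}

func main() {
	for _, load := range []float64{0.0, 0.25, 0.9, 1.0} {
		// a maxCapacity of 10 is an arbitrary example value
		fmt.Printf("load=%.2f -> capacity=%d\n", load, newCapacity(load, 10))
	}
	// prints capacities 10, 7, 1 and 1
}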
-func (p *Policer) handleTask(ctx context.Context, task *Task) {
-	p.prevTask.wait.Add(1)
-	defer func() {
-		p.prevTask.wait.Done()
-		p.log.Info("finish work",
-			zap.Int("amount of unfinished objects", p.prevTask.undone),
-		)
-	}()
-
-	var delta int
-
-	// undone - amount of objects we couldn't process in last epoch
-	if p.prevTask.undone > 0 {
-		// if there are unprocessed objects, then lower your estimation
-		delta = -p.prevTask.undone
-	} else {
-		// otherwise try to expand
-		delta = p.workScope.val * p.workScope.expRate / 100
-	}
-
-	addrs, err := p.jobQueue.Select(p.workScope.val + delta)
-	if err != nil {
-		p.log.Warn("could not select objects",
-			zap.String("error", err.Error()),
-		)
-	}
-
-	// if there are NOT enough objects to fill the pool, do not change it
-	// otherwise expand or shrink it with the delta value
-	if len(addrs) >= p.workScope.val+delta {
-		p.workScope.val += delta
-	}
-
-	p.prevTask.undone = len(addrs)
-
-	for i := range addrs {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-		}
-
-		p.processObject(ctx, addrs[i])
-		p.prevTask.undone--
-	}
-}
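
For contrast, the handleTask removed above resized its work scope once per tact: a backlog of undone objects shrank the next selection by exactly that amount, while an empty backlog grew it by expRate percent of the current scope. A standalone sketch of that adjustment:

package main

import "fmt"

// nextDelta reproduces the removed estimation step: shrink the scope by
// the backlog, otherwise expand it by expRate percent.
func nextDelta(undone, scope, expRate int) int {
	if undone > 0 {
		return -undone
	}
	return scope * expRate / 100
}

func main() {
	fmt.Println(nextDelta(0, 100, 10))  // 10: no backlog, expand by 10%
	fmt.Println(nextDelta(25, 100, 10)) // -25: shrink by the 25 unprocessed objects
}

The continuous cursor walk plus load-based pool tuning introduced by this commit replaces that per-tact feedback loop.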