frostfs-node/pkg/services/policer/process.go
Leonard Lyubich 0dab4b7581 [] services: Implement Policer service
Implement Policer service that performs background work to check compliance
with the placement policy for local objects in the container. In the initial
implementation, the selection of the working queue of objects is
simplified, and there is no transfer of the result to the replicator.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2020-10-21 14:42:51 +03:00

114 lines
2 KiB
Go

package policer
import (
"context"
"sync"
"go.uber.org/zap"
)
// Task represents a group of parameters for a single Policer tact.
//
// It currently has no fields. Pointers to Task are received from the Policer
// trigger channel in Run and handed over to handleTask.
type Task struct{}
// prevTask holds the bookkeeping state of the most recently started task.
type prevTask struct {
// undone is the number of selected objects that the task has not processed
// yet: handleTask sets it to the size of the selected batch and decrements
// it after each processed object.
undone int
// cancel cancels the context the task was started with.
cancel context.CancelFunc
// wait lets Run block until the handleTask goroutine has finished.
wait *sync.WaitGroup
}
// workScope describes how many objects are requested from the job queue
// for a single task.
type workScope struct {
// val is the base number of objects to select for one task.
val int
// expRate is the share of val (in percent) by which the selection is
// enlarged when the previous task left no objects undone.
expRate int // in %
}
// Run executes the Policer processing loop. It blocks until ctx is done or the
// trigger channel is closed.
//
// For every task received from the trigger channel it cancels the previously
// started task, waits for it to finish and then launches handleTask for the
// new task in a separate goroutine, using a context derived from ctx.
//
// NOTE(review): assumes p.prevTask.cancel is a non-nil function before the
// first iteration — TODO confirm against the constructor.
func (p *Policer) Run(ctx context.Context) {
	defer func() {
		p.log.Info("routine stopped")
	}()

	p.log.Info("process routine",
		zap.Int("work scope value", p.workScope.val),
		// Report the expansion rate itself, not the work scope value.
		zap.Int("expansion rate (%)", p.workScope.expRate),
		zap.Duration("head timeout", p.headTimeout),
	)

	for {
		select {
		case <-ctx.Done():
			p.prevTask.cancel()

			p.log.Warn("context is done",
				zap.String("error", ctx.Err().Error()),
			)

			return
		case task, ok := <-p.trigger:
			if !ok {
				// NOTE(review): the running task is not cancelled on this
				// path; confirm whether it is meant to outlive Run.
				p.log.Warn("trigger channel is closed")

				return
			}

			// Stop the previous task and wait for it to exit before starting
			// the next one.
			//
			// NOTE(review): handleTask calls wait.Add only after its goroutine
			// has started, so Wait may return before a just-spawned task has
			// registered itself.
			p.prevTask.cancel()
			p.prevTask.wait.Wait()

			var taskCtx context.Context

			taskCtx, p.prevTask.cancel = context.WithCancel(ctx)

			go p.handleTask(taskCtx, task)
		}
	}
}
// handleTask performs a single Policer tact: it selects a batch of objects
// from the job queue, adjusts the work scope according to the outcome of the
// previous tact and processes the selected objects one by one until the batch
// is exhausted or ctx is done.
//
// The task argument is currently not used by the body.
func (p *Policer) handleTask(ctx context.Context, task *Task) {
	p.prevTask.wait.Add(1)

	// Deferred calls run in LIFO order: the summary is logged first and only
	// then is completion signalled. Once Done is called, Run may start the next
	// task, which overwrites p.prevTask.undone, so the counter must be read
	// before that point.
	defer p.prevTask.wait.Done()
	defer func() {
		p.log.Info("finish work",
			zap.Int("amount of unfinished objects", p.prevTask.undone),
		)
	}()

	var delta int

	// undone is the amount of objects the previous task could not process.
	if p.prevTask.undone > 0 {
		// There are unprocessed objects: lower the estimation.
		delta = -p.prevTask.undone
	} else {
		// Otherwise try to expand.
		delta = p.workScope.val * p.workScope.expRate / 100
	}

	// NOTE(review): when undone >= workScope.val the limit is <= 0; the check
	// below then always holds and the scope can drop to zero, after which the
	// expansion delta stays zero as well. Confirm whether a lower bound is
	// intended.
	limit := p.workScope.val + delta

	addrs, err := p.jobQueue.Select(limit)
	if err != nil {
		// Selection failure is only logged; processing continues with
		// whatever Select returned.
		p.log.Warn("could not select objects",
			zap.String("error", err.Error()),
		)
	}

	// If there are NOT enough objects to fill the pool, do not change it;
	// otherwise expand or shrink it by the delta value.
	if len(addrs) >= limit {
		p.workScope.val = limit
	}

	p.prevTask.undone = len(addrs)

	for i := range addrs {
		// Stop between objects as soon as the task is cancelled; the objects
		// left over stay counted in undone.
		if ctx.Err() != nil {
			return
		}

		p.processObject(ctx, addrs[i])

		p.prevTask.undone--
	}
}