0dab4b7581
Implement Policer service that performs background work to check compliance with the placement policy for local objects in the container. In the initial implementation, the selection of the working queue of objects is simplified, and there is no transfer of the result to the replicator. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
114 lines
2 KiB
Go
114 lines
2 KiB
Go
package policer
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Task represents group of Policer tact parameters.
|
|
type Task struct{}
|
|
|
|
type prevTask struct {
|
|
undone int
|
|
|
|
cancel context.CancelFunc
|
|
|
|
wait *sync.WaitGroup
|
|
}
|
|
|
|
type workScope struct {
|
|
val int
|
|
|
|
expRate int // in %
|
|
}
|
|
|
|
func (p *Policer) Run(ctx context.Context) {
|
|
defer func() {
|
|
p.log.Info("routine stopped")
|
|
}()
|
|
|
|
p.log.Info("process routine",
|
|
zap.Int("work scope value", p.workScope.val),
|
|
zap.Int("expansion rate (%)", p.workScope.val),
|
|
zap.Duration("head timeout", p.headTimeout),
|
|
)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
p.prevTask.cancel()
|
|
|
|
p.log.Warn("context is done",
|
|
zap.String("error", ctx.Err().Error()),
|
|
)
|
|
|
|
return
|
|
case task, ok := <-p.trigger:
|
|
if !ok {
|
|
p.log.Warn("trigger channel is closed")
|
|
|
|
return
|
|
}
|
|
|
|
p.prevTask.cancel()
|
|
p.prevTask.wait.Wait()
|
|
|
|
var taskCtx context.Context
|
|
|
|
taskCtx, p.prevTask.cancel = context.WithCancel(ctx)
|
|
|
|
go p.handleTask(taskCtx, task)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Policer) handleTask(ctx context.Context, task *Task) {
|
|
p.prevTask.wait.Add(1)
|
|
|
|
defer func() {
|
|
p.prevTask.wait.Done()
|
|
p.log.Info("finish work",
|
|
zap.Int("amount of unfinished objects", p.prevTask.undone),
|
|
)
|
|
}()
|
|
|
|
var delta int
|
|
|
|
// undone - amount of objects we couldn't process in last epoch
|
|
if p.prevTask.undone > 0 {
|
|
// if there are unprocessed objects, then lower your estimation
|
|
delta = -p.prevTask.undone
|
|
} else {
|
|
// otherwise try to expand
|
|
delta = p.workScope.val * p.workScope.expRate / 100
|
|
}
|
|
|
|
addrs, err := p.jobQueue.Select(p.workScope.val + delta)
|
|
if err != nil {
|
|
p.log.Warn("could not select objects",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
|
|
// if there are NOT enough objects to fill the pool, do not change it
|
|
// otherwise expand or shrink it with the delta value
|
|
if len(addrs) >= p.workScope.val+delta {
|
|
p.workScope.val += delta
|
|
}
|
|
|
|
p.prevTask.undone = len(addrs)
|
|
|
|
for i := range addrs {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
p.processObject(ctx, addrs[i])
|
|
|
|
p.prevTask.undone--
|
|
}
|
|
}
|