forked from TrueCloudLab/frostfs-node
115 lines
2 KiB
Go
115 lines
2 KiB
Go
|
package policer
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"sync"
|
||
|
|
||
|
"go.uber.org/zap"
|
||
|
)
|
||
|
|
||
|
// Task represents group of Policer tact parameters.
|
||
|
type Task struct{}
|
||
|
|
||
|
type prevTask struct {
|
||
|
undone int
|
||
|
|
||
|
cancel context.CancelFunc
|
||
|
|
||
|
wait *sync.WaitGroup
|
||
|
}
|
||
|
|
||
|
type workScope struct {
|
||
|
val int
|
||
|
|
||
|
expRate int // in %
|
||
|
}
|
||
|
|
||
|
func (p *Policer) Run(ctx context.Context) {
|
||
|
defer func() {
|
||
|
p.log.Info("routine stopped")
|
||
|
}()
|
||
|
|
||
|
p.log.Info("process routine",
|
||
|
zap.Int("work scope value", p.workScope.val),
|
||
|
zap.Int("expansion rate (%)", p.workScope.val),
|
||
|
zap.Duration("head timeout", p.headTimeout),
|
||
|
)
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
p.prevTask.cancel()
|
||
|
|
||
|
p.log.Warn("context is done",
|
||
|
zap.String("error", ctx.Err().Error()),
|
||
|
)
|
||
|
|
||
|
return
|
||
|
case task, ok := <-p.trigger:
|
||
|
if !ok {
|
||
|
p.log.Warn("trigger channel is closed")
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
p.prevTask.cancel()
|
||
|
p.prevTask.wait.Wait()
|
||
|
|
||
|
var taskCtx context.Context
|
||
|
|
||
|
taskCtx, p.prevTask.cancel = context.WithCancel(ctx)
|
||
|
|
||
|
go p.handleTask(taskCtx, task)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (p *Policer) handleTask(ctx context.Context, task *Task) {
|
||
|
p.prevTask.wait.Add(1)
|
||
|
|
||
|
defer func() {
|
||
|
p.prevTask.wait.Done()
|
||
|
p.log.Info("finish work",
|
||
|
zap.Int("amount of unfinished objects", p.prevTask.undone),
|
||
|
)
|
||
|
}()
|
||
|
|
||
|
var delta int
|
||
|
|
||
|
// undone - amount of objects we couldn't process in last epoch
|
||
|
if p.prevTask.undone > 0 {
|
||
|
// if there are unprocessed objects, then lower your estimation
|
||
|
delta = -p.prevTask.undone
|
||
|
} else {
|
||
|
// otherwise try to expand
|
||
|
delta = p.workScope.val * p.workScope.expRate / 100
|
||
|
}
|
||
|
|
||
|
addrs, err := p.jobQueue.Select(p.workScope.val + delta)
|
||
|
if err != nil {
|
||
|
p.log.Warn("could not select objects",
|
||
|
zap.String("error", err.Error()),
|
||
|
)
|
||
|
}
|
||
|
|
||
|
// if there are NOT enough objects to fill the pool, do not change it
|
||
|
// otherwise expand or shrink it with the delta value
|
||
|
if len(addrs) >= p.workScope.val+delta {
|
||
|
p.workScope.val += delta
|
||
|
}
|
||
|
|
||
|
p.prevTask.undone = len(addrs)
|
||
|
|
||
|
for i := range addrs {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
p.processObject(ctx, addrs[i])
|
||
|
|
||
|
p.prevTask.undone--
|
||
|
}
|
||
|
}
|