frostfs-node/pkg/services/policer/process.go

115 lines
2 KiB
Go
Raw Normal View History

package policer
import (
"context"
"sync"
"go.uber.org/zap"
)
// Task represents group of Policer tact parameters.
type Task struct{}
type prevTask struct {
undone int
cancel context.CancelFunc
wait *sync.WaitGroup
}
type workScope struct {
val int
expRate int // in %
}
func (p *Policer) Run(ctx context.Context) {
defer func() {
p.log.Info("routine stopped")
}()
p.log.Info("process routine",
zap.Int("work scope value", p.workScope.val),
zap.Int("expansion rate (%)", p.workScope.val),
zap.Duration("head timeout", p.headTimeout),
)
for {
select {
case <-ctx.Done():
p.prevTask.cancel()
p.log.Warn("context is done",
zap.String("error", ctx.Err().Error()),
)
return
case task, ok := <-p.trigger:
if !ok {
p.log.Warn("trigger channel is closed")
return
}
p.prevTask.cancel()
p.prevTask.wait.Wait()
var taskCtx context.Context
taskCtx, p.prevTask.cancel = context.WithCancel(ctx)
go p.handleTask(taskCtx, task)
}
}
}
func (p *Policer) handleTask(ctx context.Context, task *Task) {
p.prevTask.wait.Add(1)
defer func() {
p.prevTask.wait.Done()
p.log.Info("finish work",
zap.Int("amount of unfinished objects", p.prevTask.undone),
)
}()
var delta int
// undone - amount of objects we couldn't process in last epoch
if p.prevTask.undone > 0 {
// if there are unprocessed objects, then lower your estimation
delta = -p.prevTask.undone
} else {
// otherwise try to expand
delta = p.workScope.val * p.workScope.expRate / 100
}
addrs, err := p.jobQueue.Select(p.workScope.val + delta)
if err != nil {
p.log.Warn("could not select objects",
zap.String("error", err.Error()),
)
}
// if there are NOT enough objects to fill the pool, do not change it
// otherwise expand or shrink it with the delta value
if len(addrs) >= p.workScope.val+delta {
p.workScope.val += delta
}
p.prevTask.undone = len(addrs)
for i := range addrs {
select {
case <-ctx.Done():
return
default:
}
p.processObject(ctx, addrs[i])
p.prevTask.undone--
}
}