package policer

import (
	"context"
	"errors"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	"go.uber.org/zap"
)

// Run starts the pool capacity worker in a separate goroutine and then runs
// the shard policy worker in the calling goroutine. It returns when the shard
// policy worker returns (i.e. after ctx is cancelled) and logs that the
// routine has stopped.
func (p *Policer) Run(ctx context.Context) {
	defer func() {
		p.log.Info(logs.PolicerRoutineStopped)
	}()

	go p.poolCapacityWorker(ctx)
	p.shardPolicyWorker(ctx)
}

// shardPolicyWorker repeatedly selects batches of object addresses from the
// job queue and submits a processing task to the task pool for each address
// that is not already in work. It runs until ctx is cancelled.
func (p *Policer) shardPolicyWorker(ctx context.Context) {
	var (
		addrs  []objectcore.AddressWithType
		cursor *engine.Cursor
		err    error
	)

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		addrs, cursor, err = p.jobQueue.Select(cursor, p.batchSize)
		if err != nil {
			if errors.Is(err, engine.ErrEndOfListing) {
				// Finished whole cycle, sleep a bit. The pause is
				// cancellable so that shutdown is not delayed by it.
				timer := time.NewTimer(time.Second)
				select {
				case <-ctx.Done():
					timer.Stop()
					return
				case <-timer.C:
				}
				continue
			}
			// NOTE(review): any other error is only logged; whatever addrs
			// were returned are still processed and the loop retries at once.
			p.log.Warn(logs.PolicerFailureAtObjectSelectForReplication, zap.Error(err))
		}

		for i := range addrs {
			select {
			case <-ctx.Done():
				return
			default:
			}

			addr := addrs[i]
			// NOTE(review): this check and the add inside the task below are
			// not atomic, so the same address may be submitted more than once
			// before the first task marks it — confirm whether that matters.
			if p.objsInWork.inWork(addr.Address) {
				// do not process an object
				// that is in work
				continue
			}

			err = p.taskPool.Submit(func() {
				// Skip objects that were processed less than evictDuration ago.
				v, ok := p.cache.Get(addr.Address)
				if ok && time.Since(v) < p.evictDuration {
					return
				}

				p.objsInWork.add(addr.Address)
				// Deferred so the address is released even if processing
				// panics; on the normal path it still runs after cache.Add.
				defer p.objsInWork.remove(addr.Address)

				p.processObject(ctx, addr)

				p.cache.Add(addr.Address, time.Now())
			})
			if err != nil {
				p.log.Warn(logs.PolicerPoolSubmission, zap.Error(err))
			}
		}
	}
}

// poolCapacityWorker periodically (every p.rebalanceFreq) recomputes the task
// pool capacity as the unloaded fraction of p.maxCapacity, based on the
// current object service load, and tunes the pool when the value changes.
// It runs until ctx is cancelled.
func (p *Policer) poolCapacityWorker(ctx context.Context) {
	ticker := time.NewTicker(p.rebalanceFreq)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			frostfsSysLoad := p.loader.ObjectServiceLoad()
			newCapacity := int((1.0 - frostfsSysLoad) * float64(p.maxCapacity))
			// Always keep at least one worker. A load above 1.0 would
			// otherwise yield a negative capacity.
			if newCapacity < 1 {
				newCapacity = 1
			}

			if p.taskPool.Cap() != newCapacity {
				p.taskPool.Tune(newCapacity)
				p.log.Debug(logs.PolicerTuneReplicationCapacity,
					zap.Float64("system_load", frostfsSysLoad),
					zap.Int("new_capacity", newCapacity))
			}
		}
	}
}