package policer import ( "context" "errors" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "go.uber.org/zap" ) func (p *Policer) Run(ctx context.Context) { p.shardPolicyWorker(ctx) p.log.Info(logs.PolicerRoutineStopped) } func (p *Policer) shardPolicyWorker(ctx context.Context) { for { select { case <-ctx.Done(): return default: } addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { p.keySpaceIterator.Rewind() time.Sleep(p.sleepDuration) // finished whole cycle, sleep a bit continue } p.log.Warn(logs.PolicerFailureAtObjectSelectForReplication, zap.Error(err)) } for i := range addrs { select { case <-ctx.Done(): return default: addr := addrs[i] if p.objsInWork.inWork(addr.Address) { // do not process an object // that is in work continue } err := p.taskPool.Submit(func() { v, ok := p.cache.Get(addr.Address) if ok && time.Since(v) < p.evictDuration { return } if p.objsInWork.add(addr.Address) { err := p.processObject(ctx, addr) if err != nil { p.log.Error(logs.PolicerUnableToProcessObj, zap.Stringer("object", addr.Address), zap.String("error", err.Error())) } p.cache.Add(addr.Address, time.Now()) p.objsInWork.remove(addr.Address) p.metrics.IncProcessedObjects() } }) if err != nil { p.log.Warn(logs.PolicerPoolSubmission, zap.Error(err)) } } } } }