package policer import ( "context" "errors" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.uber.org/zap" ) func (p *Policer) Run(ctx context.Context) { p.shardPolicyWorker(ctx) p.log.Info(logs.PolicerRoutineStopped) } func (p *Policer) shardPolicyWorker(ctx context.Context) { for { select { case <-ctx.Done(): p.taskPool.Release() return default: } addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { p.keySpaceIterator.Rewind() time.Sleep(p.sleepDuration) // finished whole cycle, sleep a bit continue } p.log.Warn(logs.PolicerFailureAtObjectSelectForReplication, zap.Error(err)) } // contains all errors logged in this iteration for each container cnrErrSkip := make(map[cid.ID][]error) for i := range addrs { select { case <-ctx.Done(): p.taskPool.Release() return default: addr := addrs[i] if p.objsInWork.inWork(addr.Address) { // do not process an object // that is in work continue } err := p.taskPool.Submit(func() { v, ok := p.cache.Get(addr.Address) if ok && time.Since(v) < p.evictDuration { return } if p.objsInWork.add(addr.Address) { err := p.processObject(ctx, addr) if err != nil && !skipLog(cnrErrSkip[addr.Address.Container()], err) { p.log.Error(logs.PolicerUnableToProcessObj, zap.Stringer("object", addr.Address), zap.String("error", err.Error())) cnrErrSkip[addr.Address.Container()] = append(cnrErrSkip[addr.Address.Container()], err) } p.cache.Add(addr.Address, time.Now()) p.objsInWork.remove(addr.Address) p.metrics.IncProcessedObjects() } }) if err != nil { p.log.Warn(logs.PolicerPoolSubmission, zap.Error(err)) } } } } } func skipLog(errs []error, err error) bool { for _, e := range errs { if errors.Is(err, e) { return true } } return false }