diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index a5ebb0010..2fa87c40f 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -7,6 +7,7 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.uber.org/zap" @@ -18,10 +19,11 @@ func (p *Policer) Run(ctx context.Context) { } func (p *Policer) shardPolicyWorker(ctx context.Context) { + defer p.taskPool.Release() + for { select { case <-ctx.Done(): - p.taskPool.Release() return default: } @@ -37,45 +39,48 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { } skipMap := newSkipMap() - for i := range addrs { + for _, addr := range addrs { select { case <-ctx.Done(): - p.taskPool.Release() return default: - addr := addrs[i] - if p.objsInWork.inWork(addr.Address) { - // do not process an object - // that is in work - continue - } + } + if p.objsInWork.inWork(addr.Address) { + // do not process an object that is in work + continue + } - err := p.taskPool.Submit(func() { - v, ok := p.cache.Get(addr.Address) - if ok && time.Since(v) < p.evictDuration { - return - } - - if p.objsInWork.add(addr.Address) { - err := p.processObject(ctx, addr) - if err != nil && !skipMap.addSeenError(addr.Address.Container(), err) { - p.log.Error(logs.PolicerUnableToProcessObj, - zap.Stringer("object", addr.Address), - zap.String("error", err.Error())) - } - p.cache.Add(addr.Address, time.Now()) - p.objsInWork.remove(addr.Address) - p.metrics.IncProcessedObjects() - } - }) - if err != nil { - p.log.Warn(logs.PolicerPoolSubmission, zap.Error(err)) - } + if err := p.submitPolicerTask(ctx, addr, skipMap); err != nil { + p.log.Warn(logs.PolicerPoolSubmission, zap.Error(err)) } } } } +func (p *Policer) submitPolicerTask(ctx context.Context, addr object.Info, skipMap *errMap) error { + return p.taskPool.Submit(func() { + v, ok := p.cache.Get(addr.Address) + if ok && time.Since(v) < p.evictDuration { + return + } + + if !p.objsInWork.add(addr.Address) { + return + } + + err := p.processObject(ctx, addr) + if err != nil && !skipMap.addSeenError(addr.Address.Container(), err) { + p.log.Error(logs.PolicerUnableToProcessObj, + zap.Stringer("object", addr.Address), + zap.String("error", err.Error())) + } + + p.cache.Add(addr.Address, time.Now()) + p.objsInWork.remove(addr.Address) + p.metrics.IncProcessedObjects() + }) +} + type errMap struct { sync.Mutex skipMap map[cid.ID][]error