frostfs-node/pkg/services/policer/process.go
Ekaterina Lebedeva afd2ba9a66
All checks were successful
DCO action / DCO (pull_request) Successful in 2m55s
Vulncheck / Vulncheck (pull_request) Successful in 3m22s
Tests and linters / Staticcheck (pull_request) Successful in 4m10s
Build / Build Components (1.20) (pull_request) Successful in 4m58s
Build / Build Components (1.21) (pull_request) Successful in 4m54s
Tests and linters / Lint (pull_request) Successful in 5m47s
Tests and linters / Tests (1.20) (pull_request) Successful in 6m45s
Tests and linters / Tests (1.21) (pull_request) Successful in 6m58s
Tests and linters / Tests with -race (pull_request) Successful in 7m42s
[#110] Add check for repeated error log in policer
processObject() returns three kinds of errors: container-not-found errors,
could-not-get-container errors and placement-vector-building errors. Each
kind of error occurs for all objects in a container simultaneously, so we
can log each error once and safely ignore the rest.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-02-06 00:56:41 +03:00

89 lines
2.1 KiB
Go

package policer
import (
	"context"
	"errors"
	"sync"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"go.uber.org/zap"
)
// Run executes the shard policy worker with ctx on the calling goroutine and,
// once the worker has returned, logs that the policer routine has stopped.
func (p *Policer) Run(ctx context.Context) {
p.shardPolicyWorker(ctx)
p.log.Info(logs.PolicerRoutineStopped)
}
// shardPolicyWorker is the policer's main loop. It repeatedly takes a batch of
// up to p.batchSize object addresses from p.keySpaceIterator and submits one
// task per address to p.taskPool. A task calls p.processObject unless the
// address was processed less than p.evictDuration ago or is already in work.
//
// When the iterator reports engine.ErrEndOfListing it is rewound and the loop
// pauses for p.sleepDuration before the next cycle. Any other iterator error
// is logged as a warning and whatever addresses were returned are still
// processed.
//
// An error returned by p.processObject is logged at most once per container
// within a batch; repeats (as decided by skipLog) are suppressed.
//
// The function returns only when ctx is done, releasing p.taskPool first.
func (p *Policer) shardPolicyWorker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			p.taskPool.Release()
			return
		default:
		}

		addrs, err := p.keySpaceIterator.Next(ctx, p.batchSize)
		if err != nil {
			if errors.Is(err, engine.ErrEndOfListing) {
				p.keySpaceIterator.Rewind()

				// Finished the whole cycle: pause a bit, but do not hold up
				// shutdown for the full pause if ctx is cancelled meanwhile.
				pause := time.NewTimer(p.sleepDuration)
				select {
				case <-ctx.Done():
					pause.Stop()
					p.taskPool.Release()
					return
				case <-pause.C:
				}
				continue
			}
			p.log.Warn(logs.PolicerFailureAtObjectSelectForReplication, zap.Error(err))
		}

		// cnrErrSkip holds, per container, the errors already logged for this
		// batch. Tasks handed to p.taskPool may run concurrently with each
		// other, so every access to the map goes through errMu.
		var errMu sync.Mutex
		cnrErrSkip := make(map[cid.ID][]error)

		// firstOccurrence records err for cnr and reports whether it had not
		// been recorded before. Check and insert happen under one lock so
		// that two tasks cannot both decide to log the same error.
		firstOccurrence := func(cnr cid.ID, err error) bool {
			errMu.Lock()
			defer errMu.Unlock()

			if skipLog(cnrErrSkip[cnr], err) {
				return false
			}
			cnrErrSkip[cnr] = append(cnrErrSkip[cnr], err)
			return true
		}

		for i := range addrs {
			select {
			case <-ctx.Done():
				p.taskPool.Release()
				return
			default:
			}

			addr := addrs[i]
			if p.objsInWork.inWork(addr.Address) {
				// do not process an object
				// that is in work
				continue
			}

			err := p.taskPool.Submit(func() {
				// Skip objects that were processed recently enough.
				v, ok := p.cache.Get(addr.Address)
				if ok && time.Since(v) < p.evictDuration {
					return
				}

				// add reports false when the address is already in work.
				if !p.objsInWork.add(addr.Address) {
					return
				}

				err := p.processObject(ctx, addr)
				if err != nil && firstOccurrence(addr.Address.Container(), err) {
					p.log.Error(logs.PolicerUnableToProcessObj,
						zap.Stringer("object", addr.Address),
						zap.String("error", err.Error()))
				}
				p.cache.Add(addr.Address, time.Now())
				p.objsInWork.remove(addr.Address)
				p.metrics.IncProcessedObjects()
			})
			if err != nil {
				p.log.Warn(logs.PolicerPoolSubmission, zap.Error(err))
			}
		}
	}
}
// skipLog reports whether err matches, in the sense of errors.Is(err, e), at
// least one element e of errs. Elements are checked in order and the scan
// stops at the first match. An empty or nil errs yields false.
func skipLog(errs []error, err error) bool {
	matched := false
	for i := 0; i < len(errs) && !matched; i++ {
		matched = errors.Is(err, errs[i])
	}
	return matched
}