diff --git a/CHANGELOG.md b/CHANGELOG.md index f294cb7b6..5637cccfe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Changelog for NeoFS Node ### Fixed - Do not replicate object twice to the same node (#1410) +- Concurrent object handling by the Policer (#1411) ### Removed diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go index ac747f585..21807c320 100644 --- a/pkg/services/policer/policer.go +++ b/pkg/services/policer/policer.go @@ -1,6 +1,7 @@ package policer import ( + "sync" "time" lru "github.com/hashicorp/golang-lru" @@ -22,12 +23,39 @@ type nodeLoader interface { ObjectServiceLoad() float64 } +type objectsInWork struct { + m sync.RWMutex + objs map[oid.Address]struct{} +} + +func (oiw *objectsInWork) inWork(addr oid.Address) bool { + oiw.m.RLock() + _, ok := oiw.objs[addr] + oiw.m.RUnlock() + + return ok +} + +func (oiw *objectsInWork) remove(addr oid.Address) { + oiw.m.Lock() + delete(oiw.objs, addr) + oiw.m.Unlock() +} + +func (oiw *objectsInWork) add(addr oid.Address) { + oiw.m.Lock() + oiw.objs[addr] = struct{}{} + oiw.m.Unlock() +} + // Policer represents the utility that verifies // compliance with the object storage policy. type Policer struct { *cfg cache *lru.Cache + + objsInWork *objectsInWork } // Option is an option for Policer constructor. @@ -95,6 +123,9 @@ func New(opts ...Option) *Policer { return &Policer{ cfg: c, cache: cache, + objsInWork: &objectsInWork{ + objs: make(map[oid.Address]struct{}, c.maxCapacity), + }, } } diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index 6834d7358..ca360968e 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -48,15 +48,24 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { return default: addr := addrs[i] - addrStr := addr.EncodeToString() + if p.objsInWork.inWork(addr) { + // do not process an object + // that is in work + continue + } + err = p.taskPool.Submit(func() { - v, ok := p.cache.Get(addrStr) + v, ok := p.cache.Get(addr) if ok && time.Since(v.(time.Time)) < p.evictDuration { return } + p.objsInWork.add(addr) + p.processObject(ctx, addr) - p.cache.Add(addrStr, time.Now()) + + p.cache.Add(addr, time.Now()) + p.objsInWork.remove(addr) }) if err != nil { p.log.Warn("pool submission", zap.Error(err))