[#1507] node: Do not handle object concurrently by the policer

Cache object that are being processed. That prevents concurrent
object handling when there is a few number of objects and object handling
takes more time that the policer needs for starting that object handling one
more time.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
experimental
Pavel Karpy 2022-06-09 19:45:51 +03:00 committed by LeL
parent 256165045b
commit 36f4929e52
3 changed files with 44 additions and 3 deletions

View File

@ -10,6 +10,7 @@ Changelog for NeoFS Node
### Fixed
- Do not replicate object twice to the same node (#1410)
- Concurrent object handling by the Policer (#1411)
### Removed

View File

@ -1,6 +1,7 @@
package policer
import (
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
@ -22,12 +23,39 @@ type nodeLoader interface {
ObjectServiceLoad() float64
}
type objectsInWork struct {
m sync.RWMutex
objs map[oid.Address]struct{}
}
func (oiw *objectsInWork) inWork(addr oid.Address) bool {
oiw.m.RLock()
_, ok := oiw.objs[addr]
oiw.m.RUnlock()
return ok
}
func (oiw *objectsInWork) remove(addr oid.Address) {
oiw.m.Lock()
delete(oiw.objs, addr)
oiw.m.Unlock()
}
func (oiw *objectsInWork) add(addr oid.Address) {
oiw.m.Lock()
oiw.objs[addr] = struct{}{}
oiw.m.Unlock()
}
// Policer represents the utility that verifies
// compliance with the object storage policy.
type Policer struct {
*cfg
cache *lru.Cache
objsInWork *objectsInWork
}
// Option is an option for Policer constructor.
@ -95,6 +123,9 @@ func New(opts ...Option) *Policer {
return &Policer{
cfg: c,
cache: cache,
objsInWork: &objectsInWork{
objs: make(map[oid.Address]struct{}, c.maxCapacity),
},
}
}

View File

@ -48,15 +48,24 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
return
default:
addr := addrs[i]
addrStr := addr.EncodeToString()
if p.objsInWork.inWork(addr) {
// do not process an object
// that is in work
continue
}
err = p.taskPool.Submit(func() {
v, ok := p.cache.Get(addrStr)
v, ok := p.cache.Get(addr)
if ok && time.Since(v.(time.Time)) < p.evictDuration {
return
}
p.objsInWork.add(addr)
p.processObject(ctx, addr)
p.cache.Add(addrStr, time.Now())
p.cache.Add(addr, time.Now())
p.objsInWork.remove(addr)
})
if err != nil {
p.log.Warn("pool submission", zap.Error(err))