[#845] object/put: Separate pools for local and remote operations

In previous implementation Object PUT used single pool of workers for local
and remote ops, but these ops are heterogeneous.

Use remote/local pool for remote/local operations in PUT service. At first
the pools are configured with the same size.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-09-24 13:36:54 +03:00 committed by Alex Vanin
parent 3b2b6007c6
commit ee20200c2e
5 changed files with 25 additions and 10 deletions

View file

@ -195,7 +195,7 @@ type cfgLocalStorage struct {
} }
type cfgObjectRoutines struct { type cfgObjectRoutines struct {
put *ants.Pool putRemote, putLocal *ants.Pool
} }
type cfgControlService struct { type cfgControlService struct {
@ -435,7 +435,12 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
optNonBlocking := ants.WithNonblocking(true) optNonBlocking := ants.WithNonblocking(true)
pool.put, err = ants.NewPool(objectconfig.Put(cfg).PoolSize(), optNonBlocking) pool.putRemote, err = ants.NewPool(objectconfig.Put(cfg).PoolSize(), optNonBlocking)
if err != nil {
fatalOnErr(err)
}
pool.putLocal, err = ants.NewPool(objectconfig.Put(cfg).PoolSize(), optNonBlocking)
if err != nil { if err != nil {
fatalOnErr(err) fatalOnErr(err)
} }

View file

@ -281,7 +281,7 @@ func initObjectService(c *cfg) {
objectCore.WithDeleteHandler(objInhumer), objectCore.WithDeleteHandler(objInhumer),
), ),
putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithNetworkState(c.cfgNetmap.state),
putsvc.WithWorkerPool(c.cfgObject.pool.put), putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
putsvc.WithLogger(c.log), putsvc.WithLogger(c.log),
) )

View file

@ -16,7 +16,7 @@ import (
type distributedTarget struct { type distributedTarget struct {
traverseOpts []placement.Option traverseOpts []placement.Option
workerPool util.WorkerPool remotePool, localPool util.WorkerPool
obj *object.RawObject obj *object.RawObject
@ -129,7 +129,15 @@ loop:
isLocal := t.isLocalKey(addr.Key()) isLocal := t.isLocalKey(addr.Key())
if err := t.workerPool.Submit(func() { var workerPool util.WorkerPool
if isLocal {
workerPool = t.localPool
} else {
workerPool = t.remotePool
}
if err := workerPool.Submit(func() {
defer wg.Done() defer wg.Done()
if err := f(nodeDesc{local: isLocal, info: addr}); err != nil { if err := f(nodeDesc{local: isLocal, info: addr}); err != nil {

View file

@ -44,7 +44,7 @@ type cfg struct {
netMapSrc netmap.Source netMapSrc netmap.Source
workerPool util.WorkerPool remotePool, localPool util.WorkerPool
netmapKeys netmap.AnnouncedKeys netmapKeys netmap.AnnouncedKeys
@ -61,7 +61,8 @@ type cfg struct {
func defaultCfg() *cfg { func defaultCfg() *cfg {
return &cfg{ return &cfg{
workerPool: new(util.SyncWorkerPool), remotePool: new(util.SyncWorkerPool),
localPool: new(util.SyncWorkerPool),
log: zap.L(), log: zap.L(),
} }
} }
@ -117,9 +118,9 @@ func WithNetworkMapSource(v netmap.Source) Option {
} }
} }
func WithWorkerPool(v util.WorkerPool) Option { func WithWorkerPools(remote, local util.WorkerPool) Option {
return func(c *cfg) { return func(c *cfg) {
c.workerPool = v c.remotePool, c.localPool = remote, local
} }
} }

View file

@ -161,7 +161,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
return &distributedTarget{ return &distributedTarget{
traverseOpts: prm.traverseOpts, traverseOpts: prm.traverseOpts,
workerPool: p.workerPool, remotePool: p.remotePool,
localPool: p.localPool,
nodeTargetInitializer: func(node nodeDesc) transformer.ObjectTarget { nodeTargetInitializer: func(node nodeDesc) transformer.ObjectTarget {
if node.local { if node.local {
return &localTarget{ return &localTarget{