diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 9b5f8f09..1eaad145 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -195,7 +195,7 @@ type cfgLocalStorage struct { } type cfgObjectRoutines struct { - put *ants.Pool + putRemote, putLocal *ants.Pool } type cfgControlService struct { @@ -435,7 +435,12 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) { optNonBlocking := ants.WithNonblocking(true) - pool.put, err = ants.NewPool(objectconfig.Put(cfg).PoolSize(), optNonBlocking) + pool.putRemote, err = ants.NewPool(objectconfig.Put(cfg).PoolSize(), optNonBlocking) + if err != nil { + fatalOnErr(err) + } + + pool.putLocal, err = ants.NewPool(objectconfig.Put(cfg).PoolSize(), optNonBlocking) if err != nil { fatalOnErr(err) } diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 364f7b51..8c5b272e 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -281,7 +281,7 @@ func initObjectService(c *cfg) { objectCore.WithDeleteHandler(objInhumer), ), putsvc.WithNetworkState(c.cfgNetmap.state), - putsvc.WithWorkerPool(c.cfgObject.pool.put), + putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), putsvc.WithLogger(c.log), ) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 3bd696b4..7adae55c 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -16,7 +16,7 @@ import ( type distributedTarget struct { traverseOpts []placement.Option - workerPool util.WorkerPool + remotePool, localPool util.WorkerPool obj *object.RawObject @@ -129,7 +129,15 @@ loop: isLocal := t.isLocalKey(addr.Key()) - if err := t.workerPool.Submit(func() { + var workerPool util.WorkerPool + + if isLocal { + workerPool = t.localPool + } else { + workerPool = t.remotePool + } + + if err := workerPool.Submit(func() { defer wg.Done() if err := f(nodeDesc{local: isLocal, info: addr}); err != nil { diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 78af791d..12e84ae3 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -44,7 +44,7 @@ type cfg struct { netMapSrc netmap.Source - workerPool util.WorkerPool + remotePool, localPool util.WorkerPool netmapKeys netmap.AnnouncedKeys @@ -61,7 +61,8 @@ type cfg struct { func defaultCfg() *cfg { return &cfg{ - workerPool: new(util.SyncWorkerPool), + remotePool: new(util.SyncWorkerPool), + localPool: new(util.SyncWorkerPool), log: zap.L(), } } @@ -117,9 +118,9 @@ func WithNetworkMapSource(v netmap.Source) Option { } } -func WithWorkerPool(v util.WorkerPool) Option { +func WithWorkerPools(remote, local util.WorkerPool) Option { return func(c *cfg) { - c.workerPool = v + c.remotePool, c.localPool = remote, local } } diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index dcd9fe71..df52ba32 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -161,7 +161,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { return &distributedTarget{ traverseOpts: prm.traverseOpts, - workerPool: p.workerPool, + remotePool: p.remotePool, + localPool: p.localPool, nodeTargetInitializer: func(node nodeDesc) transformer.ObjectTarget { if node.local { return &localTarget{