diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 47104b323..268303f9b 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -23,8 +23,6 @@ type preparedObjectTarget interface { type distributedTarget struct { traversal traversal - remotePool, localPool util.WorkerPool - obj *objectSDK.Object objMeta object.ContentMeta @@ -32,7 +30,7 @@ type distributedTarget struct { nodeTargetInitializer func(nodeDesc) preparedObjectTarget - isLocalKey func([]byte) bool + getWorkerPool func([]byte) (util.WorkerPool, bool) relay func(nodeDesc) error @@ -196,16 +194,7 @@ loop: addr := addrs[i] - isLocal := t.isLocalKey(addr.PublicKey()) - - var workerPool util.WorkerPool - - if isLocal { - workerPool = t.localPool - } else { - workerPool = t.remotePool - } - + workerPool, isLocal := t.getWorkerPool(addr.PublicKey()) if err := workerPool.Submit(func() { defer wg.Done() diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 915b718a3..53de626f5 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer" + pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" @@ -221,9 +222,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { extraBroadcastEnabled: withBroadcast, }, - payload: getPayload(), - remotePool: p.remotePool, - localPool: p.localPool, + payload: getPayload(), + getWorkerPool: p.getWorkerPool, nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { if node.local { return &localTarget{ @@ -245,8 +245,6 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { relay: relay, fmt: p.fmtValidator, log: p.log, - - isLocalKey: p.netmapKeys.IsLocalKey, } } @@ -283,3 +281,10 @@ func (p *Streamer) Close() (*PutResponse, error) { id: ids.SelfID(), }, nil } + +func (p *Streamer) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) { + if p.netmapKeys.IsLocalKey(pub) { + return p.localPool, true + } + return p.remotePool, false +}