From f762855a5d4c632691884b00312bf5eb4fd0b600 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 30 Dec 2022 14:40:34 +0300 Subject: [PATCH] [#6] services/object: Reduce `distibutedTarget` memory footprint Signed-off-by: Evgenii Stratonikov --- pkg/services/object/put/distributed.go | 15 ++------------- pkg/services/object/put/streamer.go | 15 ++++++++++----- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 40a42a33a..c697d338b 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -23,8 +23,6 @@ type preparedObjectTarget interface { type distributedTarget struct { traversal traversal - remotePool, localPool util.WorkerPool - obj *objectSDK.Object objMeta object.ContentMeta @@ -32,7 +30,7 @@ type distributedTarget struct { nodeTargetInitializer func(nodeDesc) preparedObjectTarget - isLocalKey func([]byte) bool + getWorkerPool func([]byte) (util.WorkerPool, bool) relay func(nodeDesc) error @@ -195,16 +193,7 @@ loop: addr := addrs[i] - isLocal := t.isLocalKey(addr.PublicKey()) - - var workerPool util.WorkerPool - - if isLocal { - workerPool = t.localPool - } else { - workerPool = t.remotePool - } - + workerPool, isLocal := t.getWorkerPool(addr.PublicKey()) if err := workerPool.Submit(func() { defer wg.Done() diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index e5942d099..f088f624f 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -10,6 +10,7 @@ import ( "github.com/TrueCloudLab/frostfs-node/pkg/services/object/util" "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "github.com/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer" + pkgutil "github.com/TrueCloudLab/frostfs-node/pkg/util" containerSDK "github.com/TrueCloudLab/frostfs-sdk-go/container" "github.com/TrueCloudLab/frostfs-sdk-go/object" "github.com/TrueCloudLab/frostfs-sdk-go/user" @@ -215,9 +216,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { extraBroadcastEnabled: withBroadcast, }, - payload: getPayload(), - remotePool: p.remotePool, - localPool: p.localPool, + payload: getPayload(), + getWorkerPool: p.getWorkerPool, nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { if node.local { return &localTarget{ @@ -239,8 +239,6 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { relay: relay, fmt: p.fmtValidator, log: p.log, - - isLocalKey: p.netmapKeys.IsLocalKey, } } @@ -277,3 +275,10 @@ func (p *Streamer) Close() (*PutResponse, error) { id: ids.SelfID(), }, nil } + +func (p *Streamer) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) { + if p.netmapKeys.IsLocalKey(pub) { + return p.localPool, true + } + return p.remotePool, false +}