From a422f42ca93ae090066f2ccc23e2ee86cd8be5fa Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 28 May 2021 10:34:31 +0300 Subject: [PATCH] [#501] object/put: refactor distributed target Signed-off-by: Evgenii Stratonikov --- pkg/services/object/put/distributed.go | 43 ++++++++++++++------------ 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 40442b3f..0b35f284 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -45,13 +45,6 @@ func (t *distributedTarget) Write(p []byte) (n int, err error) { } func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { - traverser, err := placement.NewTraverser( - append(t.traverseOpts, placement.ForObject(t.obj.ID()))..., - ) - if err != nil { - return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err) - } - sz := 0 for i := range t.chunks { @@ -70,6 +63,28 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { return nil, fmt.Errorf("(%T) could not validate payload content: %w", t, err) } + return t.iteratePlacement(t.sendObject) +} + +func (t *distributedTarget) sendObject(addr *network.Address) error { + target := t.nodeTargetInitializer(addr) + + if err := target.WriteHeader(t.obj); err != nil { + return fmt.Errorf("could not write header: %w", err) + } else if _, err := target.Close(); err != nil { + return fmt.Errorf("could not close object stream: %w", err) + } + return nil +} + +func (t *distributedTarget) iteratePlacement(f func(*network.Address) error) (*transformer.AccessIdentifiers, error) { + traverser, err := placement.NewTraverser( + append(t.traverseOpts, placement.ForObject(t.obj.ID()))..., + ) + if err != nil { + return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err) + } + loop: for { addrs := traverser.Next() @@ -83,21 +98,11 @@ loop: wg.Add(1) addr := addrs[i] - if err := t.workerPool.Submit(func() { defer wg.Done() - target := t.nodeTargetInitializer(addr) - - if err := target.WriteHeader(t.obj); err != nil { - svcutil.LogServiceError(t.log, "PUT", addr, - fmt.Errorf("could not write header: %w", err)) - - return - } else if _, err := target.Close(); err != nil { - svcutil.LogServiceError(t.log, "PUT", addr, - fmt.Errorf("could not close object stream: %w", err)) - + if err := f(addr); err != nil { + svcutil.LogServiceError(t.log, "PUT", addr, err) return }