From 14d894178e29be1e97de160905f3fc7504bd7693 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 3 Apr 2023 12:24:01 +0300 Subject: [PATCH] [#199] putsvc: Refactor placement iterator Resolve funlen linter for iteratePlacement method Signed-off-by: Dmitrii Stepanov --- pkg/services/object/put/distributed.go | 111 ++++++++++++------------- 1 file changed, 53 insertions(+), 58 deletions(-) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 47104b323d..3b5f4ec53e 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -164,7 +164,6 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { return nil } -// nolint: funlen func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) { id, _ := t.obj.ID() @@ -175,72 +174,22 @@ func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transform return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err) } - var resErr atomic.Value + resErr := &atomic.Value{} -loop: for { addrs := traverser.Next() if len(addrs) == 0 { break } - wg := new(sync.WaitGroup) - - for i := range addrs { - if t.traversal.processed(addrs[i]) { - // it can happen only during additional container broadcast - continue - } - - wg.Add(1) - - addr := addrs[i] - - isLocal := t.isLocalKey(addr.PublicKey()) - - var workerPool util.WorkerPool - - if isLocal { - workerPool = t.localPool - } else { - workerPool = t.remotePool - } - - if err := workerPool.Submit(func() { - defer wg.Done() - - err := f(nodeDesc{local: isLocal, info: addr}) - - // mark the container node as processed in order to exclude it - // in subsequent container broadcast. Note that we don't - // process this node during broadcast if primary placement - // on it failed. - t.traversal.submitProcessed(addr) - - if err != nil { - resErr.Store(err) - svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err) - return - } - - traverser.SubmitSuccess() - }); err != nil { - wg.Done() - - svcutil.LogWorkerPoolError(t.log, "PUT", err) - - break loop - } + if t.iterateAddresses(traverser, addrs, f, resErr) { + break } - - wg.Wait() } if !traverser.Success() { var err errIncompletePut - err.singleErr, _ = resErr.Load().(error) - return nil, err } @@ -248,10 +197,7 @@ loop: if t.traversal.submitPrimaryPlacementFinish() { _, err = t.iteratePlacement(f) if err != nil { - t.log.Error("additional container broadcast failure", - zap.Error(err), - ) - + t.log.Error("additional container broadcast failure", zap.Error(err)) // we don't fail primary operation because of broadcast failure } } @@ -261,3 +207,52 @@ loop: return new(transformer.AccessIdentifiers). WithSelfID(id), nil } + +func (t *distributedTarget) iterateAddresses(traverser *placement.Traverser, addrs []placement.Node, f func(nodeDesc) error, resErr *atomic.Value) bool { + wg := &sync.WaitGroup{} + + for i := range addrs { + if t.traversal.processed(addrs[i]) { + // it can happen only during additional container broadcast + continue + } + + wg.Add(1) + + addr := addrs[i] + isLocal := t.isLocalKey(addr.PublicKey()) + + workerPool := t.remotePool + if isLocal { + workerPool = t.localPool + } + + if err := workerPool.Submit(func() { + defer wg.Done() + + err := f(nodeDesc{local: isLocal, info: addr}) + + // mark the container node as processed in order to exclude it + // in subsequent container broadcast. Note that we don't + // process this node during broadcast if primary placement + // on it failed. + t.traversal.submitProcessed(addr) + + if err != nil { + resErr.Store(err) + svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err) + return + } + + traverser.SubmitSuccess() + }); err != nil { + wg.Done() + svcutil.LogWorkerPoolError(t.log, "PUT", err) + return true + } + } + + wg.Wait() + + return false +}