diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index f4afd44a..4740ad1f 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -48,7 +48,7 @@ type traversal struct { extraBroadcastEnabled bool // container nodes which was processed during the primary object placement - mExclude map[string]struct{} + mExclude map[string]*bool } // updates traversal parameters after the primary placement finish and @@ -68,24 +68,18 @@ func (x *traversal) submitPrimaryPlacementFinish() bool { } // marks the container node as processed during the primary object placement. -func (x *traversal) submitProcessed(n placement.Node) { +func (x *traversal) submitProcessed(n placement.Node, item *bool) { if x.extraBroadcastEnabled { key := string(n.PublicKey()) if x.mExclude == nil { - x.mExclude = make(map[string]struct{}, 1) + x.mExclude = make(map[string]*bool, 1) } - x.mExclude[key] = struct{}{} + x.mExclude[key] = item } } -// checks if specified node was processed during the primary object placement. -func (x *traversal) processed(n placement.Node) bool { - _, ok := x.mExclude[string(n.PublicKey())] - return ok -} - type nodeDesc struct { local bool @@ -181,6 +175,8 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error { resErr := &atomic.Value{} + // Must iterate over all replicas, regardless of whether there are identical nodes there. + // At the same time need to exclude identical nodes from processing. for { addrs := traverser.Next() if len(addrs) == 0 { @@ -214,14 +210,18 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla wg := &sync.WaitGroup{} for i := range addrs { - if t.traversal.processed(addrs[i]) { + addr := addrs[i] + if val := t.traversal.mExclude[string(addr.PublicKey())]; val != nil { + // Check is node processed successful on the previous iteration. + if *val { + traverser.SubmitSuccess() + } // it can happen only during additional container broadcast continue } wg.Add(1) - - addr := addrs[i] + item := new(bool) workerPool, isLocal := t.getWorkerPool(addr.PublicKey()) if err := workerPool.Submit(func() { @@ -235,6 +235,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla } traverser.SubmitSuccess() + *item = true }); err != nil { wg.Done() svcutil.LogWorkerPoolError(t.log, "PUT", err) @@ -245,7 +246,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla // in subsequent container broadcast. Note that we don't // process this node during broadcast if primary placement // on it failed. - t.traversal.submitProcessed(addr) + t.traversal.submitProcessed(addr, item) } wg.Wait() diff --git a/pkg/services/object/put/single.go b/pkg/services/object/put/single.go index 9d17a812..7c7955c8 100644 --- a/pkg/services/object/put/single.go +++ b/pkg/services/object/put/single.go @@ -154,7 +154,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o opts: placementOptions, extraBroadcastEnabled: len(obj.Children()) > 0 || (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)), - mExclude: make(map[string]struct{}), + mExclude: make(map[string]*bool), } signer := &putSingleRequestSigner{ req: req, @@ -247,7 +247,10 @@ func (s *Service) saveToPlacementNodes(ctx context.Context, for _, nodeAddress := range nodeAddresses { nodeAddress := nodeAddress - if traversal.processed(nodeAddress) { + if ok := traversal.mExclude[string(nodeAddress.PublicKey())]; ok != nil { + if *ok { + traverser.SubmitSuccess() + } continue } @@ -258,6 +261,7 @@ func (s *Service) saveToPlacementNodes(ctx context.Context, workerPool = s.localPool } + item := new(bool) wg.Add(1) if err := workerPool.Submit(func() { defer wg.Done() @@ -271,13 +275,14 @@ func (s *Service) saveToPlacementNodes(ctx context.Context, } traverser.SubmitSuccess() + *item = true }); err != nil { wg.Done() svcutil.LogWorkerPoolError(s.log, "PUT", err) return true } - traversal.submitProcessed(nodeAddress) + traversal.submitProcessed(nodeAddress, item) } wg.Wait()