From 8d589314b5b76d213f02a9f2cd152bec8bcd7d06 Mon Sep 17 00:00:00 2001
From: Anton Nikiforov <an.nikiforov@yadro.com>
Date: Thu, 3 Aug 2023 10:39:48 +0300
Subject: [PATCH] [#560] node: Fix `Put` in multi REP with intersecting sets of
 nodes

Once the node was processed it skipped, at the step of forming
result in case when all nodes skipped, because processed for
previous REP, service mark the whole request as incomplete.

Example of policies which are unblocked:
- REP 1 REP 1 CBF 1
- REP 4 IN X REP 4 IN Y
  CBF 4
  SELECT 2 FROM FX AS X SELECT 2 FROM FY AS Y
  FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS FY
  FILTER Price GE 0 AS FX

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
---
 pkg/services/object/put/distributed.go | 29 +++++++++++++-------------
 pkg/services/object/put/single.go      | 11 +++++++---
 2 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go
index f4afd44aa..4740ad1fe 100644
--- a/pkg/services/object/put/distributed.go
+++ b/pkg/services/object/put/distributed.go
@@ -48,7 +48,7 @@ type traversal struct {
 	extraBroadcastEnabled bool
 
 	// container nodes which was processed during the primary object placement
-	mExclude map[string]struct{}
+	mExclude map[string]*bool
 }
 
 // updates traversal parameters after the primary placement finish and
@@ -68,24 +68,18 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
 }
 
 // marks the container node as processed during the primary object placement.
-func (x *traversal) submitProcessed(n placement.Node) {
+func (x *traversal) submitProcessed(n placement.Node, item *bool) {
 	if x.extraBroadcastEnabled {
 		key := string(n.PublicKey())
 
 		if x.mExclude == nil {
-			x.mExclude = make(map[string]struct{}, 1)
+			x.mExclude = make(map[string]*bool, 1)
 		}
 
-		x.mExclude[key] = struct{}{}
+		x.mExclude[key] = item
 	}
 }
 
-// checks if specified node was processed during the primary object placement.
-func (x *traversal) processed(n placement.Node) bool {
-	_, ok := x.mExclude[string(n.PublicKey())]
-	return ok
-}
-
 type nodeDesc struct {
 	local bool
 
@@ -181,6 +175,8 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
 
 	resErr := &atomic.Value{}
 
+	// Must iterate over all replicas, regardless of whether there are identical nodes there.
+	// At the same time need to exclude identical nodes from processing.
 	for {
 		addrs := traverser.Next()
 		if len(addrs) == 0 {
@@ -214,14 +210,18 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
 	wg := &sync.WaitGroup{}
 
 	for i := range addrs {
-		if t.traversal.processed(addrs[i]) {
+		addr := addrs[i]
+		if val := t.traversal.mExclude[string(addr.PublicKey())]; val != nil {
+			// Check is node processed successful on the previous iteration.
+			if *val {
+				traverser.SubmitSuccess()
+			}
 			// it can happen only during additional container broadcast
 			continue
 		}
 
 		wg.Add(1)
-
-		addr := addrs[i]
+		item := new(bool)
 
 		workerPool, isLocal := t.getWorkerPool(addr.PublicKey())
 		if err := workerPool.Submit(func() {
@@ -235,6 +235,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
 			}
 
 			traverser.SubmitSuccess()
+			*item = true
 		}); err != nil {
 			wg.Done()
 			svcutil.LogWorkerPoolError(t.log, "PUT", err)
@@ -245,7 +246,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
 		// in subsequent container broadcast. Note that we don't
 		// process this node during broadcast if primary placement
 		// on it failed.
-		t.traversal.submitProcessed(addr)
+		t.traversal.submitProcessed(addr, item)
 	}
 
 	wg.Wait()
diff --git a/pkg/services/object/put/single.go b/pkg/services/object/put/single.go
index 9d17a8128..7c7955c87 100644
--- a/pkg/services/object/put/single.go
+++ b/pkg/services/object/put/single.go
@@ -154,7 +154,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
 		opts: placementOptions,
 		extraBroadcastEnabled: len(obj.Children()) > 0 ||
 			(!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)),
-		mExclude: make(map[string]struct{}),
+		mExclude: make(map[string]*bool),
 	}
 	signer := &putSingleRequestSigner{
 		req:        req,
@@ -247,7 +247,10 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
 
 	for _, nodeAddress := range nodeAddresses {
 		nodeAddress := nodeAddress
-		if traversal.processed(nodeAddress) {
+		if ok := traversal.mExclude[string(nodeAddress.PublicKey())]; ok != nil {
+			if *ok {
+				traverser.SubmitSuccess()
+			}
 			continue
 		}
 
@@ -258,6 +261,7 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
 			workerPool = s.localPool
 		}
 
+		item := new(bool)
 		wg.Add(1)
 		if err := workerPool.Submit(func() {
 			defer wg.Done()
@@ -271,13 +275,14 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
 			}
 
 			traverser.SubmitSuccess()
+			*item = true
 		}); err != nil {
 			wg.Done()
 			svcutil.LogWorkerPoolError(s.log, "PUT", err)
 			return true
 		}
 
-		traversal.submitProcessed(nodeAddress)
+		traversal.submitProcessed(nodeAddress, item)
 	}
 
 	wg.Wait()