[#560] node: Fix Put in multi REP with intersecting sets of nodes

Once the node was processed it skipped, at the step of forming
result in case when all nodes skipped, because processed for
previous REP, service mark the whole request as incomplete.

Example of policies which are unblocked:
- REP 1 REP 1 CBF 1
- REP 4 IN X REP 4 IN Y
  CBF 4
  SELECT 2 FROM FX AS X SELECT 2 FROM FY AS Y
  FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS FY
  FILTER Price GE 0 AS FX

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2023-08-03 10:39:48 +03:00
parent d3a52ec73a
commit e8b484bcef
2 changed files with 23 additions and 17 deletions

View file

@ -48,7 +48,7 @@ type traversal struct {
extraBroadcastEnabled bool extraBroadcastEnabled bool
// container nodes which was processed during the primary object placement // container nodes which was processed during the primary object placement
mExclude map[string]struct{} mExclude map[string]*bool
} }
// updates traversal parameters after the primary placement finish and // updates traversal parameters after the primary placement finish and
@ -68,24 +68,18 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
} }
// marks the container node as processed during the primary object placement. // marks the container node as processed during the primary object placement.
func (x *traversal) submitProcessed(n placement.Node) { func (x *traversal) submitProcessed(n placement.Node, item *bool) {
if x.extraBroadcastEnabled { if x.extraBroadcastEnabled {
key := string(n.PublicKey()) key := string(n.PublicKey())
if x.mExclude == nil { if x.mExclude == nil {
x.mExclude = make(map[string]struct{}, 1) x.mExclude = make(map[string]*bool, 1)
} }
x.mExclude[key] = struct{}{} x.mExclude[key] = item
} }
} }
// checks if specified node was processed during the primary object placement.
func (x *traversal) processed(n placement.Node) bool {
_, ok := x.mExclude[string(n.PublicKey())]
return ok
}
type nodeDesc struct { type nodeDesc struct {
local bool local bool
@ -181,6 +175,8 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
resErr := &atomic.Value{} resErr := &atomic.Value{}
// Must iterate over all replicas, regardless of whether there are identical nodes there.
// At the same time need to exclude identical nodes from processing.
for { for {
addrs := traverser.Next() addrs := traverser.Next()
if len(addrs) == 0 { if len(addrs) == 0 {
@ -214,14 +210,18 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := range addrs { for i := range addrs {
if t.traversal.processed(addrs[i]) { addr := addrs[i]
if val := t.traversal.mExclude[string(addr.PublicKey())]; val != nil {
// Check is node processed successful on the previous iteration.
if *val {
traverser.SubmitSuccess()
}
// it can happen only during additional container broadcast // it can happen only during additional container broadcast
continue continue
} }
wg.Add(1) wg.Add(1)
item := new(bool)
addr := addrs[i]
workerPool, isLocal := t.getWorkerPool(addr.PublicKey()) workerPool, isLocal := t.getWorkerPool(addr.PublicKey())
if err := workerPool.Submit(func() { if err := workerPool.Submit(func() {
@ -235,6 +235,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
} }
traverser.SubmitSuccess() traverser.SubmitSuccess()
*item = true
}); err != nil { }); err != nil {
wg.Done() wg.Done()
svcutil.LogWorkerPoolError(t.log, "PUT", err) svcutil.LogWorkerPoolError(t.log, "PUT", err)
@ -245,7 +246,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
// in subsequent container broadcast. Note that we don't // in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement // process this node during broadcast if primary placement
// on it failed. // on it failed.
t.traversal.submitProcessed(addr) t.traversal.submitProcessed(addr, item)
} }
wg.Wait() wg.Wait()

View file

@ -154,7 +154,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
opts: placementOptions, opts: placementOptions,
extraBroadcastEnabled: len(obj.Children()) > 0 || extraBroadcastEnabled: len(obj.Children()) > 0 ||
(!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)), (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)),
mExclude: make(map[string]struct{}), mExclude: make(map[string]*bool),
} }
signer := &putSingleRequestSigner{ signer := &putSingleRequestSigner{
req: req, req: req,
@ -247,7 +247,10 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
for _, nodeAddress := range nodeAddresses { for _, nodeAddress := range nodeAddresses {
nodeAddress := nodeAddress nodeAddress := nodeAddress
if traversal.processed(nodeAddress) { if ok := traversal.mExclude[string(nodeAddress.PublicKey())]; ok != nil {
if *ok {
traverser.SubmitSuccess()
}
continue continue
} }
@ -258,6 +261,7 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
workerPool = s.localPool workerPool = s.localPool
} }
item := new(bool)
wg.Add(1) wg.Add(1)
if err := workerPool.Submit(func() { if err := workerPool.Submit(func() {
defer wg.Done() defer wg.Done()
@ -271,13 +275,14 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
} }
traverser.SubmitSuccess() traverser.SubmitSuccess()
*item = true
}); err != nil { }); err != nil {
wg.Done() wg.Done()
svcutil.LogWorkerPoolError(s.log, "PUT", err) svcutil.LogWorkerPoolError(s.log, "PUT", err)
return true return true
} }
traversal.submitProcessed(nodeAddress) traversal.submitProcessed(nodeAddress, item)
} }
wg.Wait() wg.Wait()