node: Fix Put in multi REP with intersecting sets of nodes #560

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:bugfix/traverse-via-intersecting-sets into master 2023-08-08 10:22:54 +00:00
2 changed files with 23 additions and 17 deletions
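For context, the problem fixed here shows up when a container's placement policy has several REP clauses whose selected node sets intersect, so the same storage node appears in more than one replica vector returned by the placement traverser. Judging from the removed processed() branch below, such a repeated node was previously skipped on the later vector without reporting anything to the traverser, so that REP clause could not collect its required number of successes. A minimal sketch of a policy with this shape, assuming the standard FrostFS placement policy language (selector names and numbers are illustrative, not taken from this PR):

```go
package policysketch

// Hypothetical placement policy: both REP clauses select from the whole
// netmap, so the two replica vectors can contain identical nodes.
const intersectingPolicy = `
REP 2 IN SEL1
REP 2 IN SEL2
CBF 1
SELECT 2 FROM * AS SEL1
SELECT 2 FROM * AS SEL2
`
```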

View file

@@ -48,7 +48,7 @@ type traversal struct {
     extraBroadcastEnabled bool
     // container nodes which was processed during the primary object placement
-    mExclude map[string]struct{}
+    mExclude map[string]*bool
 }
 // updates traversal parameters after the primary placement finish and
@@ -68,24 +68,18 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
 }
 // marks the container node as processed during the primary object placement.
-func (x *traversal) submitProcessed(n placement.Node) {
+func (x *traversal) submitProcessed(n placement.Node, item *bool) {
     if x.extraBroadcastEnabled {
         key := string(n.PublicKey())
         if x.mExclude == nil {
-            x.mExclude = make(map[string]struct{}, 1)
+            x.mExclude = make(map[string]*bool, 1)
         }
-        x.mExclude[key] = struct{}{}
+        x.mExclude[key] = item
     }
 }
-// checks if specified node was processed during the primary object placement.
-func (x *traversal) processed(n placement.Node) bool {
-    _, ok := x.mExclude[string(n.PublicKey())]
-    return ok
-}
 type nodeDesc struct {
     local bool
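To make the intent of the hunk above easier to follow, here is a condensed, self-contained sketch of the new bookkeeping (the type and method names are simplified stand-ins, not the verbatim frostfs-node code): each processed node gets a pointer to a success flag, registered before the asynchronous PUT and flipped to true only once the object is actually stored on that node.

```go
package traversalsketch

// traversalSketch mirrors the relevant part of the changed traversal type.
type traversalSketch struct {
	extraBroadcastEnabled bool
	mExclude              map[string]*bool // node public key -> success flag
}

// submitProcessed registers the node together with its (still false) flag
// before the asynchronous PUT is handed to the worker pool.
func (x *traversalSketch) submitProcessed(key string, item *bool) {
	if x.extraBroadcastEnabled {
		if x.mExclude == nil {
			x.mExclude = make(map[string]*bool, 1)
		}
		x.mExclude[key] = item
	}
}

// succeededBefore reports whether the node was already processed and,
// if so, whether that earlier attempt succeeded.
func (x *traversalSketch) succeededBefore(key string) (processed, succeeded bool) {
	item, ok := x.mExclude[key]
	if !ok {
		return false, false
	}
	return true, *item
}
```

Keeping a *bool rather than a plain bool lets the entry be registered synchronously, while the actual outcome is filled in later by the worker goroutine that performs the PUT.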
@@ -181,6 +175,8 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
     resErr := &atomic.Value{}
+    // Must iterate over all replicas, regardless of whether there are identical nodes there.
+    // At the same time need to exclude identical nodes from processing.
     for {
         addrs := traverser.Next()
         if len(addrs) == 0 {
@@ -214,14 +210,18 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
     wg := &sync.WaitGroup{}
     for i := range addrs {
-        if t.traversal.processed(addrs[i]) {
+        addr := addrs[i]
+        if val := t.traversal.mExclude[string(addr.PublicKey())]; val != nil {
+            // Check whether the node was processed successfully on a previous iteration.
+            if *val {
+                traverser.SubmitSuccess()
+            }
             // it can happen only during additional container broadcast
             continue
         }
         wg.Add(1)
-        addr := addrs[i]
+        item := new(bool)
         workerPool, isLocal := t.getWorkerPool(addr.PublicKey())
         if err := workerPool.Submit(func() {
@@ -235,6 +235,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
             }
             traverser.SubmitSuccess()
+            *item = true
         }); err != nil {
             wg.Done()
             svcutil.LogWorkerPoolError(t.log, "PUT", err)
@@ -245,7 +246,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
         // in subsequent container broadcast. Note that we don't
         // process this node during broadcast if primary placement
         // on it failed.
-        t.traversal.submitProcessed(addr)
+        t.traversal.submitProcessed(addr, item)
     }
     wg.Wait()
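One subtle point in the loop above: *item is written inside the worker submitted to the pool, but it is read again only when a later replica vector (or the extra broadcast) revisits the node, i.e. after wg.Wait() has returned. Assuming that ordering (which is what the hunks shown here suggest), the WaitGroup provides the happens-before edge that makes the read race-free. A minimal sketch of the pattern:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	item := new(bool) // success flag registered before the async PUT

	wg.Add(1)
	go func() {
		defer wg.Done()
		// ... store the object on the node ...
		*item = true // written strictly before wg.Done()
	}()

	wg.Wait()          // the next replica vector is only processed after this
	fmt.Println(*item) // safe read: prints true
}
```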

View file

@@ -154,7 +154,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
         opts: placementOptions,
         extraBroadcastEnabled: len(obj.Children()) > 0 ||
             (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)),
-        mExclude: make(map[string]struct{}),
+        mExclude: make(map[string]*bool),
     }
     signer := &putSingleRequestSigner{
         req: req,
@@ -247,7 +247,10 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
     for _, nodeAddress := range nodeAddresses {
         nodeAddress := nodeAddress
-        if traversal.processed(nodeAddress) {
+        if ok := traversal.mExclude[string(nodeAddress.PublicKey())]; ok != nil {
+            if *ok {
+                traverser.SubmitSuccess()
+            }
             continue
         }
@@ -258,6 +261,7 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
             workerPool = s.localPool
         }
+        item := new(bool)
         wg.Add(1)
         if err := workerPool.Submit(func() {
             defer wg.Done()
@@ -271,13 +275,14 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
             }
             traverser.SubmitSuccess()
+            *item = true
         }); err != nil {
             wg.Done()
             svcutil.LogWorkerPoolError(s.log, "PUT", err)
             return true
         }
-        traversal.submitProcessed(nodeAddress)
+        traversal.submitProcessed(nodeAddress, item)
     }
     wg.Wait()
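Putting the two changed call sites together, the effect of the fix can be illustrated with a toy simulation of the counting logic (a deliberately simplified model, not the frostfs-node implementation): with the old skip-only behaviour, a node shared between two replica vectors leaves the second vector under-replicated, while the fixed behaviour credits the earlier success to the later vector as well.

```go
package main

import "fmt"

// placeObject models the PUT traversal: each replica vector needs `required`
// successful nodes; nodes may repeat across vectors. `creditDuplicates`
// toggles between the old and the fixed handling of repeated nodes.
func placeObject(vectors [][]string, required int, creditDuplicates bool) bool {
	processed := map[string]*bool{} // node -> success flag (mirrors mExclude)
	for _, vector := range vectors {
		successes := 0
		for _, node := range vector {
			if flag, ok := processed[node]; ok {
				// Node already handled for an earlier vector.
				if creditDuplicates && *flag {
					successes++ // fixed behaviour: count the earlier success
				}
				continue // never store the object on the same node twice
			}
			item := new(bool)
			processed[node] = item
			// Pretend the PUT to this node succeeded.
			*item = true
			successes++
		}
		if successes < required {
			return false // this replica vector is under-replicated
		}
	}
	return true
}

func main() {
	vectors := [][]string{{"A", "B"}, {"B", "C"}} // "B" is in both REP sets
	fmt.Println(placeObject(vectors, 2, false))   // old logic: false
	fmt.Println(placeObject(vectors, 2, true))    // fixed logic: true
}
```

Running it prints false for the old skip-only logic and true for the fixed logic, which matches the intent stated in the PR title.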