node: Fix Put in multi REP with intersecting sets of nodes #560

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:bugfix/traverse-via-intersecting-sets into master 2023-08-08 10:22:54 +00:00
2 changed files with 23 additions and 17 deletions

View file

@ -48,7 +48,7 @@ type traversal struct {
extraBroadcastEnabled bool
// container nodes which was processed during the primary object placement
mExclude map[string]struct{}
mExclude map[string]*bool
}
// updates traversal parameters after the primary placement finish and
@ -68,24 +68,18 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
}
// marks the container node as processed during the primary object placement.
func (x *traversal) submitProcessed(n placement.Node) {
func (x *traversal) submitProcessed(n placement.Node, item *bool) {
if x.extraBroadcastEnabled {
key := string(n.PublicKey())
if x.mExclude == nil {
x.mExclude = make(map[string]struct{}, 1)
x.mExclude = make(map[string]*bool, 1)
}
x.mExclude[key] = struct{}{}
x.mExclude[key] = item
}
}
// checks if specified node was processed during the primary object placement.
func (x *traversal) processed(n placement.Node) bool {
_, ok := x.mExclude[string(n.PublicKey())]
return ok
}
type nodeDesc struct {
local bool
@ -181,6 +175,8 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
resErr := &atomic.Value{}
// Must iterate over all replicas, regardless of whether there are identical nodes there.
// At the same time need to exclude identical nodes from processing.
for {
addrs := traverser.Next()
if len(addrs) == 0 {
@ -214,14 +210,18 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
wg := &sync.WaitGroup{}
for i := range addrs {
if t.traversal.processed(addrs[i]) {
addr := addrs[i]
if val := t.traversal.mExclude[string(addr.PublicKey())]; val != nil {
// Check is node processed successful on the previous iteration.
if *val {
traverser.SubmitSuccess()
}
// it can happen only during additional container broadcast
continue
}
wg.Add(1)
addr := addrs[i]
item := new(bool)
workerPool, isLocal := t.getWorkerPool(addr.PublicKey())
if err := workerPool.Submit(func() {
@ -235,6 +235,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
}
traverser.SubmitSuccess()
*item = true
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(t.log, "PUT", err)
@ -245,7 +246,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addr)
t.traversal.submitProcessed(addr, item)
}
wg.Wait()

View file

@ -154,7 +154,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
opts: placementOptions,
extraBroadcastEnabled: len(obj.Children()) > 0 ||
(!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)),
mExclude: make(map[string]struct{}),
mExclude: make(map[string]*bool),
}
signer := &putSingleRequestSigner{
req: req,
@ -247,7 +247,10 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
for _, nodeAddress := range nodeAddresses {
nodeAddress := nodeAddress
if traversal.processed(nodeAddress) {
if ok := traversal.mExclude[string(nodeAddress.PublicKey())]; ok != nil {
if *ok {
traverser.SubmitSuccess()
}
continue
}
@ -258,6 +261,7 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
workerPool = s.localPool
}
item := new(bool)
wg.Add(1)
if err := workerPool.Submit(func() {
defer wg.Done()
@ -271,13 +275,14 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
}
traverser.SubmitSuccess()
*item = true
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(s.log, "PUT", err)
return true
}
traversal.submitProcessed(nodeAddress)
traversal.submitProcessed(nodeAddress, item)
}
wg.Wait()