node: Fix Put
in multi REP with intersecting sets of nodes #560
2 changed files with 23 additions and 17 deletions
|
@ -48,7 +48,7 @@ type traversal struct {
|
||||||
extraBroadcastEnabled bool
|
extraBroadcastEnabled bool
|
||||||
|
|
||||||
// container nodes which was processed during the primary object placement
|
// container nodes which was processed during the primary object placement
|
||||||
mExclude map[string]struct{}
|
mExclude map[string]*bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// updates traversal parameters after the primary placement finish and
|
// updates traversal parameters after the primary placement finish and
|
||||||
|
@ -68,24 +68,18 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// marks the container node as processed during the primary object placement.
|
// marks the container node as processed during the primary object placement.
|
||||||
func (x *traversal) submitProcessed(n placement.Node) {
|
func (x *traversal) submitProcessed(n placement.Node, item *bool) {
|
||||||
if x.extraBroadcastEnabled {
|
if x.extraBroadcastEnabled {
|
||||||
key := string(n.PublicKey())
|
key := string(n.PublicKey())
|
||||||
|
|
||||||
if x.mExclude == nil {
|
if x.mExclude == nil {
|
||||||
x.mExclude = make(map[string]struct{}, 1)
|
x.mExclude = make(map[string]*bool, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
x.mExclude[key] = struct{}{}
|
x.mExclude[key] = item
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// checks if specified node was processed during the primary object placement.
|
|
||||||
func (x *traversal) processed(n placement.Node) bool {
|
|
||||||
_, ok := x.mExclude[string(n.PublicKey())]
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
type nodeDesc struct {
|
type nodeDesc struct {
|
||||||
local bool
|
local bool
|
||||||
|
|
||||||
|
@ -181,6 +175,8 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
|
||||||
|
|
||||||
resErr := &atomic.Value{}
|
resErr := &atomic.Value{}
|
||||||
|
|
||||||
|
// Must iterate over all replicas, regardless of whether there are identical nodes there.
|
||||||
|
// At the same time need to exclude identical nodes from processing.
|
||||||
for {
|
for {
|
||||||
addrs := traverser.Next()
|
addrs := traverser.Next()
|
||||||
if len(addrs) == 0 {
|
if len(addrs) == 0 {
|
||||||
|
@ -214,14 +210,18 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
|
|
||||||
for i := range addrs {
|
for i := range addrs {
|
||||||
if t.traversal.processed(addrs[i]) {
|
addr := addrs[i]
|
||||||
|
if val := t.traversal.mExclude[string(addr.PublicKey())]; val != nil {
|
||||||
|
// Check is node processed successful on the previous iteration.
|
||||||
|
if *val {
|
||||||
|
traverser.SubmitSuccess()
|
||||||
|
}
|
||||||
// it can happen only during additional container broadcast
|
// it can happen only during additional container broadcast
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
item := new(bool)
|
||||||
addr := addrs[i]
|
|
||||||
|
|
||||||
workerPool, isLocal := t.getWorkerPool(addr.PublicKey())
|
workerPool, isLocal := t.getWorkerPool(addr.PublicKey())
|
||||||
if err := workerPool.Submit(func() {
|
if err := workerPool.Submit(func() {
|
||||||
|
@ -235,6 +235,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
|
||||||
}
|
}
|
||||||
|
|
||||||
traverser.SubmitSuccess()
|
traverser.SubmitSuccess()
|
||||||
|
*item = true
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
svcutil.LogWorkerPoolError(t.log, "PUT", err)
|
svcutil.LogWorkerPoolError(t.log, "PUT", err)
|
||||||
|
@ -245,7 +246,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
|
||||||
// in subsequent container broadcast. Note that we don't
|
// in subsequent container broadcast. Note that we don't
|
||||||
// process this node during broadcast if primary placement
|
// process this node during broadcast if primary placement
|
||||||
// on it failed.
|
// on it failed.
|
||||||
t.traversal.submitProcessed(addr)
|
t.traversal.submitProcessed(addr, item)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
|
@ -154,7 +154,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
|
||||||
opts: placementOptions,
|
opts: placementOptions,
|
||||||
extraBroadcastEnabled: len(obj.Children()) > 0 ||
|
extraBroadcastEnabled: len(obj.Children()) > 0 ||
|
||||||
(!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)),
|
(!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)),
|
||||||
mExclude: make(map[string]struct{}),
|
mExclude: make(map[string]*bool),
|
||||||
}
|
}
|
||||||
signer := &putSingleRequestSigner{
|
signer := &putSingleRequestSigner{
|
||||||
req: req,
|
req: req,
|
||||||
|
@ -247,7 +247,10 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
|
||||||
|
|
||||||
for _, nodeAddress := range nodeAddresses {
|
for _, nodeAddress := range nodeAddresses {
|
||||||
nodeAddress := nodeAddress
|
nodeAddress := nodeAddress
|
||||||
if traversal.processed(nodeAddress) {
|
if ok := traversal.mExclude[string(nodeAddress.PublicKey())]; ok != nil {
|
||||||
|
if *ok {
|
||||||
|
traverser.SubmitSuccess()
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -258,6 +261,7 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
|
||||||
workerPool = s.localPool
|
workerPool = s.localPool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
item := new(bool)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
if err := workerPool.Submit(func() {
|
if err := workerPool.Submit(func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
@ -271,13 +275,14 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
|
||||||
}
|
}
|
||||||
|
|
||||||
traverser.SubmitSuccess()
|
traverser.SubmitSuccess()
|
||||||
|
*item = true
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
svcutil.LogWorkerPoolError(s.log, "PUT", err)
|
svcutil.LogWorkerPoolError(s.log, "PUT", err)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
traversal.submitProcessed(nodeAddress)
|
traversal.submitProcessed(nodeAddress, item)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
Loading…
Reference in a new issue