Compare commits
1 commit
master
...
bugfix/tra
Author | SHA1 | Date | |
---|---|---|---|
e8b484bcef |
2 changed files with 23 additions and 17 deletions
|
@ -48,7 +48,7 @@ type traversal struct {
|
|||
extraBroadcastEnabled bool
|
||||
|
||||
// container nodes which was processed during the primary object placement
|
||||
mExclude map[string]struct{}
|
||||
mExclude map[string]*bool
|
||||
}
|
||||
|
||||
// updates traversal parameters after the primary placement finish and
|
||||
|
@ -68,24 +68,18 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
|
|||
}
|
||||
|
||||
// marks the container node as processed during the primary object placement.
|
||||
func (x *traversal) submitProcessed(n placement.Node) {
|
||||
func (x *traversal) submitProcessed(n placement.Node, item *bool) {
|
||||
if x.extraBroadcastEnabled {
|
||||
key := string(n.PublicKey())
|
||||
|
||||
if x.mExclude == nil {
|
||||
x.mExclude = make(map[string]struct{}, 1)
|
||||
x.mExclude = make(map[string]*bool, 1)
|
||||
}
|
||||
|
||||
x.mExclude[key] = struct{}{}
|
||||
x.mExclude[key] = item
|
||||
}
|
||||
}
|
||||
|
||||
// checks if specified node was processed during the primary object placement.
|
||||
func (x *traversal) processed(n placement.Node) bool {
|
||||
_, ok := x.mExclude[string(n.PublicKey())]
|
||||
return ok
|
||||
}
|
||||
|
||||
type nodeDesc struct {
|
||||
local bool
|
||||
|
||||
|
@ -181,6 +175,8 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
|
|||
|
||||
resErr := &atomic.Value{}
|
||||
|
||||
// Must iterate over all replicas, regardless of whether there are identical nodes there.
|
||||
// At the same time need to exclude identical nodes from processing.
|
||||
for {
|
||||
addrs := traverser.Next()
|
||||
if len(addrs) == 0 {
|
||||
|
@ -214,14 +210,18 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
|
|||
wg := &sync.WaitGroup{}
|
||||
|
||||
for i := range addrs {
|
||||
if t.traversal.processed(addrs[i]) {
|
||||
addr := addrs[i]
|
||||
if val := t.traversal.mExclude[string(addr.PublicKey())]; val != nil {
|
||||
// Check is node processed successful on the previous iteration.
|
||||
if *val {
|
||||
traverser.SubmitSuccess()
|
||||
}
|
||||
// it can happen only during additional container broadcast
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
addr := addrs[i]
|
||||
item := new(bool)
|
||||
|
||||
workerPool, isLocal := t.getWorkerPool(addr.PublicKey())
|
||||
if err := workerPool.Submit(func() {
|
||||
|
@ -235,6 +235,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
|
|||
}
|
||||
|
||||
traverser.SubmitSuccess()
|
||||
*item = true
|
||||
}); err != nil {
|
||||
wg.Done()
|
||||
svcutil.LogWorkerPoolError(t.log, "PUT", err)
|
||||
|
@ -245,7 +246,7 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
|
|||
// in subsequent container broadcast. Note that we don't
|
||||
// process this node during broadcast if primary placement
|
||||
// on it failed.
|
||||
t.traversal.submitProcessed(addr)
|
||||
t.traversal.submitProcessed(addr, item)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
|
|
@ -154,7 +154,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
|
|||
opts: placementOptions,
|
||||
extraBroadcastEnabled: len(obj.Children()) > 0 ||
|
||||
(!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)),
|
||||
mExclude: make(map[string]struct{}),
|
||||
mExclude: make(map[string]*bool),
|
||||
}
|
||||
signer := &putSingleRequestSigner{
|
||||
req: req,
|
||||
|
@ -247,7 +247,10 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
|
|||
|
||||
for _, nodeAddress := range nodeAddresses {
|
||||
nodeAddress := nodeAddress
|
||||
if traversal.processed(nodeAddress) {
|
||||
if ok := traversal.mExclude[string(nodeAddress.PublicKey())]; ok != nil {
|
||||
if *ok {
|
||||
traverser.SubmitSuccess()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -258,6 +261,7 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
|
|||
workerPool = s.localPool
|
||||
}
|
||||
|
||||
item := new(bool)
|
||||
wg.Add(1)
|
||||
if err := workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
|
@ -271,13 +275,14 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
|
|||
}
|
||||
|
||||
traverser.SubmitSuccess()
|
||||
*item = true
|
||||
}); err != nil {
|
||||
wg.Done()
|
||||
svcutil.LogWorkerPoolError(s.log, "PUT", err)
|
||||
return true
|
||||
}
|
||||
|
||||
traversal.submitProcessed(nodeAddress)
|
||||
traversal.submitProcessed(nodeAddress, item)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
|
Loading…
Reference in a new issue