diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 9533df08b..1d80a5025 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -12,10 +12,11 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + "go.uber.org/zap" ) type distributedTarget struct { - traverseOpts []placement.Option + traversal traversal remotePool, localPool util.WorkerPool @@ -34,6 +35,50 @@ type distributedTarget struct { log *logger.Logger } +// parameters and state of container traversal. +type traversal struct { + opts []placement.Option + + // need of additional broadcast after the object is saved + extraBroadcastEnabled bool + + // container nodes which was processed during the primary object placement + mExclude map[string]struct{} +} + +// updates traversal parameters after the primary placement finish and +// returns true if additional container broadcast is needed. +func (x *traversal) submitPrimaryPlacementFinish() bool { + if x.extraBroadcastEnabled { + // do not track success during container broadcast (best-effort) + x.opts = append(x.opts, placement.WithoutSuccessTracking()) + + // avoid 2nd broadcast + x.extraBroadcastEnabled = false + + return true + } + + return false +} + +// marks the container node as processed during the primary object placement. +func (x *traversal) submitProcessed(n placement.Node) { + if x.extraBroadcastEnabled { + if x.mExclude == nil { + x.mExclude = make(map[string]struct{}, 1) + } + + x.mExclude[string(n.PublicKey())] = struct{}{} + } +} + +// checks if specified node was processed during the primary object placement. +func (x traversal) processed(n placement.Node) bool { + _, ok := x.mExclude[string(n.PublicKey())] + return ok +} + type nodeDesc struct { local bool @@ -106,7 +151,7 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) { traverser, err := placement.NewTraverser( - append(t.traverseOpts, placement.ForObject(t.obj.ID()))..., + append(t.traversal.opts, placement.ForObject(t.obj.ID()))..., ) if err != nil { return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err) @@ -124,6 +169,11 @@ loop: wg := new(sync.WaitGroup) for i := range addrs { + if t.traversal.processed(addrs[i]) { + // it can happen only during additional container broadcast + continue + } + wg.Add(1) addr := addrs[i] @@ -141,7 +191,15 @@ loop: if err := workerPool.Submit(func() { defer wg.Done() - if err := f(nodeDesc{local: isLocal, info: addr}); err != nil { + err := f(nodeDesc{local: isLocal, info: addr}) + + // mark the container node as processed in order to exclude it + // in subsequent container broadcast. Note that we don't + // process this node during broadcast if primary placement + // on it failed. + t.traversal.submitProcessed(addr) + + if err != nil { resErr.Store(err) svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err) return @@ -168,6 +226,18 @@ loop: return nil, err } + // perform additional container broadcast if needed + if t.traversal.submitPrimaryPlacementFinish() { + _, err = t.iteratePlacement(f) + if err != nil { + t.log.Error("additional container broadcast failure", + zap.Error(err), + ) + + // we don't fail primary operation because of broadcast failure + } + } + return new(transformer.AccessIdentifiers). WithSelfID(t.obj.ID()), nil } diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index bcf5695f4..50bbb02df 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -161,9 +161,11 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { } return &distributedTarget{ - traverseOpts: prm.traverseOpts, - remotePool: p.remotePool, - localPool: p.localPool, + traversal: traversal{ + opts: prm.traverseOpts, + }, + remotePool: p.remotePool, + localPool: p.localPool, nodeTargetInitializer: func(node nodeDesc) transformer.ObjectTarget { if node.local { return &localTarget{