From 08593f664b227ca12f3086219fb500daaa6474cf Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 23 Aug 2024 12:28:09 +0300 Subject: [PATCH] [#1329] putSvc: Reset SuccessAfter for non-EC objects in EC container broadcasting Signed-off-by: Dmitrii Stepanov --- pkg/services/object/put/distributed.go | 9 +++++++++ pkg/services/object/put/single.go | 9 ++++++--- pkg/services/object/put/streamer.go | 7 +++++-- pkg/services/object_manager/placement/traverser.go | 7 +++++++ 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index c71427b67..5176f7a54 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -24,6 +24,8 @@ type distributedTarget struct { nodeTargetInitializer func(nodeDesc) preparedObjectTarget relay func(context.Context, nodeDesc) error + + resetSuccessAfterOnBroadcast bool } // parameters and state of container traversal. @@ -35,6 +37,8 @@ type traversal struct { // container nodes which was processed during the primary object placement mExclude map[string]*bool + + resetSuccessAfterOnBroadcast bool } // updates traversal parameters after the primary placement finish and @@ -44,6 +48,10 @@ func (x *traversal) submitPrimaryPlacementFinish() bool { // do not track success during container broadcast (best-effort) x.opts = append(x.opts, placement.WithoutSuccessTracking()) + if x.resetSuccessAfterOnBroadcast { + x.opts = append(x.opts, placement.ResetSuccessAfter()) + } + // avoid 2nd broadcast x.extraBroadcastEnabled = false @@ -118,5 +126,6 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error { iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id))) iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */) + iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast return iter.forEachNode(ctx, t.sendObject) } diff --git a/pkg/services/object/put/single.go b/pkg/services/object/put/single.go index 9fa8ddb67..3cc8518f5 100644 --- a/pkg/services/object/put/single.go +++ b/pkg/services/object/put/single.go @@ -166,6 +166,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { iter := s.cfg.newNodeIterator(placement.placementOptions) iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly) + iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast signer := &putSingleRequestSigner{ req: req, @@ -209,9 +210,10 @@ func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlace } type putSinglePlacement struct { - placementOptions []placement.Option - isEC bool - container containerSDK.Container + placementOptions []placement.Option + isEC bool + container containerSDK.Container + resetSuccessAfterOnBroadcast bool } func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) { @@ -232,6 +234,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb } if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly { result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1))) + result.resetSuccessAfterOnBroadcast = true } result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value)) diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 4e655ed54..3ee8f5aaa 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -233,16 +233,19 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) } } + var resetSuccessAfterOnBroadcast bool traverseOpts := prm.traverseOpts if forECPlacement && !prm.common.LocalOnly() { // save non-regular and linking object to EC container. // EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc. traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1))) + resetSuccessAfterOnBroadcast = true } return &distributedTarget{ - cfg: p.cfg, - placementOpts: traverseOpts, + cfg: p.cfg, + placementOpts: traverseOpts, + resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast, nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { if node.local { return localTarget{ diff --git a/pkg/services/object_manager/placement/traverser.go b/pkg/services/object_manager/placement/traverser.go index 306169571..9a5877c52 100644 --- a/pkg/services/object_manager/placement/traverser.go +++ b/pkg/services/object_manager/placement/traverser.go @@ -303,6 +303,13 @@ func SuccessAfter(v uint32) Option { } } +// ResetSuccessAfter resets flat success number setting option. +func ResetSuccessAfter() Option { + return func(c *cfg) { + c.flatSuccess = nil + } +} + // WithoutSuccessTracking disables success tracking in traversal. func WithoutSuccessTracking() Option { return func(c *cfg) {