[#1329] putSvc: Reset SuccessAfter for non-EC objects in EC container broadcasting

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-08-23 12:28:09 +03:00
parent eec359cfa8
commit 08593f664b
4 changed files with 27 additions and 5 deletions

View file

@ -24,6 +24,8 @@ type distributedTarget struct {
nodeTargetInitializer func(nodeDesc) preparedObjectTarget nodeTargetInitializer func(nodeDesc) preparedObjectTarget
relay func(context.Context, nodeDesc) error relay func(context.Context, nodeDesc) error
resetSuccessAfterOnBroadcast bool
} }
// parameters and state of container traversal. // parameters and state of container traversal.
@ -35,6 +37,8 @@ type traversal struct {
// container nodes which was processed during the primary object placement // container nodes which was processed during the primary object placement
mExclude map[string]*bool mExclude map[string]*bool
resetSuccessAfterOnBroadcast bool
} }
// updates traversal parameters after the primary placement finish and // updates traversal parameters after the primary placement finish and
@ -44,6 +48,10 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
// do not track success during container broadcast (best-effort) // do not track success during container broadcast (best-effort)
x.opts = append(x.opts, placement.WithoutSuccessTracking()) x.opts = append(x.opts, placement.WithoutSuccessTracking())
if x.resetSuccessAfterOnBroadcast {
x.opts = append(x.opts, placement.ResetSuccessAfter())
}
// avoid 2nd broadcast // avoid 2nd broadcast
x.extraBroadcastEnabled = false x.extraBroadcastEnabled = false
@ -118,5 +126,6 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id))) iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */) iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
return iter.forEachNode(ctx, t.sendObject) return iter.forEachNode(ctx, t.sendObject)
} }

View file

@ -166,6 +166,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
iter := s.cfg.newNodeIterator(placement.placementOptions) iter := s.cfg.newNodeIterator(placement.placementOptions)
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly) iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
signer := &putSingleRequestSigner{ signer := &putSingleRequestSigner{
req: req, req: req,
@ -212,6 +213,7 @@ type putSinglePlacement struct {
placementOptions []placement.Option placementOptions []placement.Option
isEC bool isEC bool
container containerSDK.Container container containerSDK.Container
resetSuccessAfterOnBroadcast bool
} }
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) { func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
@ -232,6 +234,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
} }
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly { if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1))) result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
result.resetSuccessAfterOnBroadcast = true
} }
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value)) result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))

View file

@ -233,16 +233,19 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool)
} }
} }
var resetSuccessAfterOnBroadcast bool
traverseOpts := prm.traverseOpts traverseOpts := prm.traverseOpts
if forECPlacement && !prm.common.LocalOnly() { if forECPlacement && !prm.common.LocalOnly() {
// save non-regular and linking object to EC container. // save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc. // EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1))) traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
resetSuccessAfterOnBroadcast = true
} }
return &distributedTarget{ return &distributedTarget{
cfg: p.cfg, cfg: p.cfg,
placementOpts: traverseOpts, placementOpts: traverseOpts,
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local { if node.local {
return localTarget{ return localTarget{

View file

@ -303,6 +303,13 @@ func SuccessAfter(v uint32) Option {
} }
} }
// ResetSuccessAfter resets flat success number setting option.
func ResetSuccessAfter() Option {
return func(c *cfg) {
c.flatSuccess = nil
}
}
// WithoutSuccessTracking disables success tracking in traversal. // WithoutSuccessTracking disables success tracking in traversal.
func WithoutSuccessTracking() Option { func WithoutSuccessTracking() Option {
return func(c *cfg) { return func(c *cfg) {