forked from TrueCloudLab/frostfs-node
[#1329] putSvc: Reset SuccessAfter for non-EC objects in EC container broadcasting
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
dfe825b81b
commit
bd24beecf8
4 changed files with 27 additions and 5 deletions
|
@ -24,6 +24,8 @@ type distributedTarget struct {
|
||||||
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
||||||
|
|
||||||
relay func(context.Context, nodeDesc) error
|
relay func(context.Context, nodeDesc) error
|
||||||
|
|
||||||
|
resetSuccessAfterOnBroadcast bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// parameters and state of container traversal.
|
// parameters and state of container traversal.
|
||||||
|
@ -35,6 +37,8 @@ type traversal struct {
|
||||||
|
|
||||||
// container nodes which was processed during the primary object placement
|
// container nodes which was processed during the primary object placement
|
||||||
mExclude map[string]*bool
|
mExclude map[string]*bool
|
||||||
|
|
||||||
|
resetSuccessAfterOnBroadcast bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// updates traversal parameters after the primary placement finish and
|
// updates traversal parameters after the primary placement finish and
|
||||||
|
@ -44,6 +48,10 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
|
||||||
// do not track success during container broadcast (best-effort)
|
// do not track success during container broadcast (best-effort)
|
||||||
x.opts = append(x.opts, placement.WithoutSuccessTracking())
|
x.opts = append(x.opts, placement.WithoutSuccessTracking())
|
||||||
|
|
||||||
|
if x.resetSuccessAfterOnBroadcast {
|
||||||
|
x.opts = append(x.opts, placement.ResetSuccessAfter())
|
||||||
|
}
|
||||||
|
|
||||||
// avoid 2nd broadcast
|
// avoid 2nd broadcast
|
||||||
x.extraBroadcastEnabled = false
|
x.extraBroadcastEnabled = false
|
||||||
|
|
||||||
|
@ -118,5 +126,6 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
|
||||||
|
|
||||||
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
|
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
|
||||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
|
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
|
||||||
|
iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
|
||||||
return iter.forEachNode(ctx, t.sendObject)
|
return iter.forEachNode(ctx, t.sendObject)
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,6 +166,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
|
||||||
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||||
iter := s.cfg.newNodeIterator(placement.placementOptions)
|
iter := s.cfg.newNodeIterator(placement.placementOptions)
|
||||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
||||||
|
iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
|
||||||
|
|
||||||
signer := &putSingleRequestSigner{
|
signer := &putSingleRequestSigner{
|
||||||
req: req,
|
req: req,
|
||||||
|
@ -212,6 +213,7 @@ type putSinglePlacement struct {
|
||||||
placementOptions []placement.Option
|
placementOptions []placement.Option
|
||||||
isEC bool
|
isEC bool
|
||||||
container containerSDK.Container
|
container containerSDK.Container
|
||||||
|
resetSuccessAfterOnBroadcast bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
|
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
|
||||||
|
@ -232,6 +234,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
|
||||||
}
|
}
|
||||||
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
|
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
|
||||||
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
|
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
|
||||||
|
result.resetSuccessAfterOnBroadcast = true
|
||||||
}
|
}
|
||||||
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
|
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
|
||||||
|
|
||||||
|
|
|
@ -246,16 +246,19 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var resetSuccessAfterOnBroadcast bool
|
||||||
traverseOpts := prm.traverseOpts
|
traverseOpts := prm.traverseOpts
|
||||||
if forECPlacement && !prm.common.LocalOnly() {
|
if forECPlacement && !prm.common.LocalOnly() {
|
||||||
// save non-regular and linking object to EC container.
|
// save non-regular and linking object to EC container.
|
||||||
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
||||||
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
|
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
|
||||||
|
resetSuccessAfterOnBroadcast = true
|
||||||
}
|
}
|
||||||
|
|
||||||
return &distributedTarget{
|
return &distributedTarget{
|
||||||
cfg: p.cfg,
|
cfg: p.cfg,
|
||||||
placementOpts: traverseOpts,
|
placementOpts: traverseOpts,
|
||||||
|
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
|
||||||
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
||||||
if node.local {
|
if node.local {
|
||||||
return localTarget{
|
return localTarget{
|
||||||
|
|
|
@ -303,6 +303,13 @@ func SuccessAfter(v uint32) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResetSuccessAfter resets flat success number setting option.
|
||||||
|
func ResetSuccessAfter() Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.flatSuccess = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithoutSuccessTracking disables success tracking in traversal.
|
// WithoutSuccessTracking disables success tracking in traversal.
|
||||||
func WithoutSuccessTracking() Option {
|
func WithoutSuccessTracking() Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
|
|
Loading…
Reference in a new issue