Fix linking object handling for EC #1329

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:fix/object_nodes_linking into master 2024-09-04 19:51:11 +00:00
6 changed files with 66 additions and 49 deletions

View file

@@ -172,7 +172,7 @@ func getComplexObjectParts(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *
func getCompexObjectMembers(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) []oid.ID {
	splitInfo := errSplitInfo.SplitInfo()
-	if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnrID, false); ok {
+	if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnrID); ok {
Review

Now we always get the linking object here. Why doesn't this affect the result?
Review
			if len(obj.Header().Children()) > 0 {
				// linking object is not data object, so skip it
				return nil
			}
		return members
	}
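
To make the review reply above concrete: a linking object of a split chain lists its child IDs in the header, while a data part does not, so callers that only need data members can filter on Children(). A minimal sketch with a stand-in header type (not the SDK type), purely illustrative:

```
package main

import "fmt"

// header is a reduced stand-in for the SDK object header: the only piece the
// check needs is the list of child IDs that a linking object records.
type header struct {
	children []string
}

// isLinkingObject mirrors the check from the reply: a linking object lists
// the split-chain children in its header, a data part does not.
func isLinkingObject(h header) bool {
	return len(h.children) > 0
}

func main() {
	linking := header{children: []string{"part-1", "part-2"}}
	data := header{}
	fmt.Println(isLinkingObject(linking), isLinkingObject(data)) // true false
}
```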
@@ -185,6 +185,7 @@ func getCompexObjectMembers(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli
func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, members []oid.ID, prmHead internalclient.HeadObjectPrm) []phyObject {
	result := make([]phyObject, 0, len(members))
+	var hasNonEC, hasEC bool
	var resultGuard sync.Mutex
	if len(members) == 0 {
@@ -193,31 +194,8 @@ func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, member
	prmHead.SetRawFlag(true) // to get an error instead of whole object
-	first := members[0]
-	var addrObj oid.Address
-	addrObj.SetContainer(cnrID)
-	addrObj.SetObject(first)
-	prmHead.SetAddress(addrObj)
-	_, err := internalclient.HeadObject(cmd.Context(), prmHead)
-	var ecInfoError *objectSDK.ECInfoError
-	if errors.As(err, &ecInfoError) {
-		chunks := getECObjectChunks(cmd, cnrID, first, ecInfoError)
-		result = append(result, chunks...)
-	} else if err == nil { // not EC object, so all members must be phy objects
-		for _, member := range members {
-			result = append(result, phyObject{
-				containerID: cnrID,
-				objectID:    member,
-			})
-		}
-		return result
-	} else {
-		commonCmd.ExitOnErr(cmd, "failed to flatten parts of complex object: %w", err)
-	}
	eg, egCtx := errgroup.WithContext(cmd.Context())
-	for idx := 1; idx < len(members); idx++ {
+	for idx := 0; idx < len(members); idx++ {
		partObjID := members[idx]
		eg.Go(func() error {
@@ -227,24 +205,44 @@ func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, member
			partAddr.SetObject(partObjID)
			partHeadPrm.SetAddress(partAddr)
-			_, err := internalclient.HeadObject(egCtx, partHeadPrm)
-			var ecInfoError *objectSDK.ECInfoError
-			if errors.As(err, &ecInfoError) {
-				chunks := getECObjectChunks(cmd, cnrID, partObjID, ecInfoError)
-				resultGuard.Lock()
-				defer resultGuard.Unlock()
-				result = append(result, chunks...)
-				return nil
-			} else if err == nil {
-				return errMalformedComplexObject
+			obj, err := internalclient.HeadObject(egCtx, partHeadPrm)
+			if err != nil {
+				var ecInfoError *objectSDK.ECInfoError
+				if errors.As(err, &ecInfoError) {
+					resultGuard.Lock()
+					defer resultGuard.Unlock()
+					result = append(result, getECObjectChunks(cmd, cnrID, partObjID, ecInfoError)...)
+					hasEC = true
+					return nil
+				}
+				return err
			}
-			return err
+			if obj.Header().Type() != objectSDK.TypeRegular {
+				commonCmd.ExitOnErr(cmd, "failed to flatten parts of complex object: %w", fmt.Errorf("object '%s' with type '%s' is not supported as part of complex object", partAddr, obj.Header().Type()))
+			}
+			if len(obj.Header().Children()) > 0 {
+				// linking object is not data object, so skip it
+				return nil
+			}
+			resultGuard.Lock()
+			defer resultGuard.Unlock()
+			result = append(result, phyObject{
+				containerID: cnrID,
+				objectID:    partObjID,
+			})
+			hasNonEC = true
+			return nil
		})
	}
	commonCmd.ExitOnErr(cmd, "failed to flatten parts of complex object: %w", eg.Wait())
+	if hasEC && hasNonEC {
+		commonCmd.ExitOnErr(cmd, "failed to flatten parts of complex object: %w", errMalformedComplexObject)
+	}
	return result
}

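The rewritten flattenComplexMembersIfECContainer treats every member uniformly: each one is HEADed in its own goroutine, EC chunks and regular parts are appended to a mutex-guarded slice, linking objects are skipped, and mixing EC and non-EC parts is reported as a malformed object. The concurrency part follows a common errgroup pattern; a generic sketch under the assumption that the per-part work is a plain function (headPart here stands in for the real HEAD call and classification):

```
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// headPart stands in for the per-member HEAD request plus the EC/linking
// classification performed in the patch.
func headPart(ctx context.Context, id string) (string, error) {
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	default:
		return "phy:" + id, nil
	}
}

// collectParts fans out one goroutine per member, appends results under a
// mutex and lets the first error cancel the remaining goroutines via egCtx.
func collectParts(ctx context.Context, members []string) ([]string, error) {
	var (
		guard  sync.Mutex
		result = make([]string, 0, len(members))
	)
	eg, egCtx := errgroup.WithContext(ctx)
	for _, member := range members {
		member := member // capture loop variable (pre-Go 1.22 semantics)
		eg.Go(func() error {
			part, err := headPart(egCtx, member)
			if err != nil {
				return err
			}
			guard.Lock()
			defer guard.Unlock()
			result = append(result, part)
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		return nil, err
	}
	return result, nil
}

func main() {
	parts, err := collectParts(context.Background(), []string{"a", "b", "c"})
	fmt.Println(parts, err)
}
```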
View file

@@ -374,7 +374,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
	common.PrintVerbose(cmd, "Split information received - object is virtual.")
	splitInfo := errSplit.SplitInfo()
-	if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
+	if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr); ok {
		return members
	}
@@ -390,7 +390,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
	return nil
}
-func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) {
+func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID) ([]oid.ID, bool) {
	// collect split chain by the descending ease of operations (ease is evaluated heuristically).
	// If any approach fails, we don't try the next since we assume that it will fail too.
@@ -411,10 +411,7 @@ func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.
		common.PrintVerbose(cmd, "Received split members from the linking object: %v", children)
-		if withLinking {
-			return append(children, idLinking), true
-		}
-		return children, true
+		return append(children, idLinking), true
	}
	// linking object is not required for

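With the withLinking flag removed, both call sites now receive the same member list: the children read from the linking object plus the linking object itself as the last element; callers that only need data parts drop it later via the Children() check shown in the first file. A toy sketch of the simplified contract, with IDs reduced to strings:

```
package main

import "fmt"

// splitMembers mirrors the simplified return value of
// tryGetSplitMembersByLinkingObject: children first, linking object last.
func splitMembers(children []string, linking string) []string {
	return append(children, linking)
}

func main() {
	fmt.Println(splitMembers([]string{"part-1", "part-2"}, "link-obj"))
}
```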
View file

@@ -24,6 +24,8 @@ type distributedTarget struct {
	nodeTargetInitializer func(nodeDesc) preparedObjectTarget
	relay func(context.Context, nodeDesc) error
+	resetSuccessAfterOnBroadcast bool
}
// parameters and state of container traversal.
@@ -35,6 +37,8 @@ type traversal struct {
	// container nodes which was processed during the primary object placement
	mExclude map[string]*bool
+	resetSuccessAfterOnBroadcast bool
}
// updates traversal parameters after the primary placement finish and
@@ -44,6 +48,10 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
		// do not track success during container broadcast (best-effort)
		x.opts = append(x.opts, placement.WithoutSuccessTracking())
+		if x.resetSuccessAfterOnBroadcast {
+			x.opts = append(x.opts, placement.ResetSuccessAfter())
+		}
		// avoid 2nd broadcast
		x.extraBroadcastEnabled = false
@@ -118,5 +126,6 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
	iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
	iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
+	iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
	return iter.forEachNode(ctx, t.sendObject)
}
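
Read together with the placement options in the last file, the new flag separates two phases: primary placement of a non-EC object in an EC container counts as successful after parity+1 nodes, while the follow-up container broadcast should not inherit that cap. A rough sketch of how the option set changes between the phases, with options reduced to labels (the real ones mutate an internal placement cfg); the rationale in the comment is an inference from the code, not a statement from the patch:

```
package main

import "fmt"

// buildBroadcastOpts mirrors submitPrimaryPlacementFinish: success tracking is
// switched off for the best-effort broadcast and, if the primary phase capped
// success at parity+1, that cap is dropped too so the broadcast is not cut
// short (inferred rationale).
func buildBroadcastOpts(primary []string, resetSuccessAfterOnBroadcast bool) []string {
	opts := append([]string(nil), primary...)
	opts = append(opts, "WithoutSuccessTracking()")
	if resetSuccessAfterOnBroadcast {
		opts = append(opts, "ResetSuccessAfter()")
	}
	return opts
}

func main() {
	primary := []string{"ForContainer(cnr)", "SuccessAfter(parity+1)"}
	fmt.Println(buildBroadcastOpts(primary, true))
}
```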

View file

@@ -166,6 +166,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
	iter := s.cfg.newNodeIterator(placement.placementOptions)
	iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
+	iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
	signer := &putSingleRequestSigner{
		req: req,
@@ -209,9 +210,10 @@ func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlace
}
type putSinglePlacement struct {
-	placementOptions []placement.Option
-	isEC             bool
-	container        containerSDK.Container
+	placementOptions             []placement.Option
+	isEC                         bool
+	container                    containerSDK.Container
+	resetSuccessAfterOnBroadcast bool
}
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
@@ -232,6 +234,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
	}
	if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
		result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
+		result.resetSuccessAfterOnBroadcast = true
	}
	result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))

View file

@@ -246,16 +246,19 @@ func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool)
		}
	}
+	var resetSuccessAfterOnBroadcast bool
	traverseOpts := prm.traverseOpts
	if forECPlacement && !prm.common.LocalOnly() {
		// save non-regular and linking object to EC container.
		// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
		traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
+		resetSuccessAfterOnBroadcast = true
	}
	return &distributedTarget{
-		cfg:           p.cfg,
-		placementOpts: traverseOpts,
+		cfg:                          p.cfg,
+		placementOpts:                traverseOpts,
+		resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
		nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
			if node.local {
				return localTarget{

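The comment in this hunk maps an EC policy onto an equivalent replication factor for objects that cannot be erasure-coded (non-regular and linking objects): with parity count Y, the write counts as successful after Y+1 distinct nodes, the same requirement REP Y+1 would impose. A minimal arithmetic sketch, where ecParityCount stands in for policy.ECParityCount:

```
package main

import "fmt"

// requiredCopies mirrors the SuccessAfter argument used above: for an EC X.Y
// container, a non-EC object is stored like REP (Y+1).
func requiredCopies(ecParityCount uint32) uint32 {
	return ecParityCount + 1
}

func main() {
	fmt.Println(requiredCopies(1)) // EC 2.1 -> 2 copies (REP 2)
	fmt.Println(requiredCopies(2)) // EC 2.2 -> 3 copies (REP 3)
}
```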
View file

@ -303,6 +303,13 @@ func SuccessAfter(v uint32) Option {
}
}
// ResetSuccessAfter resets flat success number setting option.
func ResetSuccessAfter() Option {
return func(c *cfg) {
c.flatSuccess = nil
}
}
// WithoutSuccessTracking disables success tracking in traversal.
func WithoutSuccessTracking() Option {
return func(c *cfg) {