From a059a7dcf06ed41354f1defe5d1822b79d4c1deb Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 23 Aug 2024 13:36:52 +0300 Subject: [PATCH] [#1329] cli: Skip linking objects in complex object processing Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-cli/modules/object/nodes.go | 74 ++++++++++++------------- cmd/frostfs-cli/modules/object/util.go | 9 +-- 2 files changed, 39 insertions(+), 44 deletions(-) diff --git a/cmd/frostfs-cli/modules/object/nodes.go b/cmd/frostfs-cli/modules/object/nodes.go index 42ae7324e..4efe04d16 100644 --- a/cmd/frostfs-cli/modules/object/nodes.go +++ b/cmd/frostfs-cli/modules/object/nodes.go @@ -172,7 +172,7 @@ func getComplexObjectParts(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli * func getCompexObjectMembers(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) []oid.ID { splitInfo := errSplitInfo.SplitInfo() - if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnrID, false); ok { + if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnrID); ok { return members } @@ -185,6 +185,7 @@ func getCompexObjectMembers(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, members []oid.ID, prmHead internalclient.HeadObjectPrm) []phyObject { result := make([]phyObject, 0, len(members)) + var hasNonEC, hasEC bool var resultGuard sync.Mutex if len(members) == 0 { @@ -193,31 +194,8 @@ func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, member prmHead.SetRawFlag(true) // to get an error instead of whole object - first := members[0] - var addrObj oid.Address - addrObj.SetContainer(cnrID) - addrObj.SetObject(first) - prmHead.SetAddress(addrObj) - - _, err := internalclient.HeadObject(cmd.Context(), prmHead) - var ecInfoError *objectSDK.ECInfoError - if errors.As(err, &ecInfoError) { - chunks := getECObjectChunks(cmd, cnrID, first, ecInfoError) - result = append(result, chunks...) - } else if err == nil { // not EC object, so all members must be phy objects - for _, member := range members { - result = append(result, phyObject{ - containerID: cnrID, - objectID: member, - }) - } - return result - } else { - commonCmd.ExitOnErr(cmd, "failed to flatten parts of complex object: %w", err) - } - eg, egCtx := errgroup.WithContext(cmd.Context()) - for idx := 1; idx < len(members); idx++ { + for idx := 0; idx < len(members); idx++ { partObjID := members[idx] eg.Go(func() error { @@ -227,24 +205,44 @@ func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, member partAddr.SetObject(partObjID) partHeadPrm.SetAddress(partAddr) - _, err := internalclient.HeadObject(egCtx, partHeadPrm) - var ecInfoError *objectSDK.ECInfoError - if errors.As(err, &ecInfoError) { - chunks := getECObjectChunks(cmd, cnrID, partObjID, ecInfoError) - - resultGuard.Lock() - defer resultGuard.Unlock() - result = append(result, chunks...) - - return nil - } else if err == nil { - return errMalformedComplexObject + obj, err := internalclient.HeadObject(egCtx, partHeadPrm) + if err != nil { + var ecInfoError *objectSDK.ECInfoError + if errors.As(err, &ecInfoError) { + resultGuard.Lock() + defer resultGuard.Unlock() + result = append(result, getECObjectChunks(cmd, cnrID, partObjID, ecInfoError)...) + hasEC = true + return nil + } + return err } - return err + + if obj.Header().Type() != objectSDK.TypeRegular { + commonCmd.ExitOnErr(cmd, "failed to flatten parts of complex object: %w", fmt.Errorf("object '%s' with type '%s' is not supported as part of complex object", partAddr, obj.Header().Type())) + } + + if len(obj.Header().Children()) > 0 { + // linking object is not data object, so skip it + return nil + } + + resultGuard.Lock() + defer resultGuard.Unlock() + result = append(result, phyObject{ + containerID: cnrID, + objectID: partObjID, + }) + hasNonEC = true + + return nil }) } commonCmd.ExitOnErr(cmd, "failed to flatten parts of complex object: %w", eg.Wait()) + if hasEC && hasNonEC { + commonCmd.ExitOnErr(cmd, "failed to flatten parts of complex object: %w", errMalformedComplexObject) + } return result } diff --git a/cmd/frostfs-cli/modules/object/util.go b/cmd/frostfs-cli/modules/object/util.go index 96b80fe1b..b090c9f8c 100644 --- a/cmd/frostfs-cli/modules/object/util.go +++ b/cmd/frostfs-cli/modules/object/util.go @@ -374,7 +374,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, common.PrintVerbose(cmd, "Split information received - object is virtual.") splitInfo := errSplit.SplitInfo() - if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok { + if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr); ok { return members } @@ -390,7 +390,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, return nil } -func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) { +func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID) ([]oid.ID, bool) { // collect split chain by the descending ease of operations (ease is evaluated heuristically). // If any approach fails, we don't try the next since we assume that it will fail too. @@ -411,10 +411,7 @@ func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK. common.PrintVerbose(cmd, "Received split members from the linking object: %v", children) - if withLinking { - return append(children, idLinking), true - } - return children, true + return append(children, idLinking), true } // linking object is not required for -- 2.45.2