From 98ac525272ef0d2f4a3303b13f81a9b1fedcab67 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 27 Oct 2022 10:49:56 +0400 Subject: [PATCH] [#1978] cli/object: Gather all related object in delete session Object removal session should reflect all objects related to the removing one. Make `OpenSessionViaClient` to gather the split members of the original object in order to spread the session to them. Signed-off-by: Leonard Lyubich --- CHANGELOG.md | 1 + cmd/neofs-cli/modules/object/util.go | 177 +++++++++++++++++++++++++-- 2 files changed, 171 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8747b1e50..b7534810b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ Changelog for NeoFS Node - Closing a shard now waits until GC background workers stop (#1964) - Make it possible to use `shard.ContainerSize` in read-only mode (#1975) - Storage node now starts if at least one gRPC enpoint is available (#1893) +- Missing object relatives in object removal session opened by NeoFS CLI (#1978) ### Removed ### Updated diff --git a/cmd/neofs-cli/modules/object/util.go b/cmd/neofs-cli/modules/object/util.go index 71ea61b27..1139aa7e6 100644 --- a/cmd/neofs-cli/modules/object/util.go +++ b/cmd/neofs-cli/modules/object/util.go @@ -14,6 +14,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/client" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" + "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/spf13/cobra" @@ -204,7 +205,12 @@ func ReadOrOpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client. return } - finalizeSession(cmd, dst, tok, key, cnr, obj) + var objs []oid.ID + if obj != nil { + objs = []oid.ID{*obj} + } + + finalizeSession(cmd, dst, tok, key, cnr, objs...) dst.SetClient(cli) } @@ -222,7 +228,26 @@ func OpenSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.PrivateKey, cnr // // *internal.PutObjectPrm // *internal.DeleteObjectPrm +// +// If provided SessionPrm is of type internal.DeleteObjectPrm, OpenSessionViaClient +// spreads the session to all object's relatives. func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client, key *ecdsa.PrivateKey, cnr cid.ID, obj *oid.ID) { + var objs []oid.ID + + if obj != nil { + if _, ok := dst.(*internal.DeleteObjectPrm); ok { + common.PrintVerbose("Collecting relatives of the removal object...") + + rels := collectObjectRelatives(cmd, cli, cnr, *obj) + + if len(rels) == 0 { + objs = []oid.ID{*obj} + } else { + objs = append(rels, *obj) + } + } + } + var tok session.Object const sessionLifetime = 10 // in NeoFS epochs @@ -234,13 +259,13 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client common.PrintVerbose("Session successfully opened.") - finalizeSession(cmd, dst, &tok, key, cnr, obj) + finalizeSession(cmd, dst, &tok, key, cnr, objs...) dst.SetClient(cli) } // specifies session verb, binds the session to the given container and limits -// the session by the given object (if specified). After all data is written, +// the session by the given objects (if specified). After all data is written, // signs session using provided private key and writes the session into the // given SessionPrm. // @@ -248,7 +273,7 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client // // *internal.PutObjectPrm // *internal.DeleteObjectPrm -func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, key *ecdsa.PrivateKey, cnr cid.ID, obj *oid.ID) { +func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, key *ecdsa.PrivateKey, cnr cid.ID, objs ...oid.ID) { common.PrintVerbose("Finalizing session token...") switch dst.(type) { @@ -265,9 +290,9 @@ func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, ke common.PrintVerbose("Binding session to container %s...", cnr) tok.BindContainer(cnr) - if obj != nil { - common.PrintVerbose("Limiting session by object %s...", obj) - tok.LimitByObjects(*obj) + if len(objs) > 0 { + common.PrintVerbose("Limiting session by the objects %v...", objs) + tok.LimitByObjects(objs...) } common.PrintVerbose("Signing session...") @@ -284,3 +309,141 @@ func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, ke func initFlagSession(cmd *cobra.Command, verb string) { commonflags.InitSession(cmd, "object "+verb) } + +// collects and returns all relatives of the given object stored in the specified +// container. Empty result without an error means lack of relationship in the +// container. +// +// The object itself is not included in the result. +func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, obj oid.ID) []oid.ID { + common.PrintVerbose("Fetching raw object header...") + + // request raw header first + var addrObj oid.Address + addrObj.SetContainer(cnr) + addrObj.SetObject(obj) + + var prmHead internal.HeadObjectPrm + prmHead.SetClient(cli) + prmHead.SetAddress(addrObj) + prmHead.SetRawFlag(true) + + _, err := internal.HeadObject(prmHead) + + var errSplit *object.SplitInfoError + + switch { + default: + common.ExitOnErr(cmd, "failed to get raw object header: %w", err) + case err == nil: + common.PrintVerbose("Raw header received - object is singular.") + return nil + case errors.As(err, &errSplit): + common.PrintVerbose("Split information received - object is virtual.") + } + + splitInfo := errSplit.SplitInfo() + + // collect split chain by the descending ease of operations (ease is evaluated heuristically). + // If any approach fails, we don't try the next since we assume that it will fail too. + + if idLinking, ok := splitInfo.Link(); ok { + common.PrintVerbose("Collecting split members using linking object %s...", idLinking) + + addrObj.SetObject(idLinking) + prmHead.SetAddress(addrObj) + prmHead.SetRawFlag(false) + // client is already set + + res, err := internal.HeadObject(prmHead) + common.ExitOnErr(cmd, "failed to get linking object's header: %w", err) + + children := res.Header().Children() + + common.PrintVerbose("Received split members from the linking object: %v", children) + + // include linking object + return append(children, idLinking) + } + + if idSplit := splitInfo.SplitID(); idSplit != nil { + common.PrintVerbose("Collecting split members by split ID...") + + var query object.SearchFilters + query.AddSplitIDFilter(object.MatchStringEqual, idSplit) + + var prm internal.SearchObjectsPrm + prm.SetContainerID(cnr) + prm.SetClient(cli) + prm.SetFilters(query) + + res, err := internal.SearchObjects(prm) + common.ExitOnErr(cmd, "failed to search objects by split ID: %w", err) + + members := res.IDList() + + common.PrintVerbose("Found objects by split ID: %v", res.IDList()) + + return members + } + + idMember, ok := splitInfo.LastPart() + if !ok { + common.ExitOnErr(cmd, "", errors.New("missing any data in received object split information")) + } + + common.PrintVerbose("Traverse the object split chain in reverse...", idMember) + + var res *internal.HeadObjectRes + chain := []oid.ID{idMember} + chainSet := map[oid.ID]struct{}{idMember: {}} + + prmHead.SetRawFlag(false) + // split members are almost definitely singular, but don't get hung up on it + + for { + common.PrintVerbose("Reading previous element of the split chain member %s...", idMember) + + addrObj.SetObject(idMember) + + res, err = internal.HeadObject(prmHead) + common.ExitOnErr(cmd, "failed to read split chain member's header: %w", err) + + idMember, ok = res.Header().PreviousID() + if !ok { + common.PrintVerbose("Chain ended.") + break + } + + if _, ok = chainSet[idMember]; ok { + common.ExitOnErr(cmd, "", fmt.Errorf("duplicated member in the split chain %s", idMember)) + } + + chain = append(chain, idMember) + chainSet[idMember] = struct{}{} + } + + common.PrintVerbose("Looking for a linking object...") + + var query object.SearchFilters + query.AddParentIDFilter(object.MatchStringEqual, obj) + + var prmSearch internal.SearchObjectsPrm + prmSearch.SetClient(cli) + prmSearch.SetContainerID(cnr) + prmSearch.SetFilters(query) + + resSearch, err := internal.SearchObjects(prmSearch) + common.ExitOnErr(cmd, "failed to find object children: %w", err) + + list := resSearch.IDList() + + for i := range list { + if _, ok = chainSet[list[i]]; !ok { + common.PrintVerbose("Found one more related object %s.", list[i]) + chain = append(chain, list[i]) + } + } + + return chain +}