[#1978] cli/object: Gather all related object in delete session

Object removal session should reflect all objects related to the
removing one.

Make `OpenSessionViaClient` to gather the split members of the original
object in order to spread the session to them.

Signed-off-by: Leonard Lyubich <ctulhurider@gmail.com>
support/v0.34
Leonard Lyubich 2022-10-27 10:49:56 +04:00 committed by fyrchik
parent 7653a1f626
commit 98ac525272
2 changed files with 171 additions and 7 deletions

View File

@ -28,6 +28,7 @@ Changelog for NeoFS Node
- Closing a shard now waits until GC background workers stop (#1964)
- Make it possible to use `shard.ContainerSize` in read-only mode (#1975)
- Storage node now starts if at least one gRPC enpoint is available (#1893)
- Missing object relatives in object removal session opened by NeoFS CLI (#1978)
### Removed
### Updated

View File

@ -14,6 +14,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/spf13/cobra"
@ -204,7 +205,12 @@ func ReadOrOpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.
return
}
finalizeSession(cmd, dst, tok, key, cnr, obj)
var objs []oid.ID
if obj != nil {
objs = []oid.ID{*obj}
}
finalizeSession(cmd, dst, tok, key, cnr, objs...)
dst.SetClient(cli)
}
@ -222,7 +228,26 @@ func OpenSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.PrivateKey, cnr
//
// *internal.PutObjectPrm
// *internal.DeleteObjectPrm
//
// If provided SessionPrm is of type internal.DeleteObjectPrm, OpenSessionViaClient
// spreads the session to all object's relatives.
func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client, key *ecdsa.PrivateKey, cnr cid.ID, obj *oid.ID) {
var objs []oid.ID
if obj != nil {
if _, ok := dst.(*internal.DeleteObjectPrm); ok {
common.PrintVerbose("Collecting relatives of the removal object...")
rels := collectObjectRelatives(cmd, cli, cnr, *obj)
if len(rels) == 0 {
objs = []oid.ID{*obj}
} else {
objs = append(rels, *obj)
}
}
}
var tok session.Object
const sessionLifetime = 10 // in NeoFS epochs
@ -234,13 +259,13 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client
common.PrintVerbose("Session successfully opened.")
finalizeSession(cmd, dst, &tok, key, cnr, obj)
finalizeSession(cmd, dst, &tok, key, cnr, objs...)
dst.SetClient(cli)
}
// specifies session verb, binds the session to the given container and limits
// the session by the given object (if specified). After all data is written,
// the session by the given objects (if specified). After all data is written,
// signs session using provided private key and writes the session into the
// given SessionPrm.
//
@ -248,7 +273,7 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client
//
// *internal.PutObjectPrm
// *internal.DeleteObjectPrm
func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, key *ecdsa.PrivateKey, cnr cid.ID, obj *oid.ID) {
func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, key *ecdsa.PrivateKey, cnr cid.ID, objs ...oid.ID) {
common.PrintVerbose("Finalizing session token...")
switch dst.(type) {
@ -265,9 +290,9 @@ func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, ke
common.PrintVerbose("Binding session to container %s...", cnr)
tok.BindContainer(cnr)
if obj != nil {
common.PrintVerbose("Limiting session by object %s...", obj)
tok.LimitByObjects(*obj)
if len(objs) > 0 {
common.PrintVerbose("Limiting session by the objects %v...", objs)
tok.LimitByObjects(objs...)
}
common.PrintVerbose("Signing session...")
@ -284,3 +309,141 @@ func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, ke
func initFlagSession(cmd *cobra.Command, verb string) {
commonflags.InitSession(cmd, "object "+verb)
}
// collects and returns all relatives of the given object stored in the specified
// container. Empty result without an error means lack of relationship in the
// container.
//
// The object itself is not included in the result.
func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, obj oid.ID) []oid.ID {
common.PrintVerbose("Fetching raw object header...")
// request raw header first
var addrObj oid.Address
addrObj.SetContainer(cnr)
addrObj.SetObject(obj)
var prmHead internal.HeadObjectPrm
prmHead.SetClient(cli)
prmHead.SetAddress(addrObj)
prmHead.SetRawFlag(true)
_, err := internal.HeadObject(prmHead)
var errSplit *object.SplitInfoError
switch {
default:
common.ExitOnErr(cmd, "failed to get raw object header: %w", err)
case err == nil:
common.PrintVerbose("Raw header received - object is singular.")
return nil
case errors.As(err, &errSplit):
common.PrintVerbose("Split information received - object is virtual.")
}
splitInfo := errSplit.SplitInfo()
// collect split chain by the descending ease of operations (ease is evaluated heuristically).
// If any approach fails, we don't try the next since we assume that it will fail too.
if idLinking, ok := splitInfo.Link(); ok {
common.PrintVerbose("Collecting split members using linking object %s...", idLinking)
addrObj.SetObject(idLinking)
prmHead.SetAddress(addrObj)
prmHead.SetRawFlag(false)
// client is already set
res, err := internal.HeadObject(prmHead)
common.ExitOnErr(cmd, "failed to get linking object's header: %w", err)
children := res.Header().Children()
common.PrintVerbose("Received split members from the linking object: %v", children)
// include linking object
return append(children, idLinking)
}
if idSplit := splitInfo.SplitID(); idSplit != nil {
common.PrintVerbose("Collecting split members by split ID...")
var query object.SearchFilters
query.AddSplitIDFilter(object.MatchStringEqual, idSplit)
var prm internal.SearchObjectsPrm
prm.SetContainerID(cnr)
prm.SetClient(cli)
prm.SetFilters(query)
res, err := internal.SearchObjects(prm)
common.ExitOnErr(cmd, "failed to search objects by split ID: %w", err)
members := res.IDList()
common.PrintVerbose("Found objects by split ID: %v", res.IDList())
return members
}
idMember, ok := splitInfo.LastPart()
if !ok {
common.ExitOnErr(cmd, "", errors.New("missing any data in received object split information"))
}
common.PrintVerbose("Traverse the object split chain in reverse...", idMember)
var res *internal.HeadObjectRes
chain := []oid.ID{idMember}
chainSet := map[oid.ID]struct{}{idMember: {}}
prmHead.SetRawFlag(false)
// split members are almost definitely singular, but don't get hung up on it
for {
common.PrintVerbose("Reading previous element of the split chain member %s...", idMember)
addrObj.SetObject(idMember)
res, err = internal.HeadObject(prmHead)
common.ExitOnErr(cmd, "failed to read split chain member's header: %w", err)
idMember, ok = res.Header().PreviousID()
if !ok {
common.PrintVerbose("Chain ended.")
break
}
if _, ok = chainSet[idMember]; ok {
common.ExitOnErr(cmd, "", fmt.Errorf("duplicated member in the split chain %s", idMember))
}
chain = append(chain, idMember)
chainSet[idMember] = struct{}{}
}
common.PrintVerbose("Looking for a linking object...")
var query object.SearchFilters
query.AddParentIDFilter(object.MatchStringEqual, obj)
var prmSearch internal.SearchObjectsPrm
prmSearch.SetClient(cli)
prmSearch.SetContainerID(cnr)
prmSearch.SetFilters(query)
resSearch, err := internal.SearchObjects(prmSearch)
common.ExitOnErr(cmd, "failed to find object children: %w", err)
list := resSearch.IDList()
for i := range list {
if _, ok = chainSet[list[i]]; !ok {
common.PrintVerbose("Found one more related object %s.", list[i])
chain = append(chain, list[i])
}
}
return chain
}