forked from TrueCloudLab/frostfs-sdk-go
[#360] object: Add new package relations
Package relations provides feature to process inner object structure. Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
1cacf472a3
commit
d047289182
3 changed files with 277 additions and 151 deletions
7
object/relations/doc.go
Normal file
7
object/relations/doc.go
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
/*
|
||||||
|
Package relations provides feature to process inner object structure.
|
||||||
|
|
||||||
|
Relations is an interface of entity that can receive object header or
|
||||||
|
the information about the object relations.
|
||||||
|
*/
|
||||||
|
package relations
|
120
object/relations/relations.go
Normal file
120
object/relations/relations.go
Normal file
|
@ -0,0 +1,120 @@
|
||||||
|
package relations
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/bearer"
|
||||||
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/session"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Tokens contains different tokens to perform requests in Relations implementations.
|
||||||
|
type Tokens struct {
|
||||||
|
Session *session.Object
|
||||||
|
Bearer *bearer.Token
|
||||||
|
}
|
||||||
|
|
||||||
|
type Relations interface {
|
||||||
|
// GetSplitInfo tries to get split info by root object id.
|
||||||
|
// If object isn't virtual it returns ErrNoSplitInfo.
|
||||||
|
GetSplitInfo(ctx context.Context, cnrID cid.ID, rootID oid.ID, tokens Tokens) (*object.SplitInfo, error)
|
||||||
|
|
||||||
|
// ListChildrenByLinker returns list of children for link object.
|
||||||
|
// Result doesn't include link object itself.
|
||||||
|
ListChildrenByLinker(ctx context.Context, cnrID cid.ID, linkerID oid.ID, tokens Tokens) ([]oid.ID, error)
|
||||||
|
|
||||||
|
// GetLeftSibling return previous object id in object chain.
|
||||||
|
// If no previous object it returns ErrNoLeftSibling.
|
||||||
|
GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens Tokens) (oid.ID, error)
|
||||||
|
|
||||||
|
// FindSiblingBySplitID returns all objects that relates to the provided split id.
|
||||||
|
FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens Tokens) ([]oid.ID, error)
|
||||||
|
|
||||||
|
// FindSiblingByParentID returns all object that relates to the provided parent id.
|
||||||
|
FindSiblingByParentID(ctx context.Context, cnrID cid.ID, parentID oid.ID, tokens Tokens) ([]oid.ID, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrNoLeftSibling is an error that must be returned if object doesn't have left sibling in objects chain.
|
||||||
|
ErrNoLeftSibling = errors.New("no left siblings")
|
||||||
|
|
||||||
|
// ErrNoSplitInfo is an error that must be returned if requested object isn't virtual.
|
||||||
|
ErrNoSplitInfo = errors.New("no split info")
|
||||||
|
)
|
||||||
|
|
||||||
|
// ListAllRelations return all related phy objects for provided root object ID.
|
||||||
|
// Result doesn't include root object ID itself.
|
||||||
|
func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID oid.ID, tokens Tokens) ([]oid.ID, error) {
|
||||||
|
splitInfo, err := rels.GetSplitInfo(ctx, cnrID, rootObjID, tokens)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, ErrNoSplitInfo) {
|
||||||
|
return []oid.ID{}, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect split chain by the descending ease of operations (ease is evaluated heuristically).
|
||||||
|
// If any approach fails, we don't try the next since we assume that it will fail too.
|
||||||
|
if idLinking, ok := splitInfo.Link(); ok {
|
||||||
|
children, err := rels.ListChildrenByLinker(ctx, cnrID, idLinking, tokens)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get linking object's header: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// include linking object
|
||||||
|
return append(children, idLinking), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if idSplit := splitInfo.SplitID(); idSplit != nil {
|
||||||
|
members, err := rels.FindSiblingBySplitID(ctx, cnrID, idSplit, tokens)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to search objects by split ID: %w", err)
|
||||||
|
}
|
||||||
|
return members, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
idMember, ok := splitInfo.LastPart()
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("missing any data in received object split information")
|
||||||
|
}
|
||||||
|
|
||||||
|
chain := []oid.ID{idMember}
|
||||||
|
chainSet := map[oid.ID]struct{}{idMember: {}}
|
||||||
|
|
||||||
|
// prmHead.SetRawFlag(false)
|
||||||
|
// split members are almost definitely singular, but don't get hung up on it
|
||||||
|
|
||||||
|
for {
|
||||||
|
idMember, err = rels.GetLeftSibling(ctx, cnrID, idMember, tokens)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, ErrNoLeftSibling) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("failed to read split chain member's header: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok = chainSet[idMember]; ok {
|
||||||
|
return nil, fmt.Errorf("duplicated member in the split chain %s", idMember)
|
||||||
|
}
|
||||||
|
|
||||||
|
chain = append(chain, idMember)
|
||||||
|
chainSet[idMember] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
list, err := rels.FindSiblingByParentID(ctx, cnrID, rootObjID, tokens)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to find object children: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range list {
|
||||||
|
if _, ok = chainSet[list[i]]; !ok {
|
||||||
|
chain = append(chain, list[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return chain, nil
|
||||||
|
}
|
301
pool/pool.go
301
pool/pool.go
|
@ -26,6 +26,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/netmap"
|
"github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/object/relations"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/session"
|
"github.com/nspcc-dev/neofs-sdk-go/session"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
|
@ -2007,9 +2008,11 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
|
||||||
prmCtx.useVerb(session.VerbObjectDelete)
|
prmCtx.useVerb(session.VerbObjectDelete)
|
||||||
prmCtx.useAddress(prm.addr)
|
prmCtx.useAddress(prm.addr)
|
||||||
|
|
||||||
if prm.stoken == nil {
|
if prm.stoken == nil { // collect phy objects only if we are about to open default session
|
||||||
// collect phy objects only if we are about to open default session
|
var tokens relations.Tokens
|
||||||
relatives, err := p.collectObjectRelatives(ctx, prm.addr.Container(), prm.addr.Object(), prm.btoken)
|
tokens.Bearer = prm.btoken
|
||||||
|
|
||||||
|
relatives, err := relations.ListAllRelations(ctx, p, prm.addr.Container(), prm.addr.Object(), tokens)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to collect relatives: %w", err)
|
return fmt.Errorf("failed to collect relatives: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -2041,154 +2044,6 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) collectObjectRelatives(ctx context.Context, cnr cid.ID, obj oid.ID, btoken *bearer.Token) ([]oid.ID, error) {
|
|
||||||
var addrObj oid.Address
|
|
||||||
addrObj.SetContainer(cnr)
|
|
||||||
addrObj.SetObject(obj)
|
|
||||||
|
|
||||||
var prmHead PrmObjectHead
|
|
||||||
prmHead.SetAddress(addrObj)
|
|
||||||
if btoken != nil {
|
|
||||||
prmHead.UseBearer(*btoken)
|
|
||||||
}
|
|
||||||
prmHead.MarkRaw()
|
|
||||||
|
|
||||||
_, err := p.HeadObject(ctx, prmHead)
|
|
||||||
|
|
||||||
var errSplit *object.SplitInfoError
|
|
||||||
|
|
||||||
switch {
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("failed to get raw object header: %w", err)
|
|
||||||
case err == nil:
|
|
||||||
return nil, nil
|
|
||||||
case errors.As(err, &errSplit):
|
|
||||||
}
|
|
||||||
|
|
||||||
splitInfo := errSplit.SplitInfo()
|
|
||||||
|
|
||||||
// collect split chain by the descending ease of operations (ease is evaluated heuristically).
|
|
||||||
// If any approach fails, we don't try the next since we assume that it will fail too.
|
|
||||||
|
|
||||||
if idLinking, ok := splitInfo.Link(); ok {
|
|
||||||
addrObj = oid.Address{}
|
|
||||||
addrObj.SetContainer(cnr)
|
|
||||||
addrObj.SetObject(idLinking)
|
|
||||||
|
|
||||||
prmHead = PrmObjectHead{}
|
|
||||||
prmHead.SetAddress(addrObj)
|
|
||||||
if btoken != nil {
|
|
||||||
prmHead.UseBearer(*btoken)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := p.HeadObject(ctx, prmHead)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to get linking object's header: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
children := res.Children()
|
|
||||||
|
|
||||||
// include linking object
|
|
||||||
return append(children, idLinking), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if idSplit := splitInfo.SplitID(); idSplit != nil {
|
|
||||||
var query object.SearchFilters
|
|
||||||
query.AddSplitIDFilter(object.MatchStringEqual, idSplit)
|
|
||||||
|
|
||||||
var prm PrmObjectSearch
|
|
||||||
prm.SetContainerID(cnr)
|
|
||||||
prm.SetFilters(query)
|
|
||||||
if btoken != nil {
|
|
||||||
prm.UseBearer(*btoken)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := p.SearchObjects(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to search objects by split ID: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var members []oid.ID
|
|
||||||
err = res.Iterate(func(id oid.ID) bool {
|
|
||||||
members = append(members, id)
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to iterate found objects: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return members, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
idMember, ok := splitInfo.LastPart()
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("missing any data in received object split information")
|
|
||||||
}
|
|
||||||
|
|
||||||
var res object.Object
|
|
||||||
chain := []oid.ID{idMember}
|
|
||||||
chainSet := map[oid.ID]struct{}{idMember: {}}
|
|
||||||
|
|
||||||
addrObj = oid.Address{}
|
|
||||||
addrObj.SetContainer(cnr)
|
|
||||||
|
|
||||||
for {
|
|
||||||
addrObj.SetObject(idMember)
|
|
||||||
|
|
||||||
prmHead = PrmObjectHead{}
|
|
||||||
prmHead.SetAddress(addrObj)
|
|
||||||
if btoken != nil {
|
|
||||||
prmHead.UseBearer(*btoken)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err = p.HeadObject(ctx, prmHead)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to read split chain member's header: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
idMember, ok = res.PreviousID()
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, ok = chainSet[idMember]; ok {
|
|
||||||
return nil, fmt.Errorf("duplicated member in the split chain: %s", idMember)
|
|
||||||
}
|
|
||||||
|
|
||||||
chain = append(chain, idMember)
|
|
||||||
chainSet[idMember] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Looking for a linking object
|
|
||||||
|
|
||||||
var query object.SearchFilters
|
|
||||||
query.AddParentIDFilter(object.MatchStringEqual, obj)
|
|
||||||
|
|
||||||
var prm PrmObjectSearch
|
|
||||||
prm.SetContainerID(cnr)
|
|
||||||
prm.SetFilters(query)
|
|
||||||
if btoken != nil {
|
|
||||||
prm.UseBearer(*btoken)
|
|
||||||
}
|
|
||||||
|
|
||||||
resSearch, err := p.SearchObjects(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to find object children: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = resSearch.Iterate(func(id oid.ID) bool {
|
|
||||||
if _, ok = chainSet[id]; !ok {
|
|
||||||
chain = append(chain, id)
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to iterate found objects: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return chain, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type objectReadCloser struct {
|
type objectReadCloser struct {
|
||||||
reader *sdkClient.ObjectReader
|
reader *sdkClient.ObjectReader
|
||||||
elapsedTimeCallback func(time.Duration)
|
elapsedTimeCallback func(time.Duration)
|
||||||
|
@ -2613,3 +2468,147 @@ func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetSplitInfo implements relations.Relations.
|
||||||
|
func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(cnrID)
|
||||||
|
addr.SetObject(objID)
|
||||||
|
|
||||||
|
var prm PrmObjectHead
|
||||||
|
prm.SetAddress(addr)
|
||||||
|
if tokens.Bearer != nil {
|
||||||
|
prm.UseBearer(*tokens.Bearer)
|
||||||
|
}
|
||||||
|
if tokens.Session != nil {
|
||||||
|
prm.UseSession(*tokens.Session)
|
||||||
|
}
|
||||||
|
prm.MarkRaw()
|
||||||
|
|
||||||
|
_, err := p.HeadObject(ctx, prm)
|
||||||
|
|
||||||
|
var errSplit *object.SplitInfoError
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case errors.As(err, &errSplit):
|
||||||
|
return errSplit.SplitInfo(), nil
|
||||||
|
case err == nil:
|
||||||
|
return nil, relations.ErrNoSplitInfo
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("failed to get raw object header: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListChildrenByLinker implements relations.Relations.
|
||||||
|
func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(cnrID)
|
||||||
|
addr.SetObject(objID)
|
||||||
|
|
||||||
|
var prm PrmObjectHead
|
||||||
|
prm.SetAddress(addr)
|
||||||
|
if tokens.Bearer != nil {
|
||||||
|
prm.UseBearer(*tokens.Bearer)
|
||||||
|
}
|
||||||
|
if tokens.Session != nil {
|
||||||
|
prm.UseSession(*tokens.Session)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := p.HeadObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get linking object's header: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Children(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLeftSibling implements relations.Relations.
|
||||||
|
func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(cnrID)
|
||||||
|
addr.SetObject(objID)
|
||||||
|
|
||||||
|
var prm PrmObjectHead
|
||||||
|
prm.SetAddress(addr)
|
||||||
|
if tokens.Bearer != nil {
|
||||||
|
prm.UseBearer(*tokens.Bearer)
|
||||||
|
}
|
||||||
|
if tokens.Session != nil {
|
||||||
|
prm.UseSession(*tokens.Session)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := p.HeadObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return oid.ID{}, fmt.Errorf("failed to read split chain member's header: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
idMember, ok := res.PreviousID()
|
||||||
|
if !ok {
|
||||||
|
return oid.ID{}, relations.ErrNoLeftSibling
|
||||||
|
}
|
||||||
|
return idMember, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindSiblingBySplitID implements relations.Relations.
|
||||||
|
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
|
||||||
|
var query object.SearchFilters
|
||||||
|
query.AddSplitIDFilter(object.MatchStringEqual, splitID)
|
||||||
|
|
||||||
|
var prm PrmObjectSearch
|
||||||
|
prm.SetContainerID(cnrID)
|
||||||
|
prm.SetFilters(query)
|
||||||
|
if tokens.Bearer != nil {
|
||||||
|
prm.UseBearer(*tokens.Bearer)
|
||||||
|
}
|
||||||
|
if tokens.Session != nil {
|
||||||
|
prm.UseSession(*tokens.Session)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := p.SearchObjects(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to search objects by split ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var members []oid.ID
|
||||||
|
err = res.Iterate(func(id oid.ID) bool {
|
||||||
|
members = append(members, id)
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to iterate found objects: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return members, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindSiblingByParentID implements relations.Relations.
|
||||||
|
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
||||||
|
var query object.SearchFilters
|
||||||
|
query.AddParentIDFilter(object.MatchStringEqual, objID)
|
||||||
|
|
||||||
|
var prm PrmObjectSearch
|
||||||
|
prm.SetContainerID(cnrID)
|
||||||
|
prm.SetFilters(query)
|
||||||
|
if tokens.Bearer != nil {
|
||||||
|
prm.UseBearer(*tokens.Bearer)
|
||||||
|
}
|
||||||
|
if tokens.Session != nil {
|
||||||
|
prm.UseSession(*tokens.Session)
|
||||||
|
}
|
||||||
|
|
||||||
|
resSearch, err := p.SearchObjects(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to find object children: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var res []oid.ID
|
||||||
|
err = resSearch.Iterate(func(id oid.ID) bool {
|
||||||
|
res = append(res, id)
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to iterate found objects: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue