Compare commits

...

2 commits

Author SHA1 Message Date
Denis Kirillov
8c524047f6 [#308] audit: Add CollectMembers function
Add new function to build storage group using OIDs of virtual objects.

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
2023-01-12 16:27:06 +03:00
Denis Kirillov
813f0c0646 [#308] object/relations: List relations in split order
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
2023-01-12 16:24:42 +03:00
3 changed files with 131 additions and 65 deletions

76
audit/collect.go Normal file
View file

@ -0,0 +1,76 @@
package audit
import (
"context"
"fmt"
"github.com/TrueCloudLab/frostfs-sdk-go/checksum"
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/TrueCloudLab/frostfs-sdk-go/object"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/TrueCloudLab/frostfs-sdk-go/object/relations"
"github.com/TrueCloudLab/frostfs-sdk-go/storagegroup"
"github.com/TrueCloudLab/tzhash/tz"
)
type Collector interface {
Head(ctx context.Context, addr oid.Address) (*object.Object, error)
relations.Relations
}
// CollectMembers creates new storage group structure and fills it
// with information about members collected via HeadReceiver.
//
// Resulting storage group consists of physically stored objects only.
func CollectMembers(ctx context.Context, collector Collector, cnr cid.ID, members []oid.ID, tokens relations.Tokens, calcHomoHash bool) (*storagegroup.StorageGroup, error) {
var (
err error
sumPhySize uint64
phyMembers []oid.ID
phyHashes [][]byte
addr oid.Address
sg storagegroup.StorageGroup
)
addr.SetContainer(cnr)
for i := range members {
if phyMembers, err = relations.ListRelations(ctx, collector, cnr, members[i], tokens, false); err != nil {
return nil, err
}
for _, phyMember := range phyMembers {
addr.SetObject(phyMember)
leaf, err := collector.Head(ctx, addr)
if err != nil {
return nil, fmt.Errorf("head phy member '%s': %w", phyMember.EncodeToString(), err)
}
sumPhySize += leaf.PayloadSize()
cs, _ := leaf.PayloadHomomorphicHash()
if calcHomoHash {
phyHashes = append(phyHashes, cs.Value())
}
}
}
sg.SetMembers(phyMembers)
sg.SetValidationDataSize(sumPhySize)
if calcHomoHash {
sumHash, err := tz.Concat(phyHashes)
if err != nil {
return nil, err
}
var cs checksum.Checksum
tzHash := [64]byte{}
copy(tzHash[:], sumHash)
cs.SetTillichZemor(tzHash)
sg.SetValidationDataHash(cs)
}
return &sg, nil
}

View file

@ -19,8 +19,9 @@ type Tokens struct {
}
type Relations interface {
// GetSplitInfo tries to get split info by root object id.
// If object isn't virtual it returns ErrNoSplitInfo.
// GetSplitInfo tries to get split info by some object id.
// This method must return split info on any object from split chain as well as on parent/linking object.
// If object doesn't have any split information returns ErrNoSplitInfo.
GetSplitInfo(ctx context.Context, cnrID cid.ID, rootID oid.ID, tokens Tokens) (*object.SplitInfo, error)
// ListChildrenByLinker returns list of children for link object.
@ -31,9 +32,6 @@ type Relations interface {
// If no previous object it returns ErrNoLeftSibling.
GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens Tokens) (oid.ID, error)
// FindSiblingBySplitID returns all objects that relates to the provided split id.
FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens Tokens) ([]oid.ID, error)
// FindSiblingByParentID returns all object that relates to the provided parent id.
FindSiblingByParentID(ctx context.Context, cnrID cid.ID, parentID oid.ID, tokens Tokens) ([]oid.ID, error)
}
@ -46,9 +44,15 @@ var (
ErrNoSplitInfo = errors.New("no split info")
)
// ListAllRelations return all related phy objects for provided root object ID.
// Result doesn't include root object ID itself.
// ListAllRelations return all related phy objects for provided root object ID in split-chain order.
// Result doesn't include root object ID itself. If linking object is found its id will be the last one.
func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID oid.ID, tokens Tokens) ([]oid.ID, error) {
return ListRelations(ctx, rels, cnrID, rootObjID, tokens, true)
}
// ListRelations return all related phy objects for provided root object ID in split-chain order.
// Result doesn't include root object ID itself.
func ListRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID oid.ID, tokens Tokens, includeLinking bool) ([]oid.ID, error) {
splitInfo, err := rels.GetSplitInfo(ctx, cnrID, rootObjID, tokens)
if err != nil {
if errors.Is(err, ErrNoSplitInfo) {
@ -59,22 +63,40 @@ func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObj
// collect split chain by the descending ease of operations (ease is evaluated heuristically).
// If any approach fails, we don't try the next since we assume that it will fail too.
if _, ok := splitInfo.Link(); !ok {
// the list is expected to contain last part and (probably) split info
list, err := rels.FindSiblingByParentID(ctx, cnrID, rootObjID, tokens)
if err != nil {
return nil, fmt.Errorf("failed to find object children: %w", err)
}
for _, id := range list {
split, err := rels.GetSplitInfo(ctx, cnrID, id, tokens)
if err != nil {
if errors.Is(err, ErrNoSplitInfo) {
continue
}
return nil, fmt.Errorf("failed to get split info: %w", err)
}
if link, ok := split.Link(); ok {
splitInfo.SetLink(link)
}
if last, ok := split.LastPart(); ok {
splitInfo.SetLastPart(last)
}
}
}
if idLinking, ok := splitInfo.Link(); ok {
children, err := rels.ListChildrenByLinker(ctx, cnrID, idLinking, tokens)
if err != nil {
return nil, fmt.Errorf("failed to get linking object's header: %w", err)
}
// include linking object
return append(children, idLinking), nil
}
if idSplit := splitInfo.SplitID(); idSplit != nil {
members, err := rels.FindSiblingBySplitID(ctx, cnrID, idSplit, tokens)
if err != nil {
return nil, fmt.Errorf("failed to search objects by split ID: %w", err)
if includeLinking {
children = append(children, idLinking)
}
return members, nil
return children, nil
}
idMember, ok := splitInfo.LastPart()
@ -85,9 +107,6 @@ func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObj
chain := []oid.ID{idMember}
chainSet := map[oid.ID]struct{}{idMember: {}}
// prmHead.SetRawFlag(false)
// split members are almost definitely singular, but don't get hung up on it
for {
idMember, err = rels.GetLeftSibling(ctx, cnrID, idMember, tokens)
if err != nil {
@ -101,20 +120,9 @@ func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObj
return nil, fmt.Errorf("duplicated member in the split chain %s", idMember)
}
chain = append(chain, idMember)
chain = append([]oid.ID{idMember}, chain...)
chainSet[idMember] = struct{}{}
}
list, err := rels.FindSiblingByParentID(ctx, cnrID, rootObjID, tokens)
if err != nil {
return nil, fmt.Errorf("failed to find object children: %w", err)
}
for i := range list {
if _, ok = chainSet[list[i]]; !ok {
chain = append(chain, list[i])
}
}
return chain, nil
}

View file

@ -2538,7 +2538,7 @@ func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tok
}
prm.MarkRaw()
_, err := p.HeadObject(ctx, prm)
res, err := p.HeadObject(ctx, prm)
var errSplit *object.SplitInfoError
@ -2546,7 +2546,21 @@ func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tok
case errors.As(err, &errSplit):
return errSplit.SplitInfo(), nil
case err == nil:
return nil, relations.ErrNoSplitInfo
if res.SplitID() == nil {
return nil, relations.ErrNoSplitInfo
}
splitInfo := object.NewSplitInfo()
splitInfo.SetSplitID(res.SplitID())
if res.HasParent() {
if len(res.Children()) > 0 {
splitInfo.SetLink(objID)
} else {
splitInfo.SetLastPart(objID)
}
}
return splitInfo, nil
default:
return nil, fmt.Errorf("failed to get raw object header: %w", err)
}
@ -2602,38 +2616,6 @@ func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, t
return idMember, nil
}
// FindSiblingBySplitID implements relations.Relations.
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
var query object.SearchFilters
query.AddSplitIDFilter(object.MatchStringEqual, splitID)
var prm PrmObjectSearch
prm.SetContainerID(cnrID)
prm.SetFilters(query)
if tokens.Bearer != nil {
prm.UseBearer(*tokens.Bearer)
}
if tokens.Session != nil {
prm.UseSession(*tokens.Session)
}
res, err := p.SearchObjects(ctx, prm)
if err != nil {
return nil, fmt.Errorf("failed to search objects by split ID: %w", err)
}
var members []oid.ID
err = res.Iterate(func(id oid.ID) bool {
members = append(members, id)
return false
})
if err != nil {
return nil, fmt.Errorf("failed to iterate found objects: %w", err)
}
return members, nil
}
// FindSiblingByParentID implements relations.Relations.
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
var query object.SearchFilters