[#3] Integration with the tree service

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
This commit is contained in:
Roman Loginov 2024-03-27 13:46:05 +03:00
parent 42eafd1fff
commit 2d5b9b5ca2
5 changed files with 817 additions and 292 deletions

View file

@ -83,7 +83,7 @@ func newSizeLimiterWriter(ctx context.Context, d *driver, path string, splitInfo
previous: formPreviousChain(splitInfo, parts),
parentHashers: parentHashers,
targetInit: func() transformer.ChunkedObjectWriter {
return d.newObjTarget()
return d.newObjTarget(path, splitInfo.SplitID())
},
parent: parent,
}
@ -233,6 +233,11 @@ func (w *writer) release(withParent bool) (*transformer.AccessIdentifiers, error
if _, err = w.release(false); err != nil {
return nil, fmt.Errorf("could not release linking object: %w", err)
}
err = w.driver.treeService.DeleteObjectsBySplitID(w.ctx, w.driver.containerID, w.path, w.splitInfo.SplitID())
if err != nil {
return nil, fmt.Errorf("could not delete split objects: %w", err)
}
}
return ids, nil
@ -459,5 +464,10 @@ func (w *writer) deleteParts() error {
}
}
err := w.driver.treeService.DeleteObjectsBySplitID(w.ctx, w.driver.containerID, w.path, w.splitInfo.SplitID())
if err != nil {
return fmt.Errorf("could not delete split objects: %w", err)
}
return nil
}

View file

@ -3,9 +3,11 @@ package frostfs
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
@ -18,12 +20,15 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
dcontext "github.com/distribution/distribution/v3/context"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/base"
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree"
"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/wrapper"
"github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/pkg/wallet"
)
@ -56,6 +61,11 @@ const (
defaultSessionExpirationDuration = 100 // in epoch
)
var (
filePathValidateRegex = regexp.MustCompile(`^/([^/]+/)*[^/]+$`)
prefixPathValidateRegex = regexp.MustCompile(`^/([^/]+/?)*$`)
)
// DriverParameters is a struct that encapsulates all of the driver parameters after all values have been set.
type DriverParameters struct {
ContainerID string
@ -94,6 +104,7 @@ func (n *frostfsDriverFactory) Create(parameters map[string]interface{}) (storag
type driver struct {
sdkPool *pool.Pool
treeService *tree.Tree
owner *user.ID
key *ecdsa.PrivateKey
containerID cid.ID
@ -306,7 +317,7 @@ func New(params DriverParameters) (*Driver, error) {
var owner user.ID
user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey)
sdkPool, err := createPool(ctx, acc, params)
sdkPool, treePool, err := createPool(ctx, acc, params)
if err != nil {
return nil, fmt.Errorf("couldn't create sdk pool: %w", err)
}
@ -321,8 +332,11 @@ func New(params DriverParameters) (*Driver, error) {
return nil, fmt.Errorf("couldn't get container id: %w", err)
}
treeService := wrapper.NewPoolWrapper(treePool)
d := &driver{
sdkPool: sdkPool,
treeService: tree.NewTree(treeService, nil),
owner: &owner,
key: &acc.PrivateKey().PrivateKey,
containerID: cnrID,
@ -376,28 +390,49 @@ func getContainerID(params DriverParameters) (cid.ID, error) {
return cnrID, nil
}
func createPool(ctx context.Context, acc *wallet.Account, param DriverParameters) (*pool.Pool, error) {
func createPool(ctx context.Context, acc *wallet.Account, param DriverParameters) (*pool.Pool, *treepool.Pool, error) {
var prm pool.InitParameters
var prmTree treepool.InitParameters
prm.SetKey(&acc.PrivateKey().PrivateKey)
prmTree.SetKey(acc.PrivateKey())
prm.SetNodeDialTimeout(param.ConnectionTimeout)
prmTree.SetNodeDialTimeout(param.ConnectionTimeout)
prm.SetHealthcheckTimeout(param.RequestTimeout)
prmTree.SetHealthcheckTimeout(param.RequestTimeout)
prm.SetClientRebalanceInterval(param.RebalanceInterval)
prmTree.SetClientRebalanceInterval(param.RebalanceInterval)
prm.SetSessionExpirationDuration(param.SessionExpirationDuration)
for _, peer := range param.Peers {
prm.AddNode(pool.NewNodeParam(peer.Priority, peer.Address, peer.Weight))
nodeParam := pool.NewNodeParam(peer.Priority, peer.Address, peer.Weight)
prm.AddNode(nodeParam)
prmTree.AddNode(nodeParam)
}
p, err := pool.NewPool(prm)
if err != nil {
return nil, fmt.Errorf("create pool: %w", err)
return nil, nil, fmt.Errorf("create pool: %w", err)
}
if err = p.Dial(ctx); err != nil {
return nil, fmt.Errorf("dial pool: %w", err)
return nil, nil, fmt.Errorf("dial pool: %w", err)
}
return p, nil
treePool, err := treepool.NewPool(prmTree)
if err != nil {
return nil, nil, fmt.Errorf("create tree pool: %w", err)
}
if err = treePool.Dial(ctx); err != nil {
return nil, nil, fmt.Errorf("dial tree pool: %w", err)
}
return p, treePool, nil
}
func createNnsResolver(params DriverParameters) (*resolver.NNS, error) {
@ -468,23 +503,34 @@ func (d *driver) Name() string {
}
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
id, err := d.searchOne(ctx, path)
if ok := validateFilePath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
}
treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
if err != nil {
return nil, err
if errors.Is(err, tree.ErrNodeNotFound) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
}
return nil, fmt.Errorf("couldn't get object from tree: %w", err)
}
var prm pool.PrmObjectGet
prm.SetAddress(d.objectAddress(id))
prm.SetAddress(d.objectAddress(treeObj.ObjID))
obj, err := d.sdkPool.GetObject(ctx, prm)
if err != nil {
return nil, fmt.Errorf("couldn't get object '%s': %w", id, err)
return nil, fmt.Errorf("couldn't get object '%s': %w", treeObj.ObjID, err)
}
return io.ReadAll(obj.Payload)
}
func (d *driver) PutContent(ctx context.Context, path string, content []byte) error {
if ok := validateFilePath(path); !ok {
return storagedriver.InvalidPathError{Path: path}
}
if err := d.Delete(ctx, path); err != nil {
return fmt.Errorf("couldn't delete '%s': %s", path, err)
}
@ -495,34 +541,35 @@ func (d *driver) PutContent(ctx context.Context, path string, content []byte) er
var prm pool.PrmObjectPut
prm.SetHeader(*obj)
if _, err := d.sdkPool.PutObject(ctx, prm); err != nil {
id, err := d.sdkPool.PutObject(ctx, prm)
if err != nil {
return fmt.Errorf("couldn't put object '%s': %w", path, err)
}
_, err = d.treeService.AddObject(ctx, d.containerID, path, id, uint64(len(content)))
if err != nil {
return fmt.Errorf("can't add object to tree: %w", err)
}
return nil
}
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
id, err := d.searchOne(ctx, path)
if ok := validateFilePath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
}
treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't get object from tree: %w", err)
}
addr := d.objectAddress(id)
addr := d.objectAddress(treeObj.ObjID)
var prmHead pool.PrmObjectHead
prmHead.SetAddress(addr)
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
if err != nil {
return nil, fmt.Errorf("couldn't head object '%s', id '%s': %w", path, id, err)
if uint64(offset) >= treeObj.PayloadSize {
return nil, fmt.Errorf("invalid offset %d for object length %d", offset, treeObj.PayloadSize)
}
if uint64(offset) >= obj.PayloadSize() {
return nil, fmt.Errorf("invalid offset %d for object length %d", offset, obj.PayloadSize())
}
length := obj.PayloadSize() - uint64(offset)
length := treeObj.PayloadSize - uint64(offset)
var prmRange pool.PrmObjectRange
prmRange.SetAddress(addr)
@ -532,7 +579,7 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
res, err := d.sdkPool.ObjectRange(ctx, prmRange)
if err != nil {
return nil, fmt.Errorf("couldn't get payload range of object '%s', offset %d, length %d, id '%s': %w",
path, offset, length, id, err)
path, offset, length, treeObj.ObjID, err)
}
return &res, nil
@ -543,6 +590,10 @@ func getUploadUUID(ctx context.Context) (uuid string) {
}
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
if ok := validateFilePath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
}
splitID := object.NewSplitID()
uploadUUID := getUploadUUID(ctx)
if err := splitID.Parse(uploadUUID); err != nil {
@ -591,56 +642,42 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
return newFileInfoDir(path), nil
}
id, ok, err := d.searchOneBase(ctx, path, true)
if ok := validatePrefixPath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
}
path = preparePrefixPath(path)
treeObj, err := d.treeService.GetObjectByPathDir(ctx, d.containerID, path)
if err != nil {
return nil, err
if errors.Is(err, tree.ErrNodeNotFound) {
return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
} else if errors.Is(err, tree.ErrOnlyDirFound) {
return newFileInfoDir(path), nil
}
return nil, fmt.Errorf("couldn't get object from tree: %w", err)
}
// assume there is not object with directory name
// e.g. if file '/a/b/c' exists, files '/a/b' and '/a' don't
if !ok {
return newFileInfoDir(path), nil
}
var prm pool.PrmObjectHead
prm.SetAddress(d.objectAddress(id))
obj, err := d.sdkPool.HeadObject(ctx, prm)
if err != nil {
return nil, fmt.Errorf("couldn't get head object '%s': %w", id, err)
}
fileInfo := newFileInfo(ctx, obj, "")
// e.g. search '/a/b' but because of prefix search we found '/a/b/c'
// so we should return directory
if fileInfo.Path() != path {
return newFileInfoDir(path), nil
}
fileInfo := newFileInfo(*treeObj, "")
return fileInfo, nil
}
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
ids, err := d.searchByPrefix(ctx, path)
if ok := validatePrefixPath(path); !ok {
return nil, storagedriver.InvalidPathError{Path: path}
}
path = preparePrefixPath(path)
treeObjList, err := d.treeService.GetListObjectByPrefix(ctx, d.containerID, path)
if err != nil {
return nil, fmt.Errorf("couldn't search by prefix '%s': %w", path, err)
return nil, fmt.Errorf("couldn't get object from tree by prefix '%s': %w", path, err)
}
added := make(map[string]bool)
result := make([]string, 0, len(ids))
for _, id := range ids {
var prm pool.PrmObjectHead
prm.SetAddress(d.objectAddress(id))
obj, err := d.sdkPool.HeadObject(ctx, prm)
if err != nil {
dcontext.GetLogger(ctx).Warnf("couldn't get list object '%s' in path '%s': %s", id, path, err)
continue
}
fileInf := newFileInfo(ctx, obj, path)
result := make([]string, 0, len(treeObjList))
for _, treeObj := range treeObjList {
fileInf := newFileInfo(treeObj, path)
if !added[fileInf.Path()] {
result = append(result, fileInf.Path())
added[fileInf.Path()] = true
@ -652,106 +689,64 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) {
}
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
sourceID, err := d.searchOne(ctx, sourcePath)
if ok := validateFilePath(sourcePath); !ok {
return storagedriver.InvalidPathError{Path: sourcePath}
}
if ok := validateFilePath(destPath); !ok {
return storagedriver.InvalidPathError{Path: destPath}
}
sourceTreeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, sourcePath)
if err != nil {
return err
return fmt.Errorf("couldn't get object from tree: %w", err)
}
if err = d.Delete(ctx, destPath); err != nil {
return fmt.Errorf("couldn't delete '%s' object: %w", destPath, err)
}
sourceAddr := d.objectAddress(sourceID)
var prmGet pool.PrmObjectGet
prmGet.SetAddress(sourceAddr)
obj, err := d.sdkPool.GetObject(ctx, prmGet)
_, err = d.treeService.AddObject(ctx, d.containerID, destPath, sourceTreeObj.ObjID, sourceTreeObj.PayloadSize)
if err != nil {
return fmt.Errorf("could not get source object '%s' by oid '%s': %w", sourcePath, sourceID, err)
}
defer func() {
if err = obj.Payload.Close(); err != nil {
dcontext.GetLogger(ctx).Errorf("couldn't close object payload reader, path '%s' by oid '%s': %s",
sourcePath, sourceID, err.Error())
}
}()
objHeader := d.formObject(destPath)
var prmPut pool.PrmObjectPut
prmPut.SetHeader(*objHeader)
prmPut.SetPayload(obj.Payload)
if _, err = d.sdkPool.PutObject(ctx, prmPut); err != nil {
return fmt.Errorf("couldn't put object '%s': %w", destPath, err)
return fmt.Errorf("can't add object to tree: %w", err)
}
var prmDelete pool.PrmObjectDelete
prmDelete.SetAddress(sourceAddr)
if err = d.sdkPool.DeleteObject(ctx, prmDelete); err != nil {
return fmt.Errorf("couldn't remove source file '%s', id '%s': %w", sourcePath, sourceID, err)
if err = d.treeService.DeleteObject(ctx, d.containerID, sourceTreeObj.ID); err != nil {
return fmt.Errorf("couldn't delete object from tree: %w", err)
}
return nil
}
func (d *driver) Delete(ctx context.Context, path string) error {
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
if ok := validatePrefixPath(path); !ok {
return storagedriver.InvalidPathError{Path: path}
}
path = preparePrefixPath(path)
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(d.containerID)
prmSearch.SetFilters(filters)
res, err := d.sdkPool.SearchObjects(ctx, prmSearch)
treeObjList, err := d.treeService.GetListObjectByPrefix(ctx, d.containerID, path)
if err != nil {
return fmt.Errorf("init searching using client: %w", err)
return fmt.Errorf("couldn't get object from tree by prefix '%s': %w", path, err)
}
defer res.Close()
var inErr error
err = res.Iterate(func(id oid.ID) bool {
// Check if a key is a subpath (so that deleting "/a" does not delete "/ab").
isSubPath, err := d.checkIsSubPath(ctx, path, id)
if err != nil {
inErr = err
return true
}
if isSubPath {
if err = d.delete(ctx, id); err != nil {
inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err)
return true
}
}
return false
})
if err == nil {
err = inErr
}
rootTreeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
if err != nil {
return fmt.Errorf("iterate objects: %w", err)
if !errors.Is(err, tree.ErrNodeNotFound) {
return fmt.Errorf("couldn't get object from tree: %w", err)
}
} else {
treeObjList = append(treeObjList, *rootTreeObj)
}
for _, treeObj := range treeObjList {
if err := d.delete(ctx, treeObj.ObjID); err != nil {
return fmt.Errorf("error in delete object oid: %s : %w", treeObj.ObjID, err)
}
if err = d.treeService.DeleteObject(ctx, d.containerID, treeObj.ID); err != nil {
return fmt.Errorf("couldn't delete object from tree: %w", err)
}
}
return nil
}
func (d *driver) checkIsSubPath(ctx context.Context, path string, id oid.ID) (bool, error) {
var prmHead pool.PrmObjectHead
prmHead.SetAddress(d.objectAddress(id))
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
if err != nil {
return false, fmt.Errorf("couldn't head object part '%s', id '%s': %w", path, id, err)
}
fileInf := newFileInfo(ctx, obj, "")
return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil
}
func (d *driver) delete(ctx context.Context, id oid.ID) error {
var prm pool.PrmObjectDelete
prm.SetAddress(d.objectAddress(id))
@ -770,166 +765,47 @@ func (d *driver) Walk(ctx context.Context, path string, fn storagedriver.WalkFn)
return storagedriver.WalkFallback(ctx, d, path, fn)
}
func (d *driver) searchByPrefix(ctx context.Context, prefix string) ([]oid.ID, error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(attributeFilePath, prefix, object.MatchCommonPrefix)
return d.baseSearch(ctx, filters)
}
func (d *driver) headSplitParts(ctx context.Context, splitID *object.SplitID, path string, isAppend bool) ([]*object.Object, map[oid.ID]struct{}, error) {
filters := object.NewSearchFilters()
filters.AddPhyFilter()
filters.AddSplitIDFilter(object.MatchStringEqual, splitID)
var prm pool.PrmObjectSearch
prm.SetContainerID(d.containerID)
prm.SetFilters(filters)
res, err := d.sdkPool.SearchObjects(ctx, prm)
ids, err := d.treeService.GetListOIDBySplitID(ctx, d.containerID, path, splitID)
if err != nil {
return nil, nil, fmt.Errorf("init searching using client: %w", err)
return nil, nil, fmt.Errorf("couldn't get object from tree by split id '%s': %w", path, err)
}
defer res.Close()
var (
addr oid.Address
prmHead pool.PrmObjectHead
objects []*object.Object
)
var addr oid.Address
addr.SetContainer(d.containerID)
var inErr error
var prmHead pool.PrmObjectHead
var objects []*object.Object
noChild := make(map[oid.ID]struct{})
err = res.Iterate(func(id oid.ID) bool {
addr.SetObject(id)
for _, objID := range ids {
addr.SetObject(objID)
prmHead.SetAddress(addr)
obj, err := d.sdkPool.HeadObject(ctx, prmHead)
if err != nil {
inErr = fmt.Errorf("couldn't head object part '%s', id '%s', splitID '%s': %w", path, id, splitID, err)
return true
return nil, nil, fmt.Errorf("couldn't head object part '%s', id '%s', splitID '%s': %w", path, objID, splitID, err)
}
if isAppend {
objects = append(objects, &obj)
noChild[id] = struct{}{}
return false
noChild[objID] = struct{}{}
continue
}
inErr = fmt.Errorf("init upload part '%s' already exist, splitID '%s'", path, splitID)
return true
})
if err == nil {
err = inErr
}
if err != nil {
return nil, nil, fmt.Errorf("iterate objects: %w", err)
return nil, nil, fmt.Errorf("init upload part '%s' already exist, splitID '%s'", path, splitID)
}
return objects, noChild, nil
}
func (d *driver) baseSearch(ctx context.Context, filters object.SearchFilters) ([]oid.ID, error) {
var prm pool.PrmObjectSearch
prm.SetContainerID(d.containerID)
prm.SetFilters(filters)
res, err := d.sdkPool.SearchObjects(ctx, prm)
if err != nil {
return nil, fmt.Errorf("init searching using client: %w", err)
}
defer res.Close()
var buf []oid.ID
err = res.Iterate(func(id oid.ID) bool {
buf = append(buf, id)
return false
})
if err != nil {
return nil, fmt.Errorf("iterate objects: %w", err)
}
return buf, nil
}
func (d *driver) searchOne(ctx context.Context, path string) (oid.ID, error) {
id, ok, err := d.searchOneBase(ctx, path, false)
if err != nil {
return oid.ID{}, err
}
if !ok {
return oid.ID{}, fmt.Errorf("found more than one object by path '%s'", path)
}
return id, nil
}
func (d *driver) searchOneBase(ctx context.Context, path string, byPrefix bool) (resID oid.ID, ok bool, err error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
if byPrefix {
filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix)
} else {
filters.AddFilter(attributeFilePath, path, object.MatchStringEqual)
}
var prm pool.PrmObjectSearch
prm.SetContainerID(d.containerID)
prm.SetFilters(filters)
res, err := d.sdkPool.SearchObjects(ctx, prm)
if err != nil {
return oid.ID{}, false, fmt.Errorf("init searching using client: %w", err)
}
defer res.Close()
var found bool
err = res.Iterate(func(id oid.ID) bool {
if found {
ok = false
return true
}
found = true
resID = id
ok = true
return false
})
if err != nil {
return oid.ID{}, false, fmt.Errorf("iterate objects by path '%s': %w", path, err)
}
if !found {
return oid.ID{}, false, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
}
return resID, ok, nil
}
func newFileInfo(ctx context.Context, obj object.Object, prefix string) storagedriver.FileInfo {
func newFileInfo(treeObj tree.TreeNode, prefix string) storagedriver.FileInfo {
fileInfoFields := storagedriver.FileInfoFields{
Size: int64(obj.PayloadSize()),
}
for _, attr := range obj.Attributes() {
switch attr.Key() {
case attributeFilePath:
fileInfoFields.Path = attr.Value()
case object.AttributeTimestamp:
timestamp, err := strconv.ParseInt(attr.Value(), 10, 64)
if err != nil {
objID, _ := obj.ID()
dcontext.GetLogger(ctx).Warnf("object '%s' has invalid timestamp '%s'", objID.EncodeToString(), attr.Value())
continue
}
fileInfoFields.ModTime = time.Unix(timestamp, 0)
}
Path: treeObj.FullPath,
Size: int64(treeObj.PayloadSize),
ModTime: treeObj.ModificationTime,
}
if len(prefix) > 0 {
@ -956,18 +832,39 @@ func newFileInfoDir(path string) storagedriver.FileInfo {
}
}
func (d *driver) newObjTarget() transformer.ChunkedObjectWriter {
func validatePrefixPath(path string) bool {
return prefixPathValidateRegex.MatchString(path)
}
func validateFilePath(path string) bool {
return filePathValidateRegex.MatchString(path)
}
func preparePrefixPath(path string) string {
if strings.HasSuffix(path, "/") {
return path[:len(path)-1]
}
return path
}
func (d *driver) newObjTarget(path string, splitID *object.SplitID) transformer.ChunkedObjectWriter {
return &objTarget{
sdkPool: d.sdkPool,
key: d.key,
sdkPool: d.sdkPool,
treeService: d.treeService,
splitID: splitID,
key: d.key,
path: path,
}
}
type objTarget struct {
sdkPool *pool.Pool
key *ecdsa.PrivateKey
obj *object.Object
chunks [][]byte
sdkPool *pool.Pool
treeService *tree.Tree
path string
splitID *object.SplitID
key *ecdsa.PrivateKey
obj *object.Object
chunks [][]byte
}
func (t *objTarget) WriteHeader(_ context.Context, obj *object.Object) error {
@ -999,11 +896,14 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
t.obj.SetCreationEpoch(currEpoch)
var (
parID oid.ID
parHdr *object.Object
parID oid.ID
parHdr *object.Object
isCommit bool
sizeFinalObject uint64
)
if par := t.obj.Parent(); par != nil && par.Signature() == nil {
isCommit = true
objPar := object.NewFromV2(par.ToV2())
objPar.SetCreationEpoch(currEpoch)
@ -1013,6 +913,7 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
}
parID, _ = objPar.ID()
sizeFinalObject = objPar.PayloadSize()
t.obj.SetParent(objPar)
}
@ -1029,12 +930,28 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers,
var prm pool.PrmObjectPut
prm.SetHeader(*t.obj)
_, err = t.sdkPool.PutObject(ctx, prm)
id, err := t.sdkPool.PutObject(ctx, prm)
if err != nil {
return nil, fmt.Errorf("couldn't put part: %w", err)
}
containerID, ok := t.obj.ContainerID()
if !ok {
return nil, fmt.Errorf("error in get container id from object")
}
if isCommit {
_, err := t.treeService.AddObject(ctx, containerID, t.path, parID, sizeFinalObject)
if err != nil {
return nil, fmt.Errorf("can't add object to tree: %w", err)
}
}
_, err = t.treeService.AddPHYObject(ctx, containerID, t.path, id, t.splitID)
if err != nil {
return nil, fmt.Errorf("can't add phy object to tree: %w", err)
}
objID, _ := t.obj.ID()
return &transformer.AccessIdentifiers{

View file

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
"strings"
"testing"
"time"
@ -75,8 +76,7 @@ func TestIntegration(t *testing.T) {
rootCtx := context.Background()
aioImage := "truecloudlab/frostfs-aio:"
versions := []string{
"1.2.7",
"1.2.8",
"1.3.0",
}
for _, aioVersion := range versions {
@ -214,6 +214,7 @@ func testWriteRead(rootCtx context.Context, t *testing.T, drvr storagedriver.Sto
func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageDriver, version string) {
ctx, path := formCtxAndPath(rootCtx, version)
path = "/list" + path
fileWriter, err := drvr.Writer(ctx, path, false)
require.NoError(t, err)
@ -229,7 +230,7 @@ func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageD
err = fileWriter.Commit()
require.NoError(t, err)
res, err := drvr.List(ctx, path)
res, err := drvr.List(ctx, getParentDirPath(path))
require.NoError(t, err)
require.Len(t, res, 1)
require.Contains(t, res, path)
@ -340,3 +341,12 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
return cnrID
}
func getParentDirPath(path string) string {
lastSlashIndex := strings.LastIndex(path, "/")
if lastSlashIndex != -1 {
path = path[:lastSlashIndex]
}
return path
}

View file

@ -0,0 +1,410 @@
package tree
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type (
Tree struct {
service ServiceClient
log *zap.Logger
}
// ServiceClient is a client to interact with tree service.
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
ServiceClient interface {
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
GetSubTree(ctx context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error)
AddNodeByPath(ctx context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error)
RemoveNode(ctx context.Context, containerID cid.ID, treeID string, nodeID uint64) error
}
SubTreeStream interface {
Next() (NodeResponse, error)
}
TreeNode struct {
ID uint64
ParentID uint64
ObjID oid.ID
TimeStamp uint64
ModificationTime time.Time
PayloadSize uint64
FullPath string
Meta map[string]string
}
GetNodesParams struct {
ContainerID cid.ID
TreeID string
Path []string
Meta []string
LatestOnly bool
AllAttrs bool
}
)
type Meta interface {
GetKey() string
GetValue() []byte
}
type NodeResponse interface {
GetMeta() []Meta
GetNodeID() uint64
GetParentID() uint64
GetTimestamp() uint64
}
const (
FileNameKey = "FileName"
oidKV = "OID"
splitIDKV = "SplitID"
payloadSize = "PayloadSize"
fullPath = "FullPath"
modificationTime = "ModificationTime"
objectTree = "distribution-object"
splitTree = "distribution-split"
separator = "/"
maxDepth = 0
)
var (
// ErrNodeNotFound is returned from ServiceClient in case of not found error.
ErrNodeNotFound = errors.New("not found")
ErrOnlyDirFound = errors.New("only directory found")
// ErrNodeAccessDenied is returned from ServiceClient service in case of access denied error.
ErrNodeAccessDenied = errors.New("access denied")
// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
ErrGatewayTimeout = errors.New("gateway timeout")
)
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
return &Tree{
service: service,
log: log,
}
}
func (c *Tree) GetObjectByPath(ctx context.Context, containerID cid.ID, path string) (*TreeNode, error) {
newPath := pathFromName(path)
p := &GetNodesParams{
ContainerID: containerID,
TreeID: objectTree,
Path: newPath,
LatestOnly: false,
AllAttrs: true,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
}
var result NodeResponse
switch {
case len(nodes) == 0 || len(nodes) == 1 && isIntermediate(nodes[0]):
return nil, ErrNodeNotFound
case len(nodes) > 2:
return nil, fmt.Errorf("found more than two nodes")
default:
result = nodes[0]
if len(nodes) == 2 && isIntermediate(result) {
result = nodes[1]
}
}
treeNode, err := newTreeNode(result)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
}
return treeNode, nil
}
func (c *Tree) GetObjectByPathDir(ctx context.Context, containerID cid.ID, path string) (*TreeNode, error) {
newPath := pathFromName(path)
p := &GetNodesParams{
ContainerID: containerID,
TreeID: objectTree,
Path: newPath,
LatestOnly: false,
AllAttrs: true,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
}
var result NodeResponse
switch {
case len(nodes) == 0:
return nil, ErrNodeNotFound
case len(nodes) == 1 && isIntermediate(nodes[0]):
return nil, ErrOnlyDirFound
case len(nodes) > 2:
return nil, fmt.Errorf("found more than two nodes")
default:
result = nodes[0]
if len(nodes) == 2 && isIntermediate(result) {
result = nodes[1]
}
}
treeNode, err := newTreeNode(result)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
}
return treeNode, nil
}
func (c *Tree) GetListObjectByPrefix(ctx context.Context, containerID cid.ID, prefixPath string) ([]TreeNode, error) {
rootID, err := c.determinePrefixNode(ctx, containerID, objectTree, prefixPath)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
}
return nil, err
}
subTree, err := c.service.GetSubTree(ctx, containerID, objectTree, rootID, maxDepth)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
}
return nil, err
}
nodes := make([]NodeResponse, 0)
for _, node := range subTree {
if node.GetNodeID() == rootID {
continue
}
if !isIntermediate(node) {
nodes = append(nodes, node)
}
}
result := make([]TreeNode, 0, len(nodes))
for _, v := range nodes {
node, err := newTreeNode(v)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
}
result = append(result, *node)
}
return result, nil
}
func (c *Tree) GetListOIDBySplitID(ctx context.Context, containerID cid.ID, path string, splitID *object.SplitID) ([]oid.ID, error) {
nodes, err := c.getNodesBySplitID(ctx, containerID, path, splitID)
if err != nil {
return nil, err
}
ids := make([]oid.ID, len(nodes))
for i, node := range nodes {
ids[i] = node.ObjID
}
return ids, nil
}
func (c *Tree) AddObject(ctx context.Context, containerID cid.ID, filePath string, oid oid.ID, size uint64) (uint64, error) {
path := pathFromName(filePath)
meta := map[string]string{
oidKV: oid.String(),
FileNameKey: path[len(path)-1],
payloadSize: strconv.FormatUint(size, 10),
fullPath: filePath,
modificationTime: strconv.FormatInt(time.Now().Unix(), 10),
}
nodeID, err := c.service.AddNodeByPath(ctx, containerID, objectTree, path[:len(path)-1], meta)
return nodeID, err
}
func (c *Tree) AddPHYObject(ctx context.Context, containerID cid.ID, filePath string, oid oid.ID, splitID *object.SplitID) (uint64, error) {
meta := map[string]string{
oidKV: oid.String(),
splitIDKV: splitID.String(),
}
nodeID, err := c.service.AddNodeByPath(ctx, containerID, splitTree, pathFromName(filePath), meta)
return nodeID, err
}
func (c *Tree) DeleteObject(ctx context.Context, containerID cid.ID, nodeID uint64) error {
return c.service.RemoveNode(ctx, containerID, objectTree, nodeID)
}
func (c *Tree) DeleteObjectsBySplitID(ctx context.Context, containerID cid.ID, path string, splitID *object.SplitID) error {
splitNodes, err := c.getNodesBySplitID(ctx, containerID, path, splitID)
if err != nil {
return err
}
for _, node := range splitNodes {
err = c.service.RemoveNode(ctx, containerID, splitTree, node.ID)
if err != nil {
return err
}
}
return nil
}
func (c *Tree) determinePrefixNode(ctx context.Context, containerID cid.ID, treeID, prefixPath string) (uint64, error) {
var rootID uint64
path := pathFromName(prefixPath)
if len(path) > 1 {
var err error
rootID, err = c.getPrefixNodeID(ctx, containerID, treeID, path)
if err != nil {
return 0, err
}
}
return rootID, nil
}
func (c *Tree) getPrefixNodeID(ctx context.Context, containerID cid.ID, treeID string, prefixPath []string) (uint64, error) {
p := &GetNodesParams{
ContainerID: containerID,
TreeID: treeID,
Path: prefixPath,
LatestOnly: false,
AllAttrs: true,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return 0, err
}
var intermediateNodes []uint64
for _, node := range nodes {
if isIntermediate(node) {
intermediateNodes = append(intermediateNodes, node.GetNodeID())
}
}
if len(intermediateNodes) == 0 {
return 0, ErrNodeNotFound
}
if len(intermediateNodes) > 1 {
return 0, fmt.Errorf("found more than one intermediate nodes")
}
return intermediateNodes[0], nil
}
func (c *Tree) getNodesBySplitID(ctx context.Context, containerID cid.ID, filePath string, splitID *object.SplitID) ([]TreeNode, error) {
rootID, err := c.determinePrefixNode(ctx, containerID, splitTree, filePath)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
}
return nil, err
}
subTree, err := c.service.GetSubTree(ctx, containerID, splitTree, rootID, 2)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil
}
return nil, err
}
nodes := make([]TreeNode, 0, len(subTree))
for _, v := range subTree {
node, err := newTreeNode(v)
if err != nil {
return nil, fmt.Errorf("error parse tree node: %w", err)
}
nodes = append(nodes, *node)
}
result := make([]TreeNode, 0)
for _, node := range nodes {
if val, ok := node.Meta[splitIDKV]; ok {
if val == splitID.String() {
result = append(result, node)
}
}
}
return result, nil
}
func isIntermediate(node NodeResponse) bool {
if len(node.GetMeta()) != 1 {
return false
}
return node.GetMeta()[0].GetKey() == FileNameKey
}
func newTreeNode(nodeInfo NodeResponse) (*TreeNode, error) {
treeNode := &TreeNode{
ID: nodeInfo.GetNodeID(),
ParentID: nodeInfo.GetParentID(),
TimeStamp: nodeInfo.GetTimestamp(),
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
}
for _, kv := range nodeInfo.GetMeta() {
switch kv.GetKey() {
case oidKV:
if err := treeNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
return nil, err
}
case payloadSize:
if sizeStr := string(kv.GetValue()); len(sizeStr) > 0 {
var err error
if treeNode.PayloadSize, err = strconv.ParseUint(sizeStr, 10, 64); err != nil {
return nil, fmt.Errorf("invalid payload size value '%s': %w", sizeStr, err)
}
}
case modificationTime:
timestamp, err := strconv.ParseInt(string(kv.GetValue()), 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid modification time value '%s': %w", string(kv.GetValue()), err)
}
treeNode.ModificationTime = time.Unix(timestamp, 0)
case fullPath:
treeNode.FullPath = string(kv.GetValue())
default:
treeNode.Meta[kv.GetKey()] = string(kv.GetValue())
}
}
return treeNode, nil
}
func pathFromName(objectName string) []string {
return strings.Split(objectName, separator)
}

View file

@ -0,0 +1,178 @@
package wrapper
import (
"context"
"errors"
"fmt"
"io"
"strings"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type GetNodeByPathResponseInfoWrapper struct {
response *grpcService.GetNodeByPathResponse_Info
}
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 {
return n.response.GetNodeId()
}
func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 {
return n.response.GetParentId()
}
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 {
return n.response.GetTimestamp()
}
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
res := make([]tree.Meta, len(n.response.Meta))
for i, value := range n.response.Meta {
res[i] = value
}
return res
}
type GetSubTreeResponseBodyWrapper struct {
response *grpcService.GetSubTreeResponse_Body
}
func (n GetSubTreeResponseBodyWrapper) GetNodeID() uint64 {
return n.response.GetNodeId()
}
func (n GetSubTreeResponseBodyWrapper) GetParentID() uint64 {
return n.response.GetParentId()
}
func (n GetSubTreeResponseBodyWrapper) GetTimestamp() uint64 {
return n.response.GetTimestamp()
}
func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
res := make([]tree.Meta, len(n.response.Meta))
for i, value := range n.response.Meta {
res[i] = value
}
return res
}
type PoolWrapper struct {
p *treepool.Pool
}
func NewPoolWrapper(p *treepool.Pool) *PoolWrapper {
return &PoolWrapper{p: p}
}
func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([]tree.NodeResponse, error) {
poolPrm := treepool.GetNodesParams{
CID: prm.ContainerID,
TreeID: prm.TreeID,
Path: prm.Path,
Meta: prm.Meta,
PathAttribute: tree.FileNameKey,
LatestOnly: prm.LatestOnly,
AllAttrs: prm.AllAttrs,
}
nodes, err := w.p.GetNodes(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
}
res := make([]tree.NodeResponse, len(nodes))
for i, info := range nodes {
res[i] = GetNodeByPathResponseInfoWrapper{info}
}
return res, nil
}
func (w *PoolWrapper) GetSubTree(ctx context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]tree.NodeResponse, error) {
poolPrm := treepool.GetSubTreeParams{
CID: containerID,
TreeID: treeID,
RootID: rootID,
Depth: depth,
}
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
}
var subtree []tree.NodeResponse
node, err := subTreeReader.Next()
for err == nil {
subtree = append(subtree, GetSubTreeResponseBodyWrapper{node})
node, err = subTreeReader.Next()
}
if err != nil && err != io.EOF {
return nil, handleError(err)
}
return subtree, nil
}
func (w *PoolWrapper) AddNodeByPath(ctx context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error) {
nodeID, err := w.p.AddNodeByPath(ctx, treepool.AddNodeByPathParams{
CID: containerID,
TreeID: treeID,
Path: path,
Meta: meta,
PathAttribute: tree.FileNameKey,
})
return nodeID, handleError(err)
}
func (w *PoolWrapper) RemoveNode(ctx context.Context, containerID cid.ID, treeID string, nodeID uint64) error {
return handleError(w.p.RemoveNode(ctx, treepool.RemoveNodeParams{
CID: containerID,
TreeID: treeID,
NodeID: nodeID,
}))
}
func handleError(err error) error {
if err == nil {
return nil
}
if errors.Is(err, treepool.ErrNodeNotFound) {
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
}
if errors.Is(err, treepool.ErrNodeAccessDenied) {
return fmt.Errorf("%w: %s", tree.ErrNodeAccessDenied, err.Error())
}
if isTimeoutError(err) {
return fmt.Errorf("%w: %s", tree.ErrGatewayTimeout, err.Error())
}
return err
}
func unwrapErr(err error) error {
unwrappedErr := errors.Unwrap(err)
for unwrappedErr != nil {
err = unwrappedErr
unwrappedErr = errors.Unwrap(err)
}
return err
}
func isTimeoutError(err error) bool {
if strings.Contains(err.Error(), "timeout") ||
errors.Is(err, context.DeadlineExceeded) {
return true
}
return status.Code(unwrapErr(err)) == codes.DeadlineExceeded
}