From ca2da5e23de0852dfac3ac0d1a6e9078d3f17b05 Mon Sep 17 00:00:00 2001 From: Roman Loginov Date: Wed, 27 Mar 2024 13:46:05 +0300 Subject: [PATCH] [#3] Integration with the tree service Signed-off-by: Roman Loginov --- registry/storage/driver/frostfs/fileWriter.go | 12 +- registry/storage/driver/frostfs/frostfs.go | 493 ++++++++---------- .../storage/driver/frostfs/frostfs_test.go | 16 +- registry/storage/driver/frostfs/tree/tree.go | 410 +++++++++++++++ .../driver/frostfs/wrapper/pool_wrapper.go | 178 +++++++ 5 files changed, 817 insertions(+), 292 deletions(-) create mode 100644 registry/storage/driver/frostfs/tree/tree.go create mode 100644 registry/storage/driver/frostfs/wrapper/pool_wrapper.go diff --git a/registry/storage/driver/frostfs/fileWriter.go b/registry/storage/driver/frostfs/fileWriter.go index 036ac7c3..062669a0 100644 --- a/registry/storage/driver/frostfs/fileWriter.go +++ b/registry/storage/driver/frostfs/fileWriter.go @@ -83,7 +83,7 @@ func newSizeLimiterWriter(ctx context.Context, d *driver, path string, splitInfo previous: formPreviousChain(splitInfo, parts), parentHashers: parentHashers, targetInit: func() transformer.ChunkedObjectWriter { - return d.newObjTarget() + return d.newObjTarget(path, splitInfo.SplitID()) }, parent: parent, } @@ -233,6 +233,11 @@ func (w *writer) release(withParent bool) (*transformer.AccessIdentifiers, error if _, err = w.release(false); err != nil { return nil, fmt.Errorf("could not release linking object: %w", err) } + + err = w.driver.treeService.DeleteObjectsBySplitID(w.ctx, w.driver.containerID, w.path, w.splitInfo.SplitID()) + if err != nil { + return nil, fmt.Errorf("could not delete split objects: %w", err) + } } return ids, nil @@ -459,5 +464,10 @@ func (w *writer) deleteParts() error { } } + err := w.driver.treeService.DeleteObjectsBySplitID(w.ctx, w.driver.containerID, w.path, w.splitInfo.SplitID()) + if err != nil { + return fmt.Errorf("could not delete split objects: %w", err) + } + return nil } diff --git a/registry/storage/driver/frostfs/frostfs.go b/registry/storage/driver/frostfs/frostfs.go index 690948ce..70a04705 100644 --- a/registry/storage/driver/frostfs/frostfs.go +++ b/registry/storage/driver/frostfs/frostfs.go @@ -3,10 +3,12 @@ package frostfs import ( "context" "crypto/ecdsa" + "errors" "fmt" "io" "net/http" "path/filepath" + "regexp" "sort" "strconv" "strings" @@ -19,12 +21,15 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" + treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "github.com/distribution/distribution/v3/internal/dcontext" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/distribution/distribution/v3/registry/storage/driver/base" "github.com/distribution/distribution/v3/registry/storage/driver/factory" + "github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree" + "github.com/distribution/distribution/v3/registry/storage/driver/frostfs/wrapper" "github.com/nspcc-dev/neo-go/cli/flags" "github.com/nspcc-dev/neo-go/pkg/wallet" ) @@ -57,6 +62,11 @@ const ( defaultSessionExpirationDuration = 100 // in epoch ) +var ( + filePathValidateRegex = regexp.MustCompile(`^/([^/]+/)*[^/]+$`) + prefixPathValidateRegex = regexp.MustCompile(`^/([^/]+/?)*$`) +) + // DriverParameters is a struct that encapsulates all of the driver parameters after all values have been set. type DriverParameters struct { ContainerID string @@ -95,6 +105,7 @@ func (n *frostfsDriverFactory) Create(_ context.Context, parameters map[string]i type driver struct { sdkPool *pool.Pool + treeService *tree.Tree owner *user.ID key *ecdsa.PrivateKey containerID cid.ID @@ -307,7 +318,7 @@ func New(params DriverParameters) (*Driver, error) { var owner user.ID user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey) - sdkPool, err := createPool(ctx, acc, params) + sdkPool, treePool, err := createPool(ctx, acc, params) if err != nil { return nil, fmt.Errorf("couldn't create sdk pool: %w", err) } @@ -322,8 +333,11 @@ func New(params DriverParameters) (*Driver, error) { return nil, fmt.Errorf("couldn't get container id: %w", err) } + treeService := wrapper.NewPoolWrapper(treePool) + d := &driver{ sdkPool: sdkPool, + treeService: tree.NewTree(treeService, nil), owner: &owner, key: &acc.PrivateKey().PrivateKey, containerID: cnrID, @@ -377,28 +391,49 @@ func getContainerID(params DriverParameters) (cid.ID, error) { return cnrID, nil } -func createPool(ctx context.Context, acc *wallet.Account, param DriverParameters) (*pool.Pool, error) { +func createPool(ctx context.Context, acc *wallet.Account, param DriverParameters) (*pool.Pool, *treepool.Pool, error) { var prm pool.InitParameters + var prmTree treepool.InitParameters + prm.SetKey(&acc.PrivateKey().PrivateKey) + prmTree.SetKey(acc.PrivateKey()) + prm.SetNodeDialTimeout(param.ConnectionTimeout) + prmTree.SetNodeDialTimeout(param.ConnectionTimeout) + prm.SetHealthcheckTimeout(param.RequestTimeout) + prmTree.SetHealthcheckTimeout(param.RequestTimeout) + prm.SetClientRebalanceInterval(param.RebalanceInterval) + prmTree.SetClientRebalanceInterval(param.RebalanceInterval) + prm.SetSessionExpirationDuration(param.SessionExpirationDuration) for _, peer := range param.Peers { - prm.AddNode(pool.NewNodeParam(peer.Priority, peer.Address, peer.Weight)) + nodeParam := pool.NewNodeParam(peer.Priority, peer.Address, peer.Weight) + prm.AddNode(nodeParam) + prmTree.AddNode(nodeParam) } p, err := pool.NewPool(prm) if err != nil { - return nil, fmt.Errorf("create pool: %w", err) + return nil, nil, fmt.Errorf("create pool: %w", err) } if err = p.Dial(ctx); err != nil { - return nil, fmt.Errorf("dial pool: %w", err) + return nil, nil, fmt.Errorf("dial pool: %w", err) } - return p, nil + treePool, err := treepool.NewPool(prmTree) + if err != nil { + return nil, nil, fmt.Errorf("create tree pool: %w", err) + } + + if err = treePool.Dial(ctx); err != nil { + return nil, nil, fmt.Errorf("dial tree pool: %w", err) + } + + return p, treePool, nil } func createNnsResolver(params DriverParameters) (*resolver.NNS, error) { @@ -469,23 +504,34 @@ func (d *driver) Name() string { } func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { - id, err := d.searchOne(ctx, path) + if ok := validateFilePath(path); !ok { + return nil, storagedriver.InvalidPathError{Path: path} + } + + treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path) if err != nil { - return nil, err + if errors.Is(err, tree.ErrNodeNotFound) { + return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName} + } + return nil, fmt.Errorf("couldn't get object from tree: %w", err) } var prm pool.PrmObjectGet - prm.SetAddress(d.objectAddress(id)) + prm.SetAddress(d.objectAddress(treeObj.ObjID)) obj, err := d.sdkPool.GetObject(ctx, prm) if err != nil { - return nil, fmt.Errorf("couldn't get object '%s': %w", id, err) + return nil, fmt.Errorf("couldn't get object '%s': %w", treeObj.ObjID, err) } return io.ReadAll(obj.Payload) } func (d *driver) PutContent(ctx context.Context, path string, content []byte) error { + if ok := validateFilePath(path); !ok { + return storagedriver.InvalidPathError{Path: path} + } + if err := d.Delete(ctx, path); err != nil { return fmt.Errorf("couldn't delete '%s': %s", path, err) } @@ -496,34 +542,35 @@ func (d *driver) PutContent(ctx context.Context, path string, content []byte) er var prm pool.PrmObjectPut prm.SetHeader(*obj) - if _, err := d.sdkPool.PutObject(ctx, prm); err != nil { + id, err := d.sdkPool.PutObject(ctx, prm) + if err != nil { return fmt.Errorf("couldn't put object '%s': %w", path, err) } + _, err = d.treeService.AddObject(ctx, d.containerID, path, id, uint64(len(content))) + if err != nil { + return fmt.Errorf("can't add object to tree: %w", err) + } return nil } func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { - id, err := d.searchOne(ctx, path) + if ok := validateFilePath(path); !ok { + return nil, storagedriver.InvalidPathError{Path: path} + } + + treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path) if err != nil { - return nil, err + return nil, fmt.Errorf("couldn't get object from tree: %w", err) } - addr := d.objectAddress(id) + addr := d.objectAddress(treeObj.ObjID) - var prmHead pool.PrmObjectHead - prmHead.SetAddress(addr) - - obj, err := d.sdkPool.HeadObject(ctx, prmHead) - if err != nil { - return nil, fmt.Errorf("couldn't head object '%s', id '%s': %w", path, id, err) + if uint64(offset) >= treeObj.PayloadSize { + return nil, fmt.Errorf("invalid offset %d for object length %d", offset, treeObj.PayloadSize) } - if uint64(offset) >= obj.PayloadSize() { - return nil, fmt.Errorf("invalid offset %d for object length %d", offset, obj.PayloadSize()) - } - - length := obj.PayloadSize() - uint64(offset) + length := treeObj.PayloadSize - uint64(offset) var prmRange pool.PrmObjectRange prmRange.SetAddress(addr) @@ -533,7 +580,7 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read res, err := d.sdkPool.ObjectRange(ctx, prmRange) if err != nil { return nil, fmt.Errorf("couldn't get payload range of object '%s', offset %d, length %d, id '%s': %w", - path, offset, length, id, err) + path, offset, length, treeObj.ObjID, err) } return &res, nil @@ -544,6 +591,10 @@ func getUploadUUID(ctx context.Context) (uuid string) { } func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { + if ok := validateFilePath(path); !ok { + return nil, storagedriver.InvalidPathError{Path: path} + } + splitID := object.NewSplitID() uploadUUID := getUploadUUID(ctx) if err := splitID.Parse(uploadUUID); err != nil { @@ -592,56 +643,42 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, return newFileInfoDir(path), nil } - id, ok, err := d.searchOneBase(ctx, path, true) + if ok := validatePrefixPath(path); !ok { + return nil, storagedriver.InvalidPathError{Path: path} + } + path = preparePrefixPath(path) + + treeObj, err := d.treeService.GetObjectByPathDir(ctx, d.containerID, path) if err != nil { - return nil, err + if errors.Is(err, tree.ErrNodeNotFound) { + return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName} + } else if errors.Is(err, tree.ErrOnlyDirFound) { + return newFileInfoDir(path), nil + } + return nil, fmt.Errorf("couldn't get object from tree: %w", err) } - // assume there is not object with directory name - // e.g. if file '/a/b/c' exists, files '/a/b' and '/a' don't - if !ok { - return newFileInfoDir(path), nil - } - - var prm pool.PrmObjectHead - prm.SetAddress(d.objectAddress(id)) - - obj, err := d.sdkPool.HeadObject(ctx, prm) - if err != nil { - return nil, fmt.Errorf("couldn't get head object '%s': %w", id, err) - } - - fileInfo := newFileInfo(ctx, obj, "") - - // e.g. search '/a/b' but because of prefix search we found '/a/b/c' - // so we should return directory - if fileInfo.Path() != path { - return newFileInfoDir(path), nil - } + fileInfo := newFileInfo(*treeObj, "") return fileInfo, nil } func (d *driver) List(ctx context.Context, path string) ([]string, error) { - ids, err := d.searchByPrefix(ctx, path) + if ok := validatePrefixPath(path); !ok { + return nil, storagedriver.InvalidPathError{Path: path} + } + path = preparePrefixPath(path) + + treeObjList, err := d.treeService.GetListObjectByPrefix(ctx, d.containerID, path) if err != nil { - return nil, fmt.Errorf("couldn't search by prefix '%s': %w", path, err) + return nil, fmt.Errorf("couldn't get object from tree by prefix '%s': %w", path, err) } added := make(map[string]bool) - result := make([]string, 0, len(ids)) - for _, id := range ids { - var prm pool.PrmObjectHead - prm.SetAddress(d.objectAddress(id)) - - obj, err := d.sdkPool.HeadObject(ctx, prm) - if err != nil { - dcontext.GetLogger(ctx).Warnf("couldn't get list object '%s' in path '%s': %s", id, path, err) - continue - } - - fileInf := newFileInfo(ctx, obj, path) + result := make([]string, 0, len(treeObjList)) + for _, treeObj := range treeObjList { + fileInf := newFileInfo(treeObj, path) if !added[fileInf.Path()] { result = append(result, fileInf.Path()) added[fileInf.Path()] = true @@ -653,106 +690,64 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) { } func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { - sourceID, err := d.searchOne(ctx, sourcePath) + if ok := validateFilePath(sourcePath); !ok { + return storagedriver.InvalidPathError{Path: sourcePath} + } + + if ok := validateFilePath(destPath); !ok { + return storagedriver.InvalidPathError{Path: destPath} + } + + sourceTreeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, sourcePath) if err != nil { - return err + return fmt.Errorf("couldn't get object from tree: %w", err) } - if err = d.Delete(ctx, destPath); err != nil { - return fmt.Errorf("couldn't delete '%s' object: %w", destPath, err) - } - - sourceAddr := d.objectAddress(sourceID) - - var prmGet pool.PrmObjectGet - prmGet.SetAddress(sourceAddr) - - obj, err := d.sdkPool.GetObject(ctx, prmGet) + _, err = d.treeService.AddObject(ctx, d.containerID, destPath, sourceTreeObj.ObjID, sourceTreeObj.PayloadSize) if err != nil { - return fmt.Errorf("could not get source object '%s' by oid '%s': %w", sourcePath, sourceID, err) - } - defer func() { - if err = obj.Payload.Close(); err != nil { - dcontext.GetLogger(ctx).Errorf("couldn't close object payload reader, path '%s' by oid '%s': %s", - sourcePath, sourceID, err.Error()) - } - }() - - objHeader := d.formObject(destPath) - var prmPut pool.PrmObjectPut - prmPut.SetHeader(*objHeader) - prmPut.SetPayload(obj.Payload) - - if _, err = d.sdkPool.PutObject(ctx, prmPut); err != nil { - return fmt.Errorf("couldn't put object '%s': %w", destPath, err) + return fmt.Errorf("can't add object to tree: %w", err) } - var prmDelete pool.PrmObjectDelete - prmDelete.SetAddress(sourceAddr) - - if err = d.sdkPool.DeleteObject(ctx, prmDelete); err != nil { - return fmt.Errorf("couldn't remove source file '%s', id '%s': %w", sourcePath, sourceID, err) + if err = d.treeService.DeleteObject(ctx, d.containerID, sourceTreeObj.ID); err != nil { + return fmt.Errorf("couldn't delete object from tree: %w", err) } return nil } func (d *driver) Delete(ctx context.Context, path string) error { - filters := object.NewSearchFilters() - filters.AddRootFilter() - filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix) + if ok := validatePrefixPath(path); !ok { + return storagedriver.InvalidPathError{Path: path} + } + path = preparePrefixPath(path) - var prmSearch pool.PrmObjectSearch - prmSearch.SetContainerID(d.containerID) - prmSearch.SetFilters(filters) - - res, err := d.sdkPool.SearchObjects(ctx, prmSearch) + treeObjList, err := d.treeService.GetListObjectByPrefix(ctx, d.containerID, path) if err != nil { - return fmt.Errorf("init searching using client: %w", err) + return fmt.Errorf("couldn't get object from tree by prefix '%s': %w", path, err) } - defer res.Close() - - var inErr error - err = res.Iterate(func(id oid.ID) bool { - // Check if a key is a subpath (so that deleting "/a" does not delete "/ab"). - isSubPath, err := d.checkIsSubPath(ctx, path, id) - if err != nil { - inErr = err - return true - } - - if isSubPath { - if err = d.delete(ctx, id); err != nil { - inErr = fmt.Errorf("couldn't delete object by path '%s': %w", path, err) - return true - } - } - return false - }) - if err == nil { - err = inErr - } + rootTreeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path) if err != nil { - return fmt.Errorf("iterate objects: %w", err) + if !errors.Is(err, tree.ErrNodeNotFound) { + return fmt.Errorf("couldn't get object from tree: %w", err) + } + } else { + treeObjList = append(treeObjList, *rootTreeObj) + } + + for _, treeObj := range treeObjList { + if err := d.delete(ctx, treeObj.ObjID); err != nil { + return fmt.Errorf("error in delete object oid: %s : %w", treeObj.ObjID, err) + } + + if err = d.treeService.DeleteObject(ctx, d.containerID, treeObj.ID); err != nil { + return fmt.Errorf("couldn't delete object from tree: %w", err) + } } return nil } -func (d *driver) checkIsSubPath(ctx context.Context, path string, id oid.ID) (bool, error) { - var prmHead pool.PrmObjectHead - prmHead.SetAddress(d.objectAddress(id)) - - obj, err := d.sdkPool.HeadObject(ctx, prmHead) - if err != nil { - return false, fmt.Errorf("couldn't head object part '%s', id '%s': %w", path, id, err) - } - - fileInf := newFileInfo(ctx, obj, "") - return fileInf.Path() <= path || fileInf.Path()[len(path)] == '/', nil -} - func (d *driver) delete(ctx context.Context, id oid.ID) error { var prm pool.PrmObjectDelete prm.SetAddress(d.objectAddress(id)) @@ -771,166 +766,47 @@ func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn, return storagedriver.WalkFallback(ctx, d, path, f, options...) } -func (d *driver) searchByPrefix(ctx context.Context, prefix string) ([]oid.ID, error) { - filters := object.NewSearchFilters() - filters.AddRootFilter() - filters.AddFilter(attributeFilePath, prefix, object.MatchCommonPrefix) - - return d.baseSearch(ctx, filters) -} - func (d *driver) headSplitParts(ctx context.Context, splitID *object.SplitID, path string, isAppend bool) ([]*object.Object, map[oid.ID]struct{}, error) { - filters := object.NewSearchFilters() - filters.AddPhyFilter() - filters.AddSplitIDFilter(object.MatchStringEqual, splitID) - - var prm pool.PrmObjectSearch - prm.SetContainerID(d.containerID) - prm.SetFilters(filters) - - res, err := d.sdkPool.SearchObjects(ctx, prm) + ids, err := d.treeService.GetListOIDBySplitID(ctx, d.containerID, path, splitID) if err != nil { - return nil, nil, fmt.Errorf("init searching using client: %w", err) + return nil, nil, fmt.Errorf("couldn't get object from tree by split id '%s': %w", path, err) } - defer res.Close() + var ( + addr oid.Address + prmHead pool.PrmObjectHead + objects []*object.Object + ) - var addr oid.Address addr.SetContainer(d.containerID) - - var inErr error - var prmHead pool.PrmObjectHead - - var objects []*object.Object noChild := make(map[oid.ID]struct{}) - err = res.Iterate(func(id oid.ID) bool { - addr.SetObject(id) + for _, objID := range ids { + addr.SetObject(objID) prmHead.SetAddress(addr) obj, err := d.sdkPool.HeadObject(ctx, prmHead) if err != nil { - inErr = fmt.Errorf("couldn't head object part '%s', id '%s', splitID '%s': %w", path, id, splitID, err) - return true + return nil, nil, fmt.Errorf("couldn't head object part '%s', id '%s', splitID '%s': %w", path, objID, splitID, err) } if isAppend { objects = append(objects, &obj) - noChild[id] = struct{}{} - return false + noChild[objID] = struct{}{} + continue } - inErr = fmt.Errorf("init upload part '%s' already exist, splitID '%s'", path, splitID) - return true - }) - if err == nil { - err = inErr - } - if err != nil { - return nil, nil, fmt.Errorf("iterate objects: %w", err) + return nil, nil, fmt.Errorf("init upload part '%s' already exist, splitID '%s'", path, splitID) } return objects, noChild, nil } -func (d *driver) baseSearch(ctx context.Context, filters object.SearchFilters) ([]oid.ID, error) { - var prm pool.PrmObjectSearch - prm.SetContainerID(d.containerID) - prm.SetFilters(filters) - - res, err := d.sdkPool.SearchObjects(ctx, prm) - if err != nil { - return nil, fmt.Errorf("init searching using client: %w", err) - } - - defer res.Close() - - var buf []oid.ID - - err = res.Iterate(func(id oid.ID) bool { - buf = append(buf, id) - return false - }) - if err != nil { - return nil, fmt.Errorf("iterate objects: %w", err) - } - - return buf, nil -} - -func (d *driver) searchOne(ctx context.Context, path string) (oid.ID, error) { - id, ok, err := d.searchOneBase(ctx, path, false) - if err != nil { - return oid.ID{}, err - } - - if !ok { - return oid.ID{}, fmt.Errorf("found more than one object by path '%s'", path) - } - - return id, nil -} - -func (d *driver) searchOneBase(ctx context.Context, path string, byPrefix bool) (resID oid.ID, ok bool, err error) { - filters := object.NewSearchFilters() - filters.AddRootFilter() - if byPrefix { - filters.AddFilter(attributeFilePath, path, object.MatchCommonPrefix) - } else { - filters.AddFilter(attributeFilePath, path, object.MatchStringEqual) - } - - var prm pool.PrmObjectSearch - prm.SetContainerID(d.containerID) - prm.SetFilters(filters) - - res, err := d.sdkPool.SearchObjects(ctx, prm) - if err != nil { - return oid.ID{}, false, fmt.Errorf("init searching using client: %w", err) - } - - defer res.Close() - - var found bool - err = res.Iterate(func(id oid.ID) bool { - if found { - ok = false - return true - } - found = true - resID = id - ok = true - return false - }) - if err != nil { - return oid.ID{}, false, fmt.Errorf("iterate objects by path '%s': %w", path, err) - } - - if !found { - return oid.ID{}, false, storagedriver.PathNotFoundError{Path: path, DriverName: driverName} - } - - return resID, ok, nil -} - -func newFileInfo(ctx context.Context, obj object.Object, prefix string) storagedriver.FileInfo { +func newFileInfo(treeObj tree.TreeNode, prefix string) storagedriver.FileInfo { fileInfoFields := storagedriver.FileInfoFields{ - Size: int64(obj.PayloadSize()), - } - - for _, attr := range obj.Attributes() { - switch attr.Key() { - case attributeFilePath: - fileInfoFields.Path = attr.Value() - case object.AttributeTimestamp: - timestamp, err := strconv.ParseInt(attr.Value(), 10, 64) - if err != nil { - objID, _ := obj.ID() - dcontext.GetLogger(ctx).Warnf("object '%s' has invalid timestamp '%s'", objID.EncodeToString(), attr.Value()) - continue - } - fileInfoFields.ModTime = time.Unix(timestamp, 0) - } + Path: treeObj.FullPath, + Size: int64(treeObj.PayloadSize), + ModTime: treeObj.ModificationTime, } if len(prefix) > 0 { @@ -957,18 +833,39 @@ func newFileInfoDir(path string) storagedriver.FileInfo { } } -func (d *driver) newObjTarget() transformer.ChunkedObjectWriter { +func validatePrefixPath(path string) bool { + return prefixPathValidateRegex.MatchString(path) +} + +func validateFilePath(path string) bool { + return filePathValidateRegex.MatchString(path) +} + +func preparePrefixPath(path string) string { + if strings.HasSuffix(path, "/") { + return path[:len(path)-1] + } + return path +} + +func (d *driver) newObjTarget(path string, splitID *object.SplitID) transformer.ChunkedObjectWriter { return &objTarget{ - sdkPool: d.sdkPool, - key: d.key, + sdkPool: d.sdkPool, + treeService: d.treeService, + splitID: splitID, + key: d.key, + path: path, } } type objTarget struct { - sdkPool *pool.Pool - key *ecdsa.PrivateKey - obj *object.Object - chunks [][]byte + sdkPool *pool.Pool + treeService *tree.Tree + path string + splitID *object.SplitID + key *ecdsa.PrivateKey + obj *object.Object + chunks [][]byte } func (t *objTarget) WriteHeader(_ context.Context, obj *object.Object) error { @@ -1000,11 +897,14 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, t.obj.SetCreationEpoch(currEpoch) var ( - parID oid.ID - parHdr *object.Object + parID oid.ID + parHdr *object.Object + isCommit bool + sizeFinalObject uint64 ) if par := t.obj.Parent(); par != nil && par.Signature() == nil { + isCommit = true objPar := object.NewFromV2(par.ToV2()) objPar.SetCreationEpoch(currEpoch) @@ -1014,6 +914,7 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, } parID, _ = objPar.ID() + sizeFinalObject = objPar.PayloadSize() t.obj.SetParent(objPar) } @@ -1030,12 +931,28 @@ func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, var prm pool.PrmObjectPut prm.SetHeader(*t.obj) - - _, err = t.sdkPool.PutObject(ctx, prm) + id, err := t.sdkPool.PutObject(ctx, prm) if err != nil { return nil, fmt.Errorf("couldn't put part: %w", err) } + containerID, ok := t.obj.ContainerID() + if !ok { + return nil, fmt.Errorf("error in get container id from object") + } + + if isCommit { + _, err := t.treeService.AddObject(ctx, containerID, t.path, parID, sizeFinalObject) + if err != nil { + return nil, fmt.Errorf("can't add object to tree: %w", err) + } + } + + _, err = t.treeService.AddPHYObject(ctx, containerID, t.path, id, t.splitID) + if err != nil { + return nil, fmt.Errorf("can't add phy object to tree: %w", err) + } + objID, _ := t.obj.ID() return &transformer.AccessIdentifiers{ diff --git a/registry/storage/driver/frostfs/frostfs_test.go b/registry/storage/driver/frostfs/frostfs_test.go index a11d15b1..25f2bd10 100644 --- a/registry/storage/driver/frostfs/frostfs_test.go +++ b/registry/storage/driver/frostfs/frostfs_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "strings" "testing" "time" @@ -75,8 +76,7 @@ func TestIntegration(t *testing.T) { rootCtx := context.Background() aioImage := "truecloudlab/frostfs-aio:" versions := []string{ - "1.2.7", - "1.2.8", + "1.3.0", } for _, aioVersion := range versions { @@ -214,6 +214,7 @@ func testWriteRead(rootCtx context.Context, t *testing.T, drvr storagedriver.Sto func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageDriver, version string) { ctx, path := formCtxAndPath(rootCtx, version) + path = "/list" + path fileWriter, err := drvr.Writer(ctx, path, false) require.NoError(t, err) @@ -229,7 +230,7 @@ func testList(rootCtx context.Context, t *testing.T, drvr storagedriver.StorageD err = fileWriter.Commit(ctx) require.NoError(t, err) - res, err := drvr.List(ctx, path) + res, err := drvr.List(ctx, getParentDirPath(path)) require.NoError(t, err) require.Len(t, res, 1) require.Contains(t, res, path) @@ -340,3 +341,12 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, o return cnrID } + +func getParentDirPath(path string) string { + lastSlashIndex := strings.LastIndex(path, "/") + if lastSlashIndex != -1 { + path = path[:lastSlashIndex] + } + + return path +} diff --git a/registry/storage/driver/frostfs/tree/tree.go b/registry/storage/driver/frostfs/tree/tree.go new file mode 100644 index 00000000..cbfcd6a0 --- /dev/null +++ b/registry/storage/driver/frostfs/tree/tree.go @@ -0,0 +1,410 @@ +package tree + +import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + "time" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" +) + +type ( + Tree struct { + service ServiceClient + log *zap.Logger + } + + // ServiceClient is a client to interact with tree service. + // Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant. + ServiceClient interface { + GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error) + GetSubTree(ctx context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error) + AddNodeByPath(ctx context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error) + RemoveNode(ctx context.Context, containerID cid.ID, treeID string, nodeID uint64) error + } + + SubTreeStream interface { + Next() (NodeResponse, error) + } + + TreeNode struct { + ID uint64 + ParentID uint64 + ObjID oid.ID + TimeStamp uint64 + ModificationTime time.Time + PayloadSize uint64 + FullPath string + Meta map[string]string + } + + GetNodesParams struct { + ContainerID cid.ID + TreeID string + Path []string + Meta []string + LatestOnly bool + AllAttrs bool + } +) + +type Meta interface { + GetKey() string + GetValue() []byte +} + +type NodeResponse interface { + GetMeta() []Meta + GetNodeID() uint64 + GetParentID() uint64 + GetTimestamp() uint64 +} + +const ( + FileNameKey = "FileName" + oidKV = "OID" + splitIDKV = "SplitID" + payloadSize = "PayloadSize" + fullPath = "FullPath" + modificationTime = "ModificationTime" + + objectTree = "distribution-object" + splitTree = "distribution-split" + + separator = "/" + + maxDepth = 0 +) + +var ( + // ErrNodeNotFound is returned from ServiceClient in case of not found error. + ErrNodeNotFound = errors.New("not found") + + ErrOnlyDirFound = errors.New("only directory found") + + // ErrNodeAccessDenied is returned from ServiceClient service in case of access denied error. + ErrNodeAccessDenied = errors.New("access denied") + + // ErrGatewayTimeout is returned from ServiceClient service in case of timeout error. + ErrGatewayTimeout = errors.New("gateway timeout") +) + +func NewTree(service ServiceClient, log *zap.Logger) *Tree { + return &Tree{ + service: service, + log: log, + } +} + +func (c *Tree) GetObjectByPath(ctx context.Context, containerID cid.ID, path string) (*TreeNode, error) { + newPath := pathFromName(path) + p := &GetNodesParams{ + ContainerID: containerID, + TreeID: objectTree, + Path: newPath, + LatestOnly: false, + AllAttrs: true, + } + nodes, err := c.service.GetNodes(ctx, p) + if err != nil { + return nil, err + } + + var result NodeResponse + + switch { + case len(nodes) == 0 || len(nodes) == 1 && isIntermediate(nodes[0]): + return nil, ErrNodeNotFound + case len(nodes) > 2: + return nil, fmt.Errorf("found more than two nodes") + default: + result = nodes[0] + if len(nodes) == 2 && isIntermediate(result) { + result = nodes[1] + } + } + + treeNode, err := newTreeNode(result) + if err != nil { + return nil, fmt.Errorf("error parse tree node: %w", err) + } + + return treeNode, nil +} + +func (c *Tree) GetObjectByPathDir(ctx context.Context, containerID cid.ID, path string) (*TreeNode, error) { + newPath := pathFromName(path) + p := &GetNodesParams{ + ContainerID: containerID, + TreeID: objectTree, + Path: newPath, + LatestOnly: false, + AllAttrs: true, + } + nodes, err := c.service.GetNodes(ctx, p) + if err != nil { + return nil, err + } + + var result NodeResponse + + switch { + case len(nodes) == 0: + return nil, ErrNodeNotFound + case len(nodes) == 1 && isIntermediate(nodes[0]): + return nil, ErrOnlyDirFound + case len(nodes) > 2: + return nil, fmt.Errorf("found more than two nodes") + default: + result = nodes[0] + if len(nodes) == 2 && isIntermediate(result) { + result = nodes[1] + } + } + + treeNode, err := newTreeNode(result) + if err != nil { + return nil, fmt.Errorf("error parse tree node: %w", err) + } + + return treeNode, nil +} + +func (c *Tree) GetListObjectByPrefix(ctx context.Context, containerID cid.ID, prefixPath string) ([]TreeNode, error) { + rootID, err := c.determinePrefixNode(ctx, containerID, objectTree, prefixPath) + if err != nil { + if errors.Is(err, ErrNodeNotFound) { + return nil, nil + } + return nil, err + } + + subTree, err := c.service.GetSubTree(ctx, containerID, objectTree, rootID, maxDepth) + if err != nil { + if errors.Is(err, ErrNodeNotFound) { + return nil, nil + } + return nil, err + } + + nodes := make([]NodeResponse, 0) + + for _, node := range subTree { + if node.GetNodeID() == rootID { + continue + } + if !isIntermediate(node) { + nodes = append(nodes, node) + } + } + + result := make([]TreeNode, 0, len(nodes)) + for _, v := range nodes { + node, err := newTreeNode(v) + if err != nil { + return nil, fmt.Errorf("error parse tree node: %w", err) + } + result = append(result, *node) + } + + return result, nil +} + +func (c *Tree) GetListOIDBySplitID(ctx context.Context, containerID cid.ID, path string, splitID *object.SplitID) ([]oid.ID, error) { + nodes, err := c.getNodesBySplitID(ctx, containerID, path, splitID) + if err != nil { + return nil, err + } + + ids := make([]oid.ID, len(nodes)) + for i, node := range nodes { + ids[i] = node.ObjID + } + + return ids, nil +} + +func (c *Tree) AddObject(ctx context.Context, containerID cid.ID, filePath string, oid oid.ID, size uint64) (uint64, error) { + path := pathFromName(filePath) + meta := map[string]string{ + oidKV: oid.String(), + FileNameKey: path[len(path)-1], + payloadSize: strconv.FormatUint(size, 10), + fullPath: filePath, + modificationTime: strconv.FormatInt(time.Now().Unix(), 10), + } + + nodeID, err := c.service.AddNodeByPath(ctx, containerID, objectTree, path[:len(path)-1], meta) + + return nodeID, err +} + +func (c *Tree) AddPHYObject(ctx context.Context, containerID cid.ID, filePath string, oid oid.ID, splitID *object.SplitID) (uint64, error) { + meta := map[string]string{ + oidKV: oid.String(), + splitIDKV: splitID.String(), + } + + nodeID, err := c.service.AddNodeByPath(ctx, containerID, splitTree, pathFromName(filePath), meta) + + return nodeID, err +} + +func (c *Tree) DeleteObject(ctx context.Context, containerID cid.ID, nodeID uint64) error { + return c.service.RemoveNode(ctx, containerID, objectTree, nodeID) +} + +func (c *Tree) DeleteObjectsBySplitID(ctx context.Context, containerID cid.ID, path string, splitID *object.SplitID) error { + splitNodes, err := c.getNodesBySplitID(ctx, containerID, path, splitID) + if err != nil { + return err + } + + for _, node := range splitNodes { + err = c.service.RemoveNode(ctx, containerID, splitTree, node.ID) + if err != nil { + return err + } + } + + return nil +} + +func (c *Tree) determinePrefixNode(ctx context.Context, containerID cid.ID, treeID, prefixPath string) (uint64, error) { + var rootID uint64 + path := pathFromName(prefixPath) + + if len(path) > 1 { + var err error + rootID, err = c.getPrefixNodeID(ctx, containerID, treeID, path) + if err != nil { + return 0, err + } + } + + return rootID, nil +} + +func (c *Tree) getPrefixNodeID(ctx context.Context, containerID cid.ID, treeID string, prefixPath []string) (uint64, error) { + p := &GetNodesParams{ + ContainerID: containerID, + TreeID: treeID, + Path: prefixPath, + LatestOnly: false, + AllAttrs: true, + } + nodes, err := c.service.GetNodes(ctx, p) + if err != nil { + return 0, err + } + + var intermediateNodes []uint64 + for _, node := range nodes { + if isIntermediate(node) { + intermediateNodes = append(intermediateNodes, node.GetNodeID()) + } + } + + if len(intermediateNodes) == 0 { + return 0, ErrNodeNotFound + } + + if len(intermediateNodes) > 1 { + return 0, fmt.Errorf("found more than one intermediate nodes") + } + + return intermediateNodes[0], nil +} + +func (c *Tree) getNodesBySplitID(ctx context.Context, containerID cid.ID, filePath string, splitID *object.SplitID) ([]TreeNode, error) { + rootID, err := c.determinePrefixNode(ctx, containerID, splitTree, filePath) + if err != nil { + if errors.Is(err, ErrNodeNotFound) { + return nil, nil + } + return nil, err + } + + subTree, err := c.service.GetSubTree(ctx, containerID, splitTree, rootID, 2) + if err != nil { + if errors.Is(err, ErrNodeNotFound) { + return nil, nil + } + return nil, err + } + + nodes := make([]TreeNode, 0, len(subTree)) + for _, v := range subTree { + node, err := newTreeNode(v) + if err != nil { + return nil, fmt.Errorf("error parse tree node: %w", err) + } + nodes = append(nodes, *node) + } + + result := make([]TreeNode, 0) + for _, node := range nodes { + if val, ok := node.Meta[splitIDKV]; ok { + if val == splitID.String() { + result = append(result, node) + } + } + } + + return result, nil +} + +func isIntermediate(node NodeResponse) bool { + if len(node.GetMeta()) != 1 { + return false + } + + return node.GetMeta()[0].GetKey() == FileNameKey +} + +func newTreeNode(nodeInfo NodeResponse) (*TreeNode, error) { + treeNode := &TreeNode{ + ID: nodeInfo.GetNodeID(), + ParentID: nodeInfo.GetParentID(), + TimeStamp: nodeInfo.GetTimestamp(), + Meta: make(map[string]string, len(nodeInfo.GetMeta())), + } + + for _, kv := range nodeInfo.GetMeta() { + switch kv.GetKey() { + case oidKV: + if err := treeNode.ObjID.DecodeString(string(kv.GetValue())); err != nil { + return nil, err + } + case payloadSize: + if sizeStr := string(kv.GetValue()); len(sizeStr) > 0 { + var err error + if treeNode.PayloadSize, err = strconv.ParseUint(sizeStr, 10, 64); err != nil { + return nil, fmt.Errorf("invalid payload size value '%s': %w", sizeStr, err) + } + } + case modificationTime: + timestamp, err := strconv.ParseInt(string(kv.GetValue()), 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid modification time value '%s': %w", string(kv.GetValue()), err) + } + treeNode.ModificationTime = time.Unix(timestamp, 0) + case fullPath: + treeNode.FullPath = string(kv.GetValue()) + default: + treeNode.Meta[kv.GetKey()] = string(kv.GetValue()) + } + } + + return treeNode, nil +} + +func pathFromName(objectName string) []string { + return strings.Split(objectName, separator) +} diff --git a/registry/storage/driver/frostfs/wrapper/pool_wrapper.go b/registry/storage/driver/frostfs/wrapper/pool_wrapper.go new file mode 100644 index 00000000..58f24015 --- /dev/null +++ b/registry/storage/driver/frostfs/wrapper/pool_wrapper.go @@ -0,0 +1,178 @@ +package wrapper + +import ( + "context" + "errors" + "fmt" + "io" + "strings" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree" + grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" + "github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type GetNodeByPathResponseInfoWrapper struct { + response *grpcService.GetNodeByPathResponse_Info +} + +func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 { + return n.response.GetNodeId() +} + +func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 { + return n.response.GetParentId() +} + +func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 { + return n.response.GetTimestamp() +} + +func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta { + res := make([]tree.Meta, len(n.response.Meta)) + for i, value := range n.response.Meta { + res[i] = value + } + return res +} + +type GetSubTreeResponseBodyWrapper struct { + response *grpcService.GetSubTreeResponse_Body +} + +func (n GetSubTreeResponseBodyWrapper) GetNodeID() uint64 { + return n.response.GetNodeId() +} + +func (n GetSubTreeResponseBodyWrapper) GetParentID() uint64 { + return n.response.GetParentId() +} + +func (n GetSubTreeResponseBodyWrapper) GetTimestamp() uint64 { + return n.response.GetTimestamp() +} + +func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta { + res := make([]tree.Meta, len(n.response.Meta)) + for i, value := range n.response.Meta { + res[i] = value + } + return res +} + +type PoolWrapper struct { + p *treepool.Pool +} + +func NewPoolWrapper(p *treepool.Pool) *PoolWrapper { + return &PoolWrapper{p: p} +} + +func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([]tree.NodeResponse, error) { + poolPrm := treepool.GetNodesParams{ + CID: prm.ContainerID, + TreeID: prm.TreeID, + Path: prm.Path, + Meta: prm.Meta, + PathAttribute: tree.FileNameKey, + LatestOnly: prm.LatestOnly, + AllAttrs: prm.AllAttrs, + } + + nodes, err := w.p.GetNodes(ctx, poolPrm) + if err != nil { + return nil, handleError(err) + } + + res := make([]tree.NodeResponse, len(nodes)) + for i, info := range nodes { + res[i] = GetNodeByPathResponseInfoWrapper{info} + } + + return res, nil +} + +func (w *PoolWrapper) GetSubTree(ctx context.Context, containerID cid.ID, treeID string, rootID uint64, depth uint32) ([]tree.NodeResponse, error) { + poolPrm := treepool.GetSubTreeParams{ + CID: containerID, + TreeID: treeID, + RootID: rootID, + Depth: depth, + } + + subTreeReader, err := w.p.GetSubTree(ctx, poolPrm) + if err != nil { + return nil, handleError(err) + } + + var subtree []tree.NodeResponse + + node, err := subTreeReader.Next() + for err == nil { + subtree = append(subtree, GetSubTreeResponseBodyWrapper{node}) + node, err = subTreeReader.Next() + } + if err != nil && err != io.EOF { + return nil, handleError(err) + } + + return subtree, nil +} + +func (w *PoolWrapper) AddNodeByPath(ctx context.Context, containerID cid.ID, treeID string, path []string, meta map[string]string) (uint64, error) { + nodeID, err := w.p.AddNodeByPath(ctx, treepool.AddNodeByPathParams{ + CID: containerID, + TreeID: treeID, + Path: path, + Meta: meta, + PathAttribute: tree.FileNameKey, + }) + return nodeID, handleError(err) +} + +func (w *PoolWrapper) RemoveNode(ctx context.Context, containerID cid.ID, treeID string, nodeID uint64) error { + return handleError(w.p.RemoveNode(ctx, treepool.RemoveNodeParams{ + CID: containerID, + TreeID: treeID, + NodeID: nodeID, + })) +} + +func handleError(err error) error { + if err == nil { + return nil + } + if errors.Is(err, treepool.ErrNodeNotFound) { + return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error()) + } + if errors.Is(err, treepool.ErrNodeAccessDenied) { + return fmt.Errorf("%w: %s", tree.ErrNodeAccessDenied, err.Error()) + } + if isTimeoutError(err) { + return fmt.Errorf("%w: %s", tree.ErrGatewayTimeout, err.Error()) + } + + return err +} + +func unwrapErr(err error) error { + unwrappedErr := errors.Unwrap(err) + for unwrappedErr != nil { + err = unwrappedErr + unwrappedErr = errors.Unwrap(err) + } + + return err +} + +func isTimeoutError(err error) bool { + if strings.Contains(err.Error(), "timeout") || + errors.Is(err, context.DeadlineExceeded) { + return true + } + + return status.Code(unwrapErr(err)) == codes.DeadlineExceeded +}