From 12a2060dd0f1303a0fba077dd391ba640a54809d Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Fri, 10 Jun 2022 14:57:41 +0300 Subject: [PATCH] [#524] Optimize listing Signed-off-by: Denis Kirillov --- api/data/tree.go | 6 +- api/layer/layer.go | 12 +-- api/layer/object.go | 221 +++++++++++++++++--------------------- api/layer/object_test.go | 92 ---------------- api/layer/tree_mock.go | 16 +-- api/layer/tree_service.go | 4 +- internal/neofs/tree.go | 173 +++++++++++++++++------------ 7 files changed, 224 insertions(+), 300 deletions(-) diff --git a/api/data/tree.go b/api/data/tree.go index 351c6d039..39723881a 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -19,9 +19,8 @@ type NodeVersion struct { // DeleteMarkerInfo is used to save object info if node in the tree service is delete marker. // We need this information because the "delete marker" object is no longer stored in NeoFS. type DeleteMarkerInfo struct { - FilePath string - Created time.Time - Owner user.ID + Created time.Time + Owner user.ID } // ExtendedObjectInfo contains additional node info to be able to sort versions by timestamp. @@ -37,6 +36,7 @@ type BaseNodeVersion struct { ID uint64 OID oid.ID Timestamp uint64 + FilePath string } type ObjectTaggingInfo struct { diff --git a/api/layer/layer.go b/api/layer/layer.go index cbc96c84d..e65ff8c52 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -485,12 +485,12 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings obj.DeleteMarkVersion = UnversionedObjectVersionID newVersion := &data.NodeVersion{ BaseNodeVersion: data.BaseNodeVersion{ - OID: *randOID, + OID: *randOID, + FilePath: obj.Name, }, DeleteMarker: &data.DeleteMarkerInfo{ - FilePath: obj.Name, - Created: time.Now(), - Owner: n.Owner(ctx), + Created: time.Now(), + Owner: n.Owner(ctx), }, IsUnversioned: true, } @@ -500,7 +500,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings obj.DeleteMarkVersion = randOID.EncodeToString() } - if obj.Error = n.treeService.AddVersion(ctx, &bkt.CID, obj.Name, newVersion); obj.Error != nil { + if obj.Error = n.treeService.AddVersion(ctx, &bkt.CID, newVersion); obj.Error != nil { return obj } @@ -576,7 +576,7 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (*cid.ID, error) } func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error { - objects, err := n.listSortedObjects(ctx, allObjectParams{Bucket: p.BktInfo}) + objects, _, err := n.getLatestObjectsVersions(ctx, allObjectParams{Bucket: p.BktInfo, MaxKeys: 1}) if err != nil { return err } diff --git a/api/layer/object.go b/api/layer/object.go index 711e137bc..23df295c3 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -57,12 +57,19 @@ type ( } allObjectParams struct { - Bucket *data.BucketInfo - Delimiter string - Prefix string + Bucket *data.BucketInfo + Delimiter string + Prefix string + MaxKeys int + Marker string + ContinuationToken string } ) +const ( + continuationToken = "" +) + func newAddress(cnr cid.ID, obj oid.ID) oid.Address { var addr oid.Address addr.SetContainer(cnr) @@ -141,7 +148,10 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object own := n.Owner(ctx) versioningEnabled := n.isVersioningEnabled(ctx, p.BktInfo) - newVersion := &data.NodeVersion{IsUnversioned: !versioningEnabled} + newVersion := &data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{FilePath: p.Object}, + IsUnversioned: !versioningEnabled, + } r := p.Reader if r != nil { @@ -178,7 +188,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object } newVersion.OID = *id - if err = n.treeService.AddVersion(ctx, &p.BktInfo.CID, p.Object, newVersion); err != nil { + if err = n.treeService.AddVersion(ctx, &p.BktInfo.CID, newVersion); err != nil { return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err) } @@ -345,124 +355,103 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn // ListObjectsV1 returns objects in a bucket for requests of Version 1. func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { - var ( - err error - result ListObjectsInfoV1 - allObjects []*data.ObjectInfo - ) + var result ListObjectsInfoV1 - if p.MaxKeys == 0 { - return &result, nil + prm := allObjectParams{ + Bucket: p.BktInfo, + Delimiter: p.Delimiter, + Prefix: p.Prefix, + MaxKeys: p.MaxKeys, + Marker: p.Marker, } - if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil { + objects, next, err := n.getLatestObjectsVersions(ctx, prm) + if err != nil { return nil, err } - if len(allObjects) == 0 { - return &result, nil - } - - if p.Marker != "" { - allObjects = trimAfterObjectName(p.Marker, allObjects) - } - - if len(allObjects) > p.MaxKeys { + if next != nil { result.IsTruncated = true - allObjects = allObjects[:p.MaxKeys] - result.NextMarker = allObjects[len(allObjects)-1].Name + result.NextMarker = objects[len(objects)-1].Name } - result.Prefixes, result.Objects = triageObjects(allObjects) + result.Prefixes, result.Objects = triageObjects(objects) return &result, nil } // ListObjectsV2 returns objects in a bucket for requests of Version 2. func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) { - var ( - err error - result ListObjectsInfoV2 - allObjects []*data.ObjectInfo - ) + var result ListObjectsInfoV2 - if p.MaxKeys == 0 { - return &result, nil + prm := allObjectParams{ + Bucket: p.BktInfo, + Delimiter: p.Delimiter, + Prefix: p.Prefix, + MaxKeys: p.MaxKeys, + Marker: p.StartAfter, + ContinuationToken: p.ContinuationToken, } - if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil { - return nil, err - } - - if len(allObjects) == 0 { - return &result, nil - } - - if p.ContinuationToken != "" { - allObjects = trimAfterObjectID(p.ContinuationToken, allObjects) - } - - if p.StartAfter != "" { - allObjects = trimAfterObjectName(p.StartAfter, allObjects) - } - - if len(allObjects) > p.MaxKeys { - result.IsTruncated = true - allObjects = allObjects[:p.MaxKeys] - result.NextContinuationToken = allObjects[len(allObjects)-1].ID.EncodeToString() - } - - result.Prefixes, result.Objects = triageObjects(allObjects) - - return &result, nil -} - -func (n *layer) listSortedObjects(ctx context.Context, p allObjectParams) ([]*data.ObjectInfo, error) { - objects, err := n.getLatestObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter) + objects, next, err := n.getLatestObjectsVersions(ctx, prm) if err != nil { return nil, err } - sort.Slice(objects, func(i, j int) bool { - return objects[i].Name < objects[j].Name - }) + if next != nil { + result.IsTruncated = true + result.NextContinuationToken = next.ID.EncodeToString() + } - return objects, nil + result.Prefixes, result.Objects = triageObjects(objects) + + return &result, nil } -func (n *layer) getLatestObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) ([]*data.ObjectInfo, error) { +func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) ([]*data.ObjectInfo, *data.ObjectInfo, error) { + if p.MaxKeys == 0 { + return nil, nil, nil + } + var err error - cacheKey := cache.CreateObjectsListCacheKey(&bkt.CID, prefix, true) - ids := n.listsCache.Get(cacheKey) + cacheKey := cache.CreateObjectsListCacheKey(&p.Bucket.CID, p.Prefix, true) + nodeVersions := n.listsCache.GetVersions(cacheKey) - if ids == nil { - ids, err = n.treeService.GetLatestVersionsByPrefix(ctx, &bkt.CID, prefix) + if nodeVersions == nil { + nodeVersions, err = n.treeService.GetLatestVersionsByPrefix(ctx, &p.Bucket.CID, p.Prefix) if err != nil { - return nil, err + return nil, nil, err } - if err := n.listsCache.Put(cacheKey, ids); err != nil { + if err = n.listsCache.PutVersions(cacheKey, nodeVersions); err != nil { n.log.Error("couldn't cache list of objects", zap.Error(err)) } } - objectsMap := make(map[string]*data.ObjectInfo, len(ids)) // to squash the same directories - for i := 0; i < len(ids); i++ { - obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt, ids[i]) - if obj == nil { + sort.Slice(nodeVersions, func(i, j int) bool { + return nodeVersions[i].FilePath < nodeVersions[j].FilePath + }) + + existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories + objects := make([]*data.ObjectInfo, 0, p.MaxKeys) + + for _, node := range nodeVersions { + if shouldSkip(node, p, existed) { continue } - if oi := objectInfoFromMeta(bkt, obj, prefix, delimiter); oi != nil { - objectsMap[oi.Name] = oi + + if len(objects) == p.MaxKeys { + return objects, &data.ObjectInfo{ID: node.OID, Name: node.FilePath}, nil + } + + if obj := n.objectFromObjectsCacheOrNeoFS(ctx, p.Bucket, node.OID); obj != nil { + if oi := objectInfoFromMeta(p.Bucket, obj, p.Prefix, p.Delimiter); oi != nil { + objects = append(objects, oi) + } } } - objects := make([]*data.ObjectInfo, 0, len(objectsMap)) - for _, obj := range objectsMap { - objects = append(objects, obj) - } - - return objects, nil + return objects, nil, nil } func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) { @@ -488,7 +477,7 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, if nodeVersion.DeleteMarker != nil { // delete marker does not match any object in NeoFS oi.ID = nodeVersion.OID - oi.Name = nodeVersion.DeleteMarker.FilePath + oi.Name = nodeVersion.FilePath oi.Owner = nodeVersion.DeleteMarker.Owner oi.Created = nodeVersion.DeleteMarker.Created oi.IsDeleteMarker = true @@ -524,30 +513,34 @@ func IsSystemHeader(key string) bool { return strings.HasPrefix(key, "S3-") } -func trimAfterObjectName(startAfter string, objects []*data.ObjectInfo) []*data.ObjectInfo { - if len(objects) != 0 && objects[len(objects)-1].Name <= startAfter { - return nil +func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool { + filepath := node.FilePath + if len(p.Delimiter) > 0 { + tail := strings.TrimPrefix(filepath, p.Prefix) + index := strings.Index(tail, p.Delimiter) + if index >= 0 { + filepath = p.Prefix + tail[:index+1] + } } - for i := range objects { - if objects[i].Name > startAfter { - return objects[i:] + if _, ok := existed[filepath]; ok { + return true + } + + if filepath <= p.Marker { + return true + } + + if p.ContinuationToken != "" { + if _, ok := existed[continuationToken]; !ok { + if p.ContinuationToken != node.OID.EncodeToString() { + return true + } + existed[continuationToken] = struct{}{} } } - return nil -} - -func trimAfterObjectID(id string, objects []*data.ObjectInfo) []*data.ObjectInfo { - if len(objects) != 0 && objects[len(objects)-1].ID.EncodeToString() == id { - return []*data.ObjectInfo{} - } - for i, obj := range objects { - if obj.ID.EncodeToString() == id { - return objects[i+1:] - } - } - - return nil + existed[filepath] = struct{}{} + return false } func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []*data.ObjectInfo) { @@ -574,24 +567,6 @@ func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []st return } -func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*data.ObjectInfo, error) { - var ( - err error - allObjects []*data.ObjectInfo - ) - - allObjects, err = n.listSortedObjects(ctx, allObjectParams{ - Bucket: p.BktInfo, - Prefix: p.Prefix, - Delimiter: p.Delimiter, - }) - if err != nil { - return nil, err - } - - return allObjects, nil -} - func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *data.BucketInfo) bool { settings, err := n.GetBucketSettings(ctx, bktInfo) if err != nil { diff --git a/api/layer/object_test.go b/api/layer/object_test.go index 0f285fdd2..ec4475eab 100644 --- a/api/layer/object_test.go +++ b/api/layer/object_test.go @@ -7,101 +7,9 @@ import ( "io/ioutil" "testing" - "github.com/nspcc-dev/neofs-s3-gw/api/data" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" ) -func TestTrimAfterObjectName(t *testing.T) { - var ( - objects []*data.ObjectInfo - names = []string{"b", "c", "d"} - ) - for _, name := range names { - objects = append(objects, &data.ObjectInfo{Name: name}) - } - - t.Run("startafter before all objects", func(t *testing.T) { - actual := trimAfterObjectName("a", objects) - require.Equal(t, objects, actual) - }) - - t.Run("startafter first object", func(t *testing.T) { - actual := trimAfterObjectName(names[0], objects) - require.Equal(t, objects[1:], actual) - }) - - t.Run("startafter second-to-last object", func(t *testing.T) { - actual := trimAfterObjectName(names[len(names)-2], objects) - require.Equal(t, objects[len(objects)-1:], actual) - }) - - t.Run("startafter last object", func(t *testing.T) { - actual := trimAfterObjectName(names[len(names)-1], objects) - require.Empty(t, actual) - }) - - t.Run("startafter after all objects", func(t *testing.T) { - actual := trimAfterObjectName("z", objects) - require.Nil(t, actual) - }) - - t.Run("empty objects", func(t *testing.T) { - actual := trimAfterObjectName(names[0], []*data.ObjectInfo{}) - require.Nil(t, actual) - }) - - t.Run("nil objects", func(t *testing.T) { - actual := trimAfterObjectName(names[0], nil) - require.Nil(t, actual) - }) - - t.Run("empty startafter", func(t *testing.T) { - actual := trimAfterObjectName("", objects) - require.Equal(t, objects, actual) - }) -} - -func TestTrimAfterObjectID(t *testing.T) { - var ( - objects []*data.ObjectInfo - ids []oid.ID - numberOfIDS = 3 - ) - - for i := 0; i < numberOfIDS; i++ { - id := oidtest.ID() - objects = append(objects, &data.ObjectInfo{ID: id}) - ids = append(ids, id) - } - - t.Run("existing id", func(t *testing.T) { - actual := trimAfterObjectID(ids[0].EncodeToString(), objects) - require.Equal(t, objects[1:], actual) - }) - - t.Run("second to last id", func(t *testing.T) { - actual := trimAfterObjectID(ids[len(ids)-2].EncodeToString(), objects) - require.Equal(t, objects[len(objects)-1:], actual) - }) - - t.Run("non-existing id", func(t *testing.T) { - actual := trimAfterObjectID("z", objects) - require.Nil(t, actual) - }) - - t.Run("last id", func(t *testing.T) { - actual := trimAfterObjectID(ids[len(ids)-1].EncodeToString(), objects) - require.Empty(t, actual) - }) - - t.Run("empty id", func(t *testing.T) { - actual := trimAfterObjectID("", objects) - require.Nil(t, actual) - }) -} - func TestWrapReader(t *testing.T) { src := make([]byte, 1024*1024+1) _, err := rand.Read(src) diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 755becb36..c41f870c4 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -137,13 +137,13 @@ func (t *TreeServiceMock) GetLatestVersion(_ context.Context, cnrID *cid.ID, obj return nil, ErrNodeNotFound } -func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error) { +func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, cnrID *cid.ID, prefix string) ([]*data.NodeVersion, error) { cnrVersionsMap, ok := t.versions[cnrID.EncodeToString()] if !ok { return nil, ErrNodeNotFound } - var result []oid.ID + var result []*data.NodeVersion for key, versions := range cnrVersionsMap { if !strings.HasPrefix(key, prefix) { @@ -155,7 +155,7 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, cnrID *ci }) if len(versions) != 0 { - result = append(result, versions[len(versions)-1].OID) + result = append(result, versions[len(versions)-1]) } } @@ -182,18 +182,18 @@ func (t *TreeServiceMock) GetUnversioned(_ context.Context, cnrID *cid.ID, objec return nil, ErrNodeNotFound } -func (t *TreeServiceMock) AddVersion(_ context.Context, cnrID *cid.ID, objectName string, newVersion *data.NodeVersion) error { +func (t *TreeServiceMock) AddVersion(_ context.Context, cnrID *cid.ID, newVersion *data.NodeVersion) error { cnrVersionsMap, ok := t.versions[cnrID.EncodeToString()] if !ok { t.versions[cnrID.EncodeToString()] = map[string][]*data.NodeVersion{ - objectName: {newVersion}, + newVersion.FilePath: {newVersion}, } return nil } - versions, ok := cnrVersionsMap[objectName] + versions, ok := cnrVersionsMap[newVersion.FilePath] if !ok { - cnrVersionsMap[objectName] = []*data.NodeVersion{newVersion} + cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion} return nil } @@ -217,7 +217,7 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, cnrID *cid.ID, objectNam } } - cnrVersionsMap[objectName] = append(result, newVersion) + cnrVersionsMap[newVersion.FilePath] = append(result, newVersion) return nil } diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index ab8353b60..307230b5a 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -40,10 +40,10 @@ type TreeService interface { GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]*data.NodeVersion, error) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.NodeVersion, error) - GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error) + GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.NodeVersion, error) GetAllVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.NodeVersion, error) GetUnversioned(ctx context.Context, cnrID *cid.ID, objectName string) (*data.NodeVersion, error) - AddVersion(ctx context.Context, cnrID *cid.ID, objectName string, newVersion *data.NodeVersion) error + AddVersion(ctx context.Context, cnrID *cid.ID, newVersion *data.NodeVersion) error RemoveVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error PutLock(ctx context.Context, cnrID *cid.ID, nodeID uint64, lock *data.LockInfo) error diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 47f31f18e..c11e3454e 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -67,7 +67,6 @@ const ( // keys for delete marker nodes. isDeleteMarkerKV = "IdDeleteMarker" - filePathKV = "FilePath" ownerKV = "Owner" createdKV = "Created" @@ -140,16 +139,25 @@ func (n *TreeNode) Get(key string) (string, bool) { return value, ok } -func newNodeVersion(node NodeResponse) (*data.NodeVersion, error) { +func (n *TreeNode) FileName() (string, bool) { + value, ok := n.Meta[fileNameKV] + if ok && value == emptyFileName { + value = "" + } + + return value, ok +} + +func newNodeVersion(filePath string, node NodeResponse) (*data.NodeVersion, error) { treeNode, err := newTreeNode(node) if err != nil { return nil, fmt.Errorf("invalid tree node: %w", err) } - return newNodeVersionFromTreeNode(treeNode), nil + return newNodeVersionFromTreeNode(filePath, treeNode), nil } -func newNodeVersionFromTreeNode(treeNode *TreeNode) *data.NodeVersion { +func newNodeVersionFromTreeNode(filePath string, treeNode *TreeNode) *data.NodeVersion { _, isUnversioned := treeNode.Get(isUnversionedKV) _, isDeleteMarker := treeNode.Get(isDeleteMarkerKV) @@ -158,13 +166,12 @@ func newNodeVersionFromTreeNode(treeNode *TreeNode) *data.NodeVersion { ID: treeNode.ID, OID: treeNode.ObjID, Timestamp: treeNode.TimeStamp, + FilePath: filePath, }, IsUnversioned: isUnversioned, } if isDeleteMarker { - filePath, _ := treeNode.Get(filePathKV) - var created time.Time if createdStr, ok := treeNode.Get(createdKV); ok { if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err == nil { @@ -178,9 +185,8 @@ func newNodeVersionFromTreeNode(treeNode *TreeNode) *data.NodeVersion { } version.DeleteMarker = &data.DeleteMarkerInfo{ - FilePath: filePath, - Created: created, - Owner: owner, + Created: created, + Owner: owner, } } return version @@ -524,7 +530,24 @@ func (c *TreeClient) GetLatestVersion(ctx context.Context, cnrID *cid.ID, object meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV} path := pathFromName(objectName) - return c.getLatestVersion(ctx, cnrID, versionTree, path, meta) + p := &getNodesParams{ + CnrID: cnrID, + TreeID: versionTree, + Path: path, + Meta: meta, + LatestOnly: true, + AllAttrs: false, + } + nodes, err := c.getNodes(ctx, p) + if err != nil { + return nil, fmt.Errorf("couldn't get nodes: %w", err) + } + + if len(nodes) == 0 { + return nil, layer.ErrNodeNotFound + } + + return newNodeVersion(objectName, nodes[0]) } // pathFromName splits name by '/' and add an empty marker if name has trailing or leading slash. @@ -539,22 +562,20 @@ func pathFromName(objectName string) []string { return path } -func (c *TreeClient) GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error) { - subTreeNodes, err := c.getSubTreeByPrefix(ctx, cnrID, versionTree, prefix) +func (c *TreeClient) GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.NodeVersion, error) { + subTreeNodes, commonPrefix, err := c.getSubTreeByPrefix(ctx, cnrID, versionTree, prefix) if err != nil { return nil, err } - var result []oid.ID + var result []*data.NodeVersion for _, node := range subTreeNodes { - latestNodes, err := c.getSubTreeVersions(ctx, cnrID, node.GetNodeId(), true) + latestNodes, err := c.getSubTreeVersions(ctx, cnrID, node.GetNodeId(), commonPrefix, true) if err != nil { return nil, err } - for _, latest := range latestNodes { - result = append(result, latest.OID) - } + result = append(result, latestNodes...) } return result, nil @@ -609,21 +630,21 @@ func (c *TreeClient) getPrefixNodeID(ctx context.Context, cnrID *cid.ID, treeID return intermediateNodes[0], nil } -func (c *TreeClient) getSubTreeByPrefix(ctx context.Context, cnrID *cid.ID, treeID, prefix string) ([]*tree.GetSubTreeResponse_Body, error) { +func (c *TreeClient) getSubTreeByPrefix(ctx context.Context, cnrID *cid.ID, treeID, prefix string) ([]*tree.GetSubTreeResponse_Body, string, error) { rootID, tailPrefix, err := c.determinePrefixNode(ctx, cnrID, treeID, prefix) if err != nil { if errors.Is(err, layer.ErrNodeNotFound) { - return nil, nil + return nil, "", nil } - return nil, err + return nil, "", err } subTree, err := c.getSubTree(ctx, cnrID, treeID, rootID, 1) if err != nil { if errors.Is(err, layer.ErrNodeNotFound) { - return nil, nil + return nil, "", nil } - return nil, err + return nil, "", err } result := make([]*tree.GetSubTreeResponse_Body, 0, len(subTree)) @@ -633,7 +654,7 @@ func (c *TreeClient) getSubTreeByPrefix(ctx context.Context, cnrID *cid.ID, tree } } - return result, nil + return result, strings.TrimSuffix(prefix, tailPrefix), nil } func hasPrefix(node *tree.GetSubTreeResponse_Body, prefix string) bool { @@ -654,33 +675,49 @@ func isIntermediate(node *tree.GetNodeByPathResponse_Info) bool { return node.GetMeta()[0].GetKey() == fileNameKV } -func (c *TreeClient) getSubTreeVersions(ctx context.Context, cnrID *cid.ID, nodeID uint64, latestOnly bool) ([]*data.NodeVersion, error) { +func (c *TreeClient) getSubTreeVersions(ctx context.Context, cnrID *cid.ID, nodeID uint64, parentFilePath string, latestOnly bool) ([]*data.NodeVersion, error) { subTree, err := c.getSubTree(ctx, cnrID, versionTree, nodeID, maxGetSubTreeDepth) if err != nil { return nil, err } - var emptyOID oid.ID + var parentPrefix string + if parentFilePath != "" { // The root of subTree can also have a parent + parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar' + } + var emptyOID oid.ID + var filepath string + namesMap := make(map[uint64]string, len(subTree)) versions := make(map[string][]*data.NodeVersion, len(subTree)) - for _, node := range subTree { - treeNode, err := newTreeNode(node) - if err != nil || treeNode.ObjID.Equals(emptyOID) { // invalid or empty OID attribute + + for i, node := range subTree { + treeNode, fileName, err := parseTreeNode(node) + if err != nil { continue } - fileName, ok := treeNode.Get(fileNameKV) - if !ok { + + if i != 0 { + if filepath, err = formFilePath(node, fileName, namesMap); err != nil { + return nil, fmt.Errorf("invalid node order: %w", err) + } + } else { + filepath = parentPrefix + fileName + namesMap[treeNode.ID] = filepath + } + + if treeNode.ObjID.Equals(emptyOID) { // The node can be intermediate but we still want to update namesMap continue } key := formLatestNodeKey(node.GetParentId(), fileName) versionNodes, ok := versions[key] if !ok { - versionNodes = []*data.NodeVersion{newNodeVersionFromTreeNode(treeNode)} + versionNodes = []*data.NodeVersion{newNodeVersionFromTreeNode(filepath, treeNode)} } else if !latestOnly { - versionNodes = append(versionNodes, newNodeVersionFromTreeNode(treeNode)) + versionNodes = append(versionNodes, newNodeVersionFromTreeNode(filepath, treeNode)) } else if versionNodes[0].Timestamp <= treeNode.TimeStamp { - versionNodes[0] = newNodeVersionFromTreeNode(treeNode) + versionNodes[0] = newNodeVersionFromTreeNode(filepath, treeNode) } versions[key] = versionNodes @@ -697,19 +734,45 @@ func (c *TreeClient) getSubTreeVersions(ctx context.Context, cnrID *cid.ID, node return result, nil } +func formFilePath(node *tree.GetSubTreeResponse_Body, fileName string, namesMap map[uint64]string) (string, error) { + parentPath, ok := namesMap[node.GetParentId()] + if !ok { + return "", fmt.Errorf("couldn't get parent path") + } + + filepath := parentPath + separator + fileName + namesMap[node.GetNodeId()] = filepath + + return filepath, nil +} + +func parseTreeNode(node *tree.GetSubTreeResponse_Body) (*TreeNode, string, error) { + treeNode, err := newTreeNode(node) + if err != nil { // invalid OID attribute + return nil, "", err + } + + fileName, ok := treeNode.FileName() + if !ok { + return nil, "", fmt.Errorf("doesn't contain FileName") + } + + return treeNode, fileName, nil +} + func formLatestNodeKey(parentID uint64, fileName string) string { - return strconv.FormatUint(parentID, 10) + fileName + return strconv.FormatUint(parentID, 10) + "." + fileName } func (c *TreeClient) GetAllVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.NodeVersion, error) { - subTreeNodes, err := c.getSubTreeByPrefix(ctx, cnrID, versionTree, prefix) + prefixNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, cnrID, versionTree, prefix) if err != nil { return nil, err } var result []*data.NodeVersion - for _, node := range subTreeNodes { - versions, err := c.getSubTreeVersions(ctx, cnrID, node.GetNodeId(), false) + for _, node := range prefixNodes { + versions, err := c.getSubTreeVersions(ctx, cnrID, node.GetNodeId(), headPrefix, false) if err != nil { return nil, err } @@ -719,27 +782,6 @@ func (c *TreeClient) GetAllVersionsByPrefix(ctx context.Context, cnrID *cid.ID, return result, nil } -func (c *TreeClient) getLatestVersion(ctx context.Context, cnrID *cid.ID, treeID string, path, meta []string) (*data.NodeVersion, error) { - p := &getNodesParams{ - CnrID: cnrID, - TreeID: treeID, - Path: path, - Meta: meta, - LatestOnly: true, - AllAttrs: false, - } - nodes, err := c.getNodes(ctx, p) - if err != nil { - return nil, fmt.Errorf("couldn't get nodes: %w", err) - } - - if len(nodes) == 0 { - return nil, layer.ErrNodeNotFound - } - - return newNodeVersion(nodes[0]) -} - func (c *TreeClient) GetUnversioned(ctx context.Context, cnrID *cid.ID, filepath string) (*data.NodeVersion, error) { return c.getUnversioned(ctx, cnrID, versionTree, filepath) } @@ -761,8 +803,8 @@ func (c *TreeClient) getUnversioned(ctx context.Context, cnrID *cid.ID, treeID, return nodes[0], nil } -func (c *TreeClient) AddVersion(ctx context.Context, cnrID *cid.ID, filepath string, version *data.NodeVersion) error { - return c.addVersion(ctx, cnrID, versionTree, filepath, version) +func (c *TreeClient) AddVersion(ctx context.Context, cnrID *cid.ID, version *data.NodeVersion) error { + return c.addVersion(ctx, cnrID, versionTree, version) } func (c *TreeClient) RemoveVersion(ctx context.Context, cnrID *cid.ID, id uint64) error { @@ -777,7 +819,7 @@ func (c *TreeClient) CreateMultipartUpload(ctx context.Context, cnrID *cid.ID, i } func (c *TreeClient) GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error) { - subTreeNodes, err := c.getSubTreeByPrefix(ctx, cnrID, systemTree, prefix) + subTreeNodes, _, err := c.getSubTreeByPrefix(ctx, cnrID, systemTree, prefix) if err != nil { return nil, err } @@ -984,8 +1026,8 @@ func (c *TreeClient) Close() error { return nil } -func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, filepath string, version *data.NodeVersion) error { - path := pathFromName(filepath) +func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID string, version *data.NodeVersion) error { + path := pathFromName(version.FilePath) meta := map[string]string{ oidKV: version.OID.EncodeToString(), fileNameKV: path[len(path)-1], @@ -993,7 +1035,6 @@ func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, file if version.DeleteMarker != nil { meta[isDeleteMarkerKV] = "true" - meta[filePathKV] = version.DeleteMarker.FilePath meta[ownerKV] = version.DeleteMarker.Owner.EncodeToString() meta[createdKV] = strconv.FormatInt(version.DeleteMarker.Created.UTC().UnixMilli(), 10) } @@ -1001,7 +1042,7 @@ func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, file if version.IsUnversioned { meta[isUnversionedKV] = "true" - node, err := c.getUnversioned(ctx, cnrID, treeID, filepath) + node, err := c.getUnversioned(ctx, cnrID, treeID, version.FilePath) if err == nil { parentID, err := c.getParent(ctx, cnrID, treeID, node.ID) if err != nil { @@ -1040,7 +1081,7 @@ func (c *TreeClient) getVersions(ctx context.Context, cnrID *cid.ID, treeID, fil result := make([]*data.NodeVersion, 0, len(nodes)) for _, node := range nodes { - nodeVersion, err := newNodeVersion(node) + nodeVersion, err := newNodeVersion(filepath, node) if err != nil { return nil, err }