From 25477cdaf870a2f9536485c1fcf1d5d2ce4e5bc9 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Fri, 20 May 2022 11:26:35 +0300 Subject: [PATCH] [#416] Use tree service to list objects Signed-off-by: Denis Kirillov --- api/cache/objectslist.go | 14 ++- api/handler/handlers_test.go | 8 +- api/layer/object.go | 51 +++++++-- api/layer/tree_service.go | 1 + internal/neofs/tree.go | 146 ++++++++++++++++++++++--- internal/neofstest/tree/tree_mock.go | 157 +++++++++++++++++++++++++++ 6 files changed, 342 insertions(+), 35 deletions(-) create mode 100644 internal/neofstest/tree/tree_mock.go diff --git a/api/cache/objectslist.go b/api/cache/objectslist.go index e28d47b9..7eed1bd1 100644 --- a/api/cache/objectslist.go +++ b/api/cache/objectslist.go @@ -33,8 +33,9 @@ type ( // ObjectsListKey is a key to find a ObjectsListCache's entry. ObjectsListKey struct { - cid string - prefix string + cid string + prefix string + latestOnly bool } ) @@ -103,11 +104,12 @@ func (l *ObjectsListCache) CleanCacheEntriesContainingObject(objectName string, } } -// CreateObjectsListCacheKey returns ObjectsListKey with the given CID and prefix. -func CreateObjectsListCacheKey(cnr cid.ID, prefix string) ObjectsListKey { +// CreateObjectsListCacheKey returns ObjectsListKey with the given CID, prefix and latestOnly flag. +func CreateObjectsListCacheKey(cnr *cid.ID, prefix string, latestOnly bool) ObjectsListKey { p := ObjectsListKey{ - cid: cnr.EncodeToString(), - prefix: prefix, + cid: cnr.EncodeToString(), + prefix: prefix, + latestOnly: latestOnly, } return p diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go index bd53e638..7793036f 100644 --- a/api/handler/handlers_test.go +++ b/api/handler/handlers_test.go @@ -17,6 +17,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/layer" "github.com/nspcc-dev/neofs-s3-gw/api/resolver" + treetest "github.com/nspcc-dev/neofs-s3-gw/internal/neofstest/tree" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/user" @@ -55,9 +56,10 @@ func prepareHandlerContext(t *testing.T) *handlerContext { }) layerCfg := &layer.Config{ - Caches: layer.DefaultCachesConfigs(zap.NewExample()), - AnonKey: layer.AnonymousKey{Key: key}, - Resolver: testResolver, + Caches: layer.DefaultCachesConfigs(zap.NewExample()), + AnonKey: layer.AnonymousKey{Key: key}, + Resolver: testResolver, + TreeService: treetest.NewTreeService(), } h := &handler{ diff --git a/api/layer/object.go b/api/layer/object.go index 6d828eb1..6ffe36f8 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -503,19 +503,11 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis } func (n *layer) listSortedObjects(ctx context.Context, p allObjectParams) ([]*data.ObjectInfo, error) { - versions, err := n.getAllObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter) + objects, err := n.getLatestObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter) if err != nil { return nil, err } - objects := make([]*data.ObjectInfo, 0, len(versions)) - for _, v := range versions { - lastVersion := v.getLast() - if lastVersion != nil { - objects = append(objects, lastVersion) - } - } - sort.Slice(objects, func(i, j int) bool { return objects[i].Name < objects[j].Name }) @@ -523,10 +515,49 @@ func (n *layer) listSortedObjects(ctx context.Context, p allObjectParams) ([]*da return objects, nil } +func (n *layer) getLatestObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) ([]*data.ObjectInfo, error) { + var err error + + cacheKey := cache.CreateObjectsListCacheKey(&bkt.CID, prefix, true) + ids := n.listsCache.Get(cacheKey) + + if ids == nil { + ids, err = n.treeService.GetLatestVersionsByPrefix(ctx, &bkt.CID, prefix) + if err != nil { + return nil, err + } + if err := n.listsCache.Put(cacheKey, ids); err != nil { + n.log.Error("couldn't cache list of objects", zap.Error(err)) + } + } + + objectsMap := make(map[string]*data.ObjectInfo, len(ids)) // to squash the same directories + for i := 0; i < len(ids); i++ { + obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt, ids[i]) + if obj == nil { + continue + } + if oi := objectInfoFromMeta(bkt, obj, prefix, delimiter); oi != nil { + if isSystem(oi) { + continue + } + + objectsMap[oi.Name] = oi + } + } + + objects := make([]*data.ObjectInfo, 0, len(objectsMap)) + for _, obj := range objectsMap { + objects = append(objects, obj) + } + + return objects, nil +} + func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string]*objectVersions, error) { var err error - cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix) + cacheKey := cache.CreateObjectsListCacheKey(&bkt.CID, prefix, false) ids := n.listsCache.Get(cacheKey) if ids == nil { diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index d933b11a..dfbec0bc 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -32,6 +32,7 @@ type TreeService interface { GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]*NodeVersion, error) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*NodeVersion, error) + GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error) GetUnversioned(ctx context.Context, cnrID *cid.ID, objectName string) (*NodeVersion, error) AddVersion(ctx context.Context, cnrID *cid.ID, objectName string, newVersion *NodeVersion) error RemoveVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 64afedea..9e84a771 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -56,8 +56,12 @@ const ( systemTree = "system" separator = "/" + + maxGetSubTreeDepth = 10 // current limit on storage node side ) +var emptyOID oid.ID + // NewTreeClient creates instance of TreeClient using provided address and create grpc connection. func NewTreeClient(addr string, key *keys.PrivateKey) (*TreeClient, error) { conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -74,7 +78,13 @@ func NewTreeClient(addr string, key *keys.PrivateKey) (*TreeClient, error) { }, nil } -func newTreeNode(nodeInfo *tree.GetNodeByPathResponse_Info) (*TreeNode, error) { +type NodeResponse interface { + GetMeta() []*tree.KeyValue + GetNodeId() uint64 + GetTimestamp() uint64 +} + +func newTreeNode(nodeInfo NodeResponse) (*TreeNode, error) { var objID oid.ID meta := make(map[string]string, len(nodeInfo.GetMeta())) @@ -92,7 +102,7 @@ func newTreeNode(nodeInfo *tree.GetNodeByPathResponse_Info) (*TreeNode, error) { return &TreeNode{ ID: nodeInfo.GetNodeId(), ObjID: objID, - TimeStamp: nodeInfo.Timestamp, + TimeStamp: nodeInfo.GetTimestamp(), Meta: meta, }, nil } @@ -102,7 +112,7 @@ func (n *TreeNode) Get(key string) (string, bool) { return value, ok } -func newNodeVersion(node *tree.GetNodeByPathResponse_Info) (*layer.NodeVersion, error) { +func newNodeVersion(node NodeResponse) (*layer.NodeVersion, error) { treeNode, err := newTreeNode(node) if err != nil { return nil, fmt.Errorf("invalid tree node: %w", err) @@ -113,7 +123,7 @@ func newNodeVersion(node *tree.GetNodeByPathResponse_Info) (*layer.NodeVersion, return &layer.NodeVersion{ BaseNodeVersion: layer.BaseNodeVersion{ - ID: node.NodeId, + ID: treeNode.ID, OID: treeNode.ObjID, }, IsUnversioned: isUnversioned, @@ -242,6 +252,96 @@ func (c *TreeClient) GetLatestVersion(ctx context.Context, cnrID *cid.ID, object return c.getLatestVersion(ctx, cnrID, versionTree, fileNameKV, path, meta) } +func (c *TreeClient) GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error) { + var rootID uint64 + path := strings.Split(prefix, separator) + tailPrefix := path[len(path)-1] + + if len(path) > 1 { + meta := []string{fileNameKV} + + nodes, err := c.getNodes(ctx, cnrID, versionTree, fileNameKV, path[:len(path)-1], meta, true) + if err != nil { + return nil, err + } + if len(nodes) == 0 { + return nil, nil + } + if len(nodes) != 1 { + return nil, layer.ErrNodeNotFound + } + + rootID = nodes[0].NodeId + } + + subTree, err := c.getSubTree(ctx, cnrID, versionTree, rootID, 1) + if err != nil { + return nil, err + } + + var result []oid.ID + for _, node := range subTree { + if node.GetNodeId() != 0 && hasPrefix(node, tailPrefix) { + latestNodes, err := c.getSubTreeLatestVersions(ctx, cnrID, node.GetNodeId()) + if err != nil { + return nil, err + } + result = append(result, latestNodes...) + } + } + + return result, nil +} + +func hasPrefix(node *tree.GetSubTreeResponse_Body, prefix string) bool { + for _, kv := range node.GetMeta() { + if kv.GetKey() == fileNameKV { + return strings.HasPrefix(string(kv.GetValue()), prefix) + } + } + + return false +} + +func (c *TreeClient) getSubTreeLatestVersions(ctx context.Context, cnrID *cid.ID, nodeID uint64) ([]oid.ID, error) { + subTree, err := c.getSubTree(ctx, cnrID, versionTree, nodeID, maxGetSubTreeDepth) + if err != nil { + return nil, err + } + + latestVersions := make(map[string]*TreeNode, len(subTree)) + for _, node := range subTree { + treeNode, err := newTreeNode(node) + if err != nil || treeNode.ObjID.Equals(emptyOID) { // invalid OID attribute + continue + } + fileName, ok := treeNode.Get(fileNameKV) + if !ok { + continue + } + + key := formLatestNodeKey(node.GetParentId(), fileName) + latest, ok := latestVersions[key] + if !ok || latest.TimeStamp <= treeNode.TimeStamp { // todo also compare oid + latestVersions[key] = treeNode + } + } + + result := make([]oid.ID, 0, len(latestVersions)) + for _, treeNode := range latestVersions { + if _, ok := treeNode.Get(isDeleteMarkerKV); ok { + continue + } + result = append(result, treeNode.ObjID) + } + + return result, nil +} + +func formLatestNodeKey(parentID uint64, fileName string) string { + return strconv.FormatUint(parentID, 10) + fileName +} + func (c *TreeClient) GetSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*layer.BaseNodeVersion, error) { meta := []string{oidKV} path := strings.Split(objectName, separator) @@ -379,11 +479,21 @@ func (c *TreeClient) getVersions(ctx context.Context, cnrID *cid.ID, treeID, fil } func (c *TreeClient) getParent(ctx context.Context, cnrID *cid.ID, treeID string, id uint64) (uint64, error) { + subTree, err := c.getSubTree(ctx, cnrID, treeID, id, 0) + if err != nil { + return 0, err + } + + return subTree[0].GetParentId(), nil +} + +func (c *TreeClient) getSubTree(ctx context.Context, cnrID *cid.ID, treeID string, rootID uint64, depth uint32) ([]*tree.GetSubTreeResponse_Body, error) { request := &tree.GetSubTreeRequest{ Body: &tree.GetSubTreeRequest_Body{ ContainerId: cnrID[:], TreeId: treeID, - RootId: id, + RootId: rootID, + Depth: depth, BearerToken: getBearer(ctx), }, } @@ -394,28 +504,32 @@ func (c *TreeClient) getParent(ctx context.Context, cnrID *cid.ID, treeID string Sign: sign, } }); err != nil { - return 0, err + return nil, err } cli, err := c.service.GetSubTree(ctx, request) if err != nil { - return 0, fmt.Errorf("failed to get sub tree client: %w", err) - } - - resp, err := cli.Recv() - if err != nil { - return 0, fmt.Errorf("failed to get sub tree: %w", err) + if strings.Contains(err.Error(), "not found") { + return nil, nil + } + return nil, fmt.Errorf("failed to get sub tree client: %w", err) } + var subtree []*tree.GetSubTreeResponse_Body for { - if _, err = cli.Recv(); err == io.EOF { + resp, err := cli.Recv() + if err == io.EOF { break } else if err != nil { - return 0, fmt.Errorf("failed to read out sub tree stream: %w", err) + if strings.Contains(err.Error(), "not found") { + return nil, nil + } + return nil, fmt.Errorf("failed to get sub tree: %w", err) } + subtree = append(subtree, resp.Body) } - return resp.GetBody().GetParentId(), nil + return subtree, nil } func metaFromSettings(settings *data.BucketSettings) map[string]string { @@ -474,7 +588,7 @@ func (c *TreeClient) getNodes(ctx context.Context, cnrID *cid.ID, treeID, pathAt resp, err := c.service.GetNodeByPath(ctx, request) if err != nil { - return nil, fmt.Errorf("failed to get node path deb: %w", err) + return nil, fmt.Errorf("failed to get node path: %w", err) } return resp.GetBody().GetNodes(), nil diff --git a/internal/neofstest/tree/tree_mock.go b/internal/neofstest/tree/tree_mock.go new file mode 100644 index 00000000..ca48479e --- /dev/null +++ b/internal/neofstest/tree/tree_mock.go @@ -0,0 +1,157 @@ +package tree + +import ( + "context" + "sort" + + "github.com/nspcc-dev/neofs-s3-gw/api/data" + "github.com/nspcc-dev/neofs-s3-gw/api/layer" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" +) + +type TreeServiceMock struct { + settings map[string]*data.BucketSettings + versions map[string]map[string][]*layer.NodeVersion + system map[string]map[string]*layer.BaseNodeVersion +} + +func NewTreeService() *TreeServiceMock { + return &TreeServiceMock{ + settings: make(map[string]*data.BucketSettings), + versions: make(map[string]map[string][]*layer.NodeVersion), + system: make(map[string]map[string]*layer.BaseNodeVersion), + } +} + +func (t *TreeServiceMock) PutSettingsNode(_ context.Context, id *cid.ID, settings *data.BucketSettings) error { + t.settings[id.EncodeToString()] = settings + return nil +} + +func (t *TreeServiceMock) GetSettingsNode(_ context.Context, id *cid.ID) (*data.BucketSettings, error) { + settings, ok := t.settings[id.EncodeToString()] + if !ok { + return nil, layer.ErrNodeNotFound + } + + return settings, nil +} + +func (t *TreeServiceMock) GetNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) { + panic("implement me") +} + +func (t *TreeServiceMock) PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) { + panic("implement me") +} + +func (t *TreeServiceMock) GetBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) { + panic("implement me") +} + +func (t *TreeServiceMock) PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) { + panic("implement me") +} + +func (t *TreeServiceMock) DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) { + panic("implement me") +} + +func (t *TreeServiceMock) GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]*layer.NodeVersion, error) { + panic("implement me") +} + +func (t *TreeServiceMock) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*layer.NodeVersion, error) { + cnrVersionsMap, ok := t.versions[cnrID.EncodeToString()] + if !ok { + return nil, layer.ErrNodeNotFound + } + + versions, ok := cnrVersionsMap[objectName] + if !ok { + return nil, layer.ErrNodeNotFound + } + + sort.Slice(versions, func(i, j int) bool { + return versions[i].ID < versions[j].ID + }) + + if len(versions) != 0 { + return versions[len(versions)-1], nil + } + + return nil, layer.ErrNodeNotFound +} + +func (t *TreeServiceMock) GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error) { + panic("implement me") +} + +func (t *TreeServiceMock) GetUnversioned(ctx context.Context, cnrID *cid.ID, objectName string) (*layer.NodeVersion, error) { + panic("implement me") +} + +func (t *TreeServiceMock) AddVersion(_ context.Context, cnrID *cid.ID, objectName string, newVersion *layer.NodeVersion) error { + cnrVersionsMap, ok := t.versions[cnrID.EncodeToString()] + if !ok { + t.versions[cnrID.EncodeToString()] = map[string][]*layer.NodeVersion{ + objectName: {newVersion}, + } + return nil + } + + versions, ok := cnrVersionsMap[objectName] + if !ok { + cnrVersionsMap[objectName] = []*layer.NodeVersion{newVersion} + return nil + } + + sort.Slice(versions, func(i, j int) bool { + return versions[i].ID < versions[j].ID + }) + + if len(versions) != 0 { + newVersion.ID = versions[len(versions)-1].ID + 1 + } + + cnrVersionsMap[objectName] = append(versions, newVersion) + + return nil +} + +func (t *TreeServiceMock) RemoveVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error { + panic("implement me") +} + +func (t *TreeServiceMock) AddSystemVersion(_ context.Context, cnrID *cid.ID, objectName string, newVersion *layer.BaseNodeVersion) error { + cnrSystemMap, ok := t.system[cnrID.EncodeToString()] + if !ok { + t.system[cnrID.EncodeToString()] = map[string]*layer.BaseNodeVersion{ + objectName: newVersion, + } + return nil + } + + cnrSystemMap[objectName] = newVersion + + return nil +} + +func (t *TreeServiceMock) GetSystemVersion(_ context.Context, cnrID *cid.ID, objectName string) (*layer.BaseNodeVersion, error) { + cnrSystemMap, ok := t.system[cnrID.EncodeToString()] + if !ok { + return nil, layer.ErrNodeNotFound + } + + sysVersion, ok := cnrSystemMap[objectName] + if !ok { + return nil, layer.ErrNodeNotFound + } + + return sysVersion, nil +} + +func (t *TreeServiceMock) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error { + panic("implement me") +}