[#416] Use tree service to list objects
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
7e8b57605a
commit
25477cdaf8
6 changed files with 342 additions and 35 deletions
6
api/cache/objectslist.go
vendored
6
api/cache/objectslist.go
vendored
|
@ -35,6 +35,7 @@ type (
|
|||
ObjectsListKey struct {
|
||||
cid string
|
||||
prefix string
|
||||
latestOnly bool
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -103,11 +104,12 @@ func (l *ObjectsListCache) CleanCacheEntriesContainingObject(objectName string,
|
|||
}
|
||||
}
|
||||
|
||||
// CreateObjectsListCacheKey returns ObjectsListKey with the given CID and prefix.
|
||||
func CreateObjectsListCacheKey(cnr cid.ID, prefix string) ObjectsListKey {
|
||||
// CreateObjectsListCacheKey returns ObjectsListKey with the given CID, prefix and latestOnly flag.
|
||||
func CreateObjectsListCacheKey(cnr *cid.ID, prefix string, latestOnly bool) ObjectsListKey {
|
||||
p := ObjectsListKey{
|
||||
cid: cnr.EncodeToString(),
|
||||
prefix: prefix,
|
||||
latestOnly: latestOnly,
|
||||
}
|
||||
|
||||
return p
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/resolver"
|
||||
treetest "github.com/nspcc-dev/neofs-s3-gw/internal/neofstest/tree"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||
|
@ -58,6 +59,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
|
|||
Caches: layer.DefaultCachesConfigs(zap.NewExample()),
|
||||
AnonKey: layer.AnonymousKey{Key: key},
|
||||
Resolver: testResolver,
|
||||
TreeService: treetest.NewTreeService(),
|
||||
}
|
||||
|
||||
h := &handler{
|
||||
|
|
|
@ -503,19 +503,11 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
|
|||
}
|
||||
|
||||
func (n *layer) listSortedObjects(ctx context.Context, p allObjectParams) ([]*data.ObjectInfo, error) {
|
||||
versions, err := n.getAllObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter)
|
||||
objects, err := n.getLatestObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objects := make([]*data.ObjectInfo, 0, len(versions))
|
||||
for _, v := range versions {
|
||||
lastVersion := v.getLast()
|
||||
if lastVersion != nil {
|
||||
objects = append(objects, lastVersion)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(objects, func(i, j int) bool {
|
||||
return objects[i].Name < objects[j].Name
|
||||
})
|
||||
|
@ -523,10 +515,49 @@ func (n *layer) listSortedObjects(ctx context.Context, p allObjectParams) ([]*da
|
|||
return objects, nil
|
||||
}
|
||||
|
||||
func (n *layer) getLatestObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) ([]*data.ObjectInfo, error) {
|
||||
var err error
|
||||
|
||||
cacheKey := cache.CreateObjectsListCacheKey(&bkt.CID, prefix, true)
|
||||
ids := n.listsCache.Get(cacheKey)
|
||||
|
||||
if ids == nil {
|
||||
ids, err = n.treeService.GetLatestVersionsByPrefix(ctx, &bkt.CID, prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := n.listsCache.Put(cacheKey, ids); err != nil {
|
||||
n.log.Error("couldn't cache list of objects", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
objectsMap := make(map[string]*data.ObjectInfo, len(ids)) // to squash the same directories
|
||||
for i := 0; i < len(ids); i++ {
|
||||
obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt, ids[i])
|
||||
if obj == nil {
|
||||
continue
|
||||
}
|
||||
if oi := objectInfoFromMeta(bkt, obj, prefix, delimiter); oi != nil {
|
||||
if isSystem(oi) {
|
||||
continue
|
||||
}
|
||||
|
||||
objectsMap[oi.Name] = oi
|
||||
}
|
||||
}
|
||||
|
||||
objects := make([]*data.ObjectInfo, 0, len(objectsMap))
|
||||
for _, obj := range objectsMap {
|
||||
objects = append(objects, obj)
|
||||
}
|
||||
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string]*objectVersions, error) {
|
||||
var err error
|
||||
|
||||
cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix)
|
||||
cacheKey := cache.CreateObjectsListCacheKey(&bkt.CID, prefix, false)
|
||||
ids := n.listsCache.Get(cacheKey)
|
||||
|
||||
if ids == nil {
|
||||
|
|
|
@ -32,6 +32,7 @@ type TreeService interface {
|
|||
|
||||
GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]*NodeVersion, error)
|
||||
GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*NodeVersion, error)
|
||||
GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error)
|
||||
GetUnversioned(ctx context.Context, cnrID *cid.ID, objectName string) (*NodeVersion, error)
|
||||
AddVersion(ctx context.Context, cnrID *cid.ID, objectName string, newVersion *NodeVersion) error
|
||||
RemoveVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error
|
||||
|
|
|
@ -56,8 +56,12 @@ const (
|
|||
systemTree = "system"
|
||||
|
||||
separator = "/"
|
||||
|
||||
maxGetSubTreeDepth = 10 // current limit on storage node side
|
||||
)
|
||||
|
||||
var emptyOID oid.ID
|
||||
|
||||
// NewTreeClient creates instance of TreeClient using provided address and create grpc connection.
|
||||
func NewTreeClient(addr string, key *keys.PrivateKey) (*TreeClient, error) {
|
||||
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
|
@ -74,7 +78,13 @@ func NewTreeClient(addr string, key *keys.PrivateKey) (*TreeClient, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func newTreeNode(nodeInfo *tree.GetNodeByPathResponse_Info) (*TreeNode, error) {
|
||||
type NodeResponse interface {
|
||||
GetMeta() []*tree.KeyValue
|
||||
GetNodeId() uint64
|
||||
GetTimestamp() uint64
|
||||
}
|
||||
|
||||
func newTreeNode(nodeInfo NodeResponse) (*TreeNode, error) {
|
||||
var objID oid.ID
|
||||
meta := make(map[string]string, len(nodeInfo.GetMeta()))
|
||||
|
||||
|
@ -92,7 +102,7 @@ func newTreeNode(nodeInfo *tree.GetNodeByPathResponse_Info) (*TreeNode, error) {
|
|||
return &TreeNode{
|
||||
ID: nodeInfo.GetNodeId(),
|
||||
ObjID: objID,
|
||||
TimeStamp: nodeInfo.Timestamp,
|
||||
TimeStamp: nodeInfo.GetTimestamp(),
|
||||
Meta: meta,
|
||||
}, nil
|
||||
}
|
||||
|
@ -102,7 +112,7 @@ func (n *TreeNode) Get(key string) (string, bool) {
|
|||
return value, ok
|
||||
}
|
||||
|
||||
func newNodeVersion(node *tree.GetNodeByPathResponse_Info) (*layer.NodeVersion, error) {
|
||||
func newNodeVersion(node NodeResponse) (*layer.NodeVersion, error) {
|
||||
treeNode, err := newTreeNode(node)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid tree node: %w", err)
|
||||
|
@ -113,7 +123,7 @@ func newNodeVersion(node *tree.GetNodeByPathResponse_Info) (*layer.NodeVersion,
|
|||
|
||||
return &layer.NodeVersion{
|
||||
BaseNodeVersion: layer.BaseNodeVersion{
|
||||
ID: node.NodeId,
|
||||
ID: treeNode.ID,
|
||||
OID: treeNode.ObjID,
|
||||
},
|
||||
IsUnversioned: isUnversioned,
|
||||
|
@ -242,6 +252,96 @@ func (c *TreeClient) GetLatestVersion(ctx context.Context, cnrID *cid.ID, object
|
|||
return c.getLatestVersion(ctx, cnrID, versionTree, fileNameKV, path, meta)
|
||||
}
|
||||
|
||||
func (c *TreeClient) GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error) {
|
||||
var rootID uint64
|
||||
path := strings.Split(prefix, separator)
|
||||
tailPrefix := path[len(path)-1]
|
||||
|
||||
if len(path) > 1 {
|
||||
meta := []string{fileNameKV}
|
||||
|
||||
nodes, err := c.getNodes(ctx, cnrID, versionTree, fileNameKV, path[:len(path)-1], meta, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(nodes) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
if len(nodes) != 1 {
|
||||
return nil, layer.ErrNodeNotFound
|
||||
}
|
||||
|
||||
rootID = nodes[0].NodeId
|
||||
}
|
||||
|
||||
subTree, err := c.getSubTree(ctx, cnrID, versionTree, rootID, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []oid.ID
|
||||
for _, node := range subTree {
|
||||
if node.GetNodeId() != 0 && hasPrefix(node, tailPrefix) {
|
||||
latestNodes, err := c.getSubTreeLatestVersions(ctx, cnrID, node.GetNodeId())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, latestNodes...)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func hasPrefix(node *tree.GetSubTreeResponse_Body, prefix string) bool {
|
||||
for _, kv := range node.GetMeta() {
|
||||
if kv.GetKey() == fileNameKV {
|
||||
return strings.HasPrefix(string(kv.GetValue()), prefix)
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *TreeClient) getSubTreeLatestVersions(ctx context.Context, cnrID *cid.ID, nodeID uint64) ([]oid.ID, error) {
|
||||
subTree, err := c.getSubTree(ctx, cnrID, versionTree, nodeID, maxGetSubTreeDepth)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
latestVersions := make(map[string]*TreeNode, len(subTree))
|
||||
for _, node := range subTree {
|
||||
treeNode, err := newTreeNode(node)
|
||||
if err != nil || treeNode.ObjID.Equals(emptyOID) { // invalid OID attribute
|
||||
continue
|
||||
}
|
||||
fileName, ok := treeNode.Get(fileNameKV)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
key := formLatestNodeKey(node.GetParentId(), fileName)
|
||||
latest, ok := latestVersions[key]
|
||||
if !ok || latest.TimeStamp <= treeNode.TimeStamp { // todo also compare oid
|
||||
latestVersions[key] = treeNode
|
||||
}
|
||||
}
|
||||
|
||||
result := make([]oid.ID, 0, len(latestVersions))
|
||||
for _, treeNode := range latestVersions {
|
||||
if _, ok := treeNode.Get(isDeleteMarkerKV); ok {
|
||||
continue
|
||||
}
|
||||
result = append(result, treeNode.ObjID)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func formLatestNodeKey(parentID uint64, fileName string) string {
|
||||
return strconv.FormatUint(parentID, 10) + fileName
|
||||
}
|
||||
|
||||
func (c *TreeClient) GetSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*layer.BaseNodeVersion, error) {
|
||||
meta := []string{oidKV}
|
||||
path := strings.Split(objectName, separator)
|
||||
|
@ -379,11 +479,21 @@ func (c *TreeClient) getVersions(ctx context.Context, cnrID *cid.ID, treeID, fil
|
|||
}
|
||||
|
||||
func (c *TreeClient) getParent(ctx context.Context, cnrID *cid.ID, treeID string, id uint64) (uint64, error) {
|
||||
subTree, err := c.getSubTree(ctx, cnrID, treeID, id, 0)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return subTree[0].GetParentId(), nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) getSubTree(ctx context.Context, cnrID *cid.ID, treeID string, rootID uint64, depth uint32) ([]*tree.GetSubTreeResponse_Body, error) {
|
||||
request := &tree.GetSubTreeRequest{
|
||||
Body: &tree.GetSubTreeRequest_Body{
|
||||
ContainerId: cnrID[:],
|
||||
TreeId: treeID,
|
||||
RootId: id,
|
||||
RootId: rootID,
|
||||
Depth: depth,
|
||||
BearerToken: getBearer(ctx),
|
||||
},
|
||||
}
|
||||
|
@ -394,28 +504,32 @@ func (c *TreeClient) getParent(ctx context.Context, cnrID *cid.ID, treeID string
|
|||
Sign: sign,
|
||||
}
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cli, err := c.service.GetSubTree(ctx, request)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get sub tree client: %w", err)
|
||||
}
|
||||
|
||||
resp, err := cli.Recv()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get sub tree: %w", err)
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, fmt.Errorf("failed to get sub tree client: %w", err)
|
||||
}
|
||||
|
||||
var subtree []*tree.GetSubTreeResponse_Body
|
||||
for {
|
||||
if _, err = cli.Recv(); err == io.EOF {
|
||||
resp, err := cli.Recv()
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return 0, fmt.Errorf("failed to read out sub tree stream: %w", err)
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, fmt.Errorf("failed to get sub tree: %w", err)
|
||||
}
|
||||
subtree = append(subtree, resp.Body)
|
||||
}
|
||||
|
||||
return resp.GetBody().GetParentId(), nil
|
||||
return subtree, nil
|
||||
}
|
||||
|
||||
func metaFromSettings(settings *data.BucketSettings) map[string]string {
|
||||
|
@ -474,7 +588,7 @@ func (c *TreeClient) getNodes(ctx context.Context, cnrID *cid.ID, treeID, pathAt
|
|||
|
||||
resp, err := c.service.GetNodeByPath(ctx, request)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get node path deb: %w", err)
|
||||
return nil, fmt.Errorf("failed to get node path: %w", err)
|
||||
}
|
||||
|
||||
return resp.GetBody().GetNodes(), nil
|
||||
|
|
157
internal/neofstest/tree/tree_mock.go
Normal file
157
internal/neofstest/tree/tree_mock.go
Normal file
|
@ -0,0 +1,157 @@
|
|||
package tree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type TreeServiceMock struct {
|
||||
settings map[string]*data.BucketSettings
|
||||
versions map[string]map[string][]*layer.NodeVersion
|
||||
system map[string]map[string]*layer.BaseNodeVersion
|
||||
}
|
||||
|
||||
func NewTreeService() *TreeServiceMock {
|
||||
return &TreeServiceMock{
|
||||
settings: make(map[string]*data.BucketSettings),
|
||||
versions: make(map[string]map[string][]*layer.NodeVersion),
|
||||
system: make(map[string]map[string]*layer.BaseNodeVersion),
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) PutSettingsNode(_ context.Context, id *cid.ID, settings *data.BucketSettings) error {
|
||||
t.settings[id.EncodeToString()] = settings
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetSettingsNode(_ context.Context, id *cid.ID) (*data.BucketSettings, error) {
|
||||
settings, ok := t.settings[id.EncodeToString()]
|
||||
if !ok {
|
||||
return nil, layer.ErrNodeNotFound
|
||||
}
|
||||
|
||||
return settings, nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]*layer.NodeVersion, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*layer.NodeVersion, error) {
|
||||
cnrVersionsMap, ok := t.versions[cnrID.EncodeToString()]
|
||||
if !ok {
|
||||
return nil, layer.ErrNodeNotFound
|
||||
}
|
||||
|
||||
versions, ok := cnrVersionsMap[objectName]
|
||||
if !ok {
|
||||
return nil, layer.ErrNodeNotFound
|
||||
}
|
||||
|
||||
sort.Slice(versions, func(i, j int) bool {
|
||||
return versions[i].ID < versions[j].ID
|
||||
})
|
||||
|
||||
if len(versions) != 0 {
|
||||
return versions[len(versions)-1], nil
|
||||
}
|
||||
|
||||
return nil, layer.ErrNodeNotFound
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetUnversioned(ctx context.Context, cnrID *cid.ID, objectName string) (*layer.NodeVersion, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) AddVersion(_ context.Context, cnrID *cid.ID, objectName string, newVersion *layer.NodeVersion) error {
|
||||
cnrVersionsMap, ok := t.versions[cnrID.EncodeToString()]
|
||||
if !ok {
|
||||
t.versions[cnrID.EncodeToString()] = map[string][]*layer.NodeVersion{
|
||||
objectName: {newVersion},
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
versions, ok := cnrVersionsMap[objectName]
|
||||
if !ok {
|
||||
cnrVersionsMap[objectName] = []*layer.NodeVersion{newVersion}
|
||||
return nil
|
||||
}
|
||||
|
||||
sort.Slice(versions, func(i, j int) bool {
|
||||
return versions[i].ID < versions[j].ID
|
||||
})
|
||||
|
||||
if len(versions) != 0 {
|
||||
newVersion.ID = versions[len(versions)-1].ID + 1
|
||||
}
|
||||
|
||||
cnrVersionsMap[objectName] = append(versions, newVersion)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) RemoveVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) AddSystemVersion(_ context.Context, cnrID *cid.ID, objectName string, newVersion *layer.BaseNodeVersion) error {
|
||||
cnrSystemMap, ok := t.system[cnrID.EncodeToString()]
|
||||
if !ok {
|
||||
t.system[cnrID.EncodeToString()] = map[string]*layer.BaseNodeVersion{
|
||||
objectName: newVersion,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
cnrSystemMap[objectName] = newVersion
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetSystemVersion(_ context.Context, cnrID *cid.ID, objectName string) (*layer.BaseNodeVersion, error) {
|
||||
cnrSystemMap, ok := t.system[cnrID.EncodeToString()]
|
||||
if !ok {
|
||||
return nil, layer.ErrNodeNotFound
|
||||
}
|
||||
|
||||
sysVersion, ok := cnrSystemMap[objectName]
|
||||
if !ok {
|
||||
return nil, layer.ErrNodeNotFound
|
||||
}
|
||||
|
||||
return sysVersion, nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error {
|
||||
panic("implement me")
|
||||
}
|
Loading…
Reference in a new issue