From 29ac91dfd56f3464ed07458f44b72c44fb825f13 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Mon, 9 Oct 2023 09:57:33 +0300 Subject: [PATCH] [#165] Support streaming listing Signed-off-by: Denis Kirillov --- api/handler/handlers_test.go | 13 +- api/handler/object_list_test.go | 262 ++++++++++++++++ api/layer/cache.go | 11 + api/layer/layer.go | 7 +- api/layer/object.go | 345 ++++++++++++++++++++-- api/layer/tree_mock.go | 43 +++ api/layer/tree_service.go | 5 + api/layer/versioning.go | 2 +- api/layer/versioning_test.go | 1 + internal/frostfs/services/pool_wrapper.go | 33 +++ pkg/service/tree/tree.go | 180 ++++++++++- pkg/service/tree/tree_client_in_memory.go | 75 ++++- 12 files changed, 938 insertions(+), 39 deletions(-) diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go index 76d059c8..1a859162 100644 --- a/api/handler/handlers_test.go +++ b/api/handler/handlers_test.go @@ -45,6 +45,7 @@ type handlerContext struct { config *configMock layerFeatures *layer.FeatureSettingsMock + treeMock *tree.ServiceClientMemory } func (hc *handlerContext) Handler() *handler { @@ -147,7 +148,10 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext { var owner user.ID user.IDFromKey(&owner, key.PrivateKey.PublicKey) - treeMock := NewTreeServiceMock(t) + memCli, err := tree.NewTreeServiceClientMemory() + require.NoError(t, err) + + treeMock := tree.NewTree(memCli, zap.NewExample()) cacheCfg := layer.DefaultCachesConfigs(l) if minCache { @@ -188,6 +192,7 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext { config: cfg, layerFeatures: features, + treeMock: memCli, } } @@ -262,12 +267,6 @@ func (a *apeMock) DeletePolicy(namespace string, cnrID cid.ID) error { return nil } -func NewTreeServiceMock(t *testing.T) *tree.Tree { - memCli, err := tree.NewTreeServiceClientMemory() - require.NoError(t, err) - return tree.NewTree(memCli, zap.NewExample()) -} - func createTestBucket(hc *handlerContext, bktName string) *data.BucketInfo { _, err := hc.MockedPool().CreateContainer(hc.Context(), layer.PrmContainerCreate{ Creator: hc.owner, diff --git a/api/handler/object_list_test.go b/api/handler/object_list_test.go index 6e962d62..0dbcf924 100644 --- a/api/handler/object_list_test.go +++ b/api/handler/object_list_test.go @@ -1,13 +1,17 @@ package handler import ( + "fmt" "net/http" "net/url" "sort" "strconv" + "strings" "testing" + "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" "github.com/stretchr/testify/require" ) @@ -162,6 +166,120 @@ func TestS3BucketListDelimiterBasic(t *testing.T) { require.Equal(t, "quux/", listV1Response.CommonPrefixes[1].Prefix) } +func TestS3BucketListV2PrefixAlt(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName := "bucket-for-listing" + createTestBucket(hc, bktName) + + objects := []string{"bar", "baz", "foo"} + for _, objName := range objects { + putObject(hc, bktName, objName) + } + + response := listObjectsV2(hc, bktName, "ba", "", "", "", -1) + + require.Equal(t, "ba", response.Prefix) + require.Len(t, response.Contents, 2) + require.Equal(t, "bar", response.Contents[0].Key) + require.Equal(t, "baz", response.Contents[1].Key) + require.Empty(t, response.CommonPrefixes) +} + +func TestS3BucketListV2PrefixNotExist(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName := "bucket-for-listing" + createTestBucket(hc, bktName) + + objects := []string{"foo/bar", "foo/baz", "quux"} + for _, objName := range objects { + putObject(hc, bktName, objName) + } + + response := listObjectsV2(hc, bktName, "d", "", "", "", -1) + + require.Equal(t, "d", response.Prefix) + require.Empty(t, response.Contents) + require.Empty(t, response.CommonPrefixes) +} + +func TestS3BucketListV2PrefixUnreadable(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName := "bucket-for-listing" + createTestBucket(hc, bktName) + + objects := []string{"foo/bar", "foo/baz", "quux"} + for _, objName := range objects { + putObject(hc, bktName, objName) + } + + response := listObjectsV2(hc, bktName, "\x0a", "", "", "", -1) + + require.Equal(t, "\x0a", response.Prefix) + require.Empty(t, response.Contents) + require.Empty(t, response.CommonPrefixes) +} + +func TestS3BucketListV2PrefixDelimiterAlt(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName := "bucket-for-listing" + createTestBucket(hc, bktName) + + objects := []string{"bar", "bazar", "cab", "foo"} + for _, objName := range objects { + putObject(hc, bktName, objName) + } + + response := listObjectsV2(hc, bktName, "ba", "a", "", "", -1) + + require.Equal(t, "ba", response.Prefix) + require.Equal(t, "a", response.Delimiter) + require.Len(t, response.Contents, 1) + require.Equal(t, "bar", response.Contents[0].Key) + require.Len(t, response.CommonPrefixes, 1) + require.Equal(t, "baza", response.CommonPrefixes[0].Prefix) +} + +func TestS3BucketListV2PrefixDelimiterDelimiterNotExist(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName := "bucket-for-listing" + createTestBucket(hc, bktName) + + objects := []string{"b/a/c", "b/a/g", "b/a/r", "g"} + for _, objName := range objects { + putObject(hc, bktName, objName) + } + + response := listObjectsV2(hc, bktName, "b", "z", "", "", -1) + + require.Len(t, response.Contents, 3) + require.Equal(t, "b/a/c", response.Contents[0].Key) + require.Equal(t, "b/a/g", response.Contents[1].Key) + require.Equal(t, "b/a/r", response.Contents[2].Key) + require.Empty(t, response.CommonPrefixes) +} + +func TestS3BucketListV2PrefixDelimiterPrefixDelimiterNotExist(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName := "bucket-for-listing" + createTestBucket(hc, bktName) + + objects := []string{"b/a/c", "b/a/g", "b/a/r", "g"} + for _, objName := range objects { + putObject(hc, bktName, objName) + } + + response := listObjectsV2(hc, bktName, "y", "z", "", "", -1) + + require.Empty(t, response.Contents) + require.Empty(t, response.CommonPrefixes) +} + func TestS3BucketListV2DelimiterPercentage(t *testing.T) { tc := prepareHandlerContext(t) @@ -250,7 +368,148 @@ func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, nam } } +func TestHugeListV2(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName := "bucket-for-listingv2" + bktInfo := createTestBucket(hc, bktName) + + objects := prepareObjects(hc, bktInfo, "", 50005) + + fmt.Println("listing start") + start := time.Now() + + resp := &ListObjectsV2Response{IsTruncated: true} + for resp.IsTruncated { + resp = listObjectsV2(hc, bktName, "", "", "", resp.NextContinuationToken, -1) + for i, content := range resp.Contents { + if content.Key != objects[i] { + t.Errorf("expected '%s', got '%s'", objects[i], content.Key) + } + } + objects = objects[len(resp.Contents):] + } + require.Empty(t, objects) + + fmt.Println(time.Since(start)) +} + +func TestListV2StreamNested1(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName := "bucket-for-listingv2-nested" + bktInfo := createTestBucket(hc, bktName) + + objects1 := prepareObjects(hc, bktInfo, "prefix", 10) + objects2 := prepareObjects(hc, bktInfo, "prefix2", 10) + + objects := append(objects1, objects2...) + + fmt.Println("listing start") + start := time.Now() + + resp := &ListObjectsV2Response{IsTruncated: true} + for resp.IsTruncated { + resp = listObjectsV2(hc, bktName, "", "", "", resp.NextContinuationToken, -1) + for i, content := range resp.Contents { + if content.Key != objects[i] { + t.Errorf("expected '%s', got '%s'", objects[i], content.Key) + } + } + objects = objects[len(resp.Contents):] + } + require.Empty(t, objects) + + fmt.Println(time.Since(start)) +} + +func TestHugeListV1(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName := "bucket-for-listingv1" + bktInfo := createTestBucket(hc, bktName) + + objects := prepareObjects(hc, bktInfo, "", 50005) + + fmt.Println("listing start") + start := time.Now() + + resp := &ListObjectsV1Response{IsTruncated: true} + for resp.IsTruncated { + resp = listObjectsV1(hc, bktName, "", "", resp.NextMarker, -1) + for i, content := range resp.Contents { + if content.Key != objects[i] { + t.Errorf("expected '%s', got '%s'", objects[i], content.Key) + } + } + objects = objects[len(resp.Contents):] + } + + require.Empty(t, objects) + + fmt.Println(time.Since(start)) +} + +func prepareObjects(hc *handlerContext, bktInfo *data.BucketInfo, prefix string, size int) []string { + treeID := "version" + parentID := uint64(0) + if prefix != "" { + for _, filename := range strings.Split(prefix, "/") { + nodeID, err := hc.treeMock.AddNode(hc.Context(), bktInfo, treeID, parentID, map[string]string{ + "FileName": filename, + }) + require.NoError(hc.t, err) + parentID = nodeID + } + prefix += "/" + } + + objects := make([]string, size) + + for i := range objects { + filename := "object" + strconv.Itoa(i) + filepath := prefix + filename + + prm := layer.PrmObjectCreate{ + Container: bktInfo.CID, + Filepath: filepath, + Payload: nil, + } + + id, err := hc.tp.CreateObject(hc.Context(), prm) + require.NoError(hc.t, err) + + newVersion := &data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + OID: id, + ETag: "12345678", + FilePath: filepath, + }, + IsUnversioned: true, + IsCombined: false, + } + + _, err = hc.treeMock.AddNodeBase(hc.Context(), bktInfo, treeID, parentID, map[string]string{ + "OID": newVersion.OID.EncodeToString(), + "FileName": filename, + "IsUnversioned": "true", + }, false) + require.NoError(hc.t, err) + objects[i] = filepath + } + + hc.treeMock.Sort() + + sort.Strings(objects) + + return objects +} + func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response { + return listObjectsV2Ext(hc, bktName, prefix, delimiter, startAfter, continuationToken, "", maxKeys) +} + +func listObjectsV2Ext(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken, encodingType string, maxKeys int) *ListObjectsV2Response { query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys) if len(startAfter) != 0 { query.Add("start-after", startAfter) @@ -258,6 +517,9 @@ func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, c if len(continuationToken) != 0 { query.Add("continuation-token", continuationToken) } + if len(encodingType) != 0 { + query.Add("encoding-type", encodingType) + } w, r := prepareTestFullRequest(hc, bktName, "", query, nil) hc.Handler().ListObjectsV2Handler(w, r) diff --git a/api/layer/cache.go b/api/layer/cache.go index 87381b33..b4d5fa96 100644 --- a/api/layer/cache.go +++ b/api/layer/cache.go @@ -1,6 +1,8 @@ package layer import ( + "context" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" @@ -10,7 +12,15 @@ import ( "go.uber.org/zap" ) +type TestCacheValue struct { + Next *data.ObjectInfo + Stream LatestVersionsByPrefixStream + NamesMap map[string]struct{} + Context context.Context +} + type Cache struct { + testCache map[string]TestCacheValue logger *zap.Logger listsCache *cache.ObjectsListCache objCache *cache.ObjectsCache @@ -46,6 +56,7 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig { func NewCache(cfg *CachesConfig) *Cache { return &Cache{ + testCache: map[string]TestCacheValue{}, logger: cfg.Logger, listsCache: cache.NewObjectsListCache(cfg.ObjectsList), objCache: cache.New(cfg.Objects), diff --git a/api/layer/layer.go b/api/layer/layer.go index d79dca94..d29d9d07 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -810,7 +810,12 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error) } func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error { - nodeVersions, err := n.getAllObjectsVersions(ctx, p.BktInfo, "", "") + nodeVersions, err := n.getAllObjectsVersions(ctx, &ListObjectVersionsParams{ + BktInfo: p.BktInfo, + MaxKeys: 1, + }) + //todo fix ^ + if err != nil { return err } diff --git a/api/layer/object.go b/api/layer/object.go index 08dc79f7..1fb34513 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -17,12 +17,14 @@ import ( "strconv" "strings" "sync" + "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" @@ -572,7 +574,7 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis ContinuationToken: p.ContinuationToken, } - objects, next, err := n.getLatestObjectsVersions(ctx, prm) + objects, next, err := n.getLatestObjectsVersionsV2(ctx, prm) if err != nil { return nil, err } @@ -645,6 +647,75 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) return } +func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) { + if p.MaxKeys == 0 { + return nil, nil, nil + } + + testKey := p.Prefix + p.Delimiter + p.ContinuationToken + nodeVersionsStreamValue, ok := n.cache.testCache[testKey] + + if ok { + delete(n.cache.testCache, testKey) + } else { + ctx2, cancel2 := context.WithCancel(context.Background()) + go func() { + <-time.After(10 * time.Second) + cancel2() + }() + + if bd, err := middleware.GetBoxData(ctx); err == nil { + ctx2 = middleware.SetBoxData(ctx2, bd) + } + + nodeVersionsStreamValue.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(ctx2, p.Bucket, p.Prefix) + if err != nil { + return nil, nil, err + } + nodeVersionsStreamValue.NamesMap = map[string]struct{}{} + } + + poolCtx, cancel := context.WithCancel(ctx) + defer cancel() + + generator, errorCh := nodesGeneratorStream(poolCtx, p, nodeVersionsStreamValue) + objOutCh, err := n.initWorkerPoolStream(poolCtx, 2, p, generator) + if err != nil { + return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) + } + + objects = make([]*data.ObjectInfo, 0, p.MaxKeys+1) + if nodeVersionsStreamValue.Next != nil { + objects = append(objects, nodeVersionsStreamValue.Next) + } + + for obj := range objOutCh { + objects = append(objects, obj) + } + + if err = <-errorCh; err != nil { + fmt.Println(len(objects)) + fmt.Println(objects[len(objects)-1].Name) + return nil, nil, fmt.Errorf("failed to get object from tree: %w", err) + } + + sort.Slice(objects, func(i, j int) bool { + return objects[i].Name < objects[j].Name + }) + + if len(objects) > p.MaxKeys { + next = objects[p.MaxKeys] + objects = objects[:p.MaxKeys] + } + + if next != nil { + nodeVersionsStreamValue.Next = next + n.cache.testCache[p.Prefix+p.Delimiter+next.VersionID()] = nodeVersionsStreamValue + } + + return +} + func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion { nodeCh := make(chan *data.NodeVersion) existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories @@ -673,6 +744,86 @@ func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data return nodeCh } +func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion { + nodeCh := make(chan *data.NodeVersion) + existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories + + go func() { + var generated int + LOOP: + for _, node := range nodeVersions { + if shouldSkipVersions(node, p, existed) { + continue + } + + select { + case <-ctx.Done(): + break LOOP + case nodeCh <- node: + generated++ + if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken + break LOOP + } + } + } + close(nodeCh) + }() + + return nodeCh +} + +func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream TestCacheValue) (<-chan *data.NodeVersion, <-chan error) { + nodeCh := make(chan *data.NodeVersion) + errCh := make(chan error, 1) + //existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories + existed := stream.NamesMap + + if stream.Next != nil { + existed[continuationToken] = struct{}{} + } + + limit := p.MaxKeys + if stream.Next == nil { + limit++ + } + + go func() { + var generated int + var err error + + LOOP: + for err == nil { + node, err := stream.Stream.Next(ctx) + if err != nil { + if !errors.Is(err, io.EOF) { + fmt.Println(ctx.Err()) + errCh <- fmt.Errorf("stream next: %w", err) + } + break LOOP + } + + if shouldSkip(node, p, existed) { + continue + } + + select { + case <-ctx.Done(): + break LOOP + case nodeCh <- node: + generated++ + + if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken + break LOOP + } + } + } + close(nodeCh) + close(errCh) + }() + + return nodeCh, errCh +} + func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) { reqLog := n.reqLogger(ctx) pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) @@ -725,6 +876,126 @@ func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, return objCh, nil } +func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedObjectInfo, error) { + reqLog := n.reqLogger(ctx) + pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) + if err != nil { + return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) + } + objCh := make(chan *data.ExtendedObjectInfo) + + go func() { + var wg sync.WaitGroup + + LOOP: + for node := range input { + select { + case <-ctx.Done(): + break LOOP + default: + } + + // We have to make a copy of pointer to data.NodeVersion + // to get correct value in submitted task function. + func(node *data.NodeVersion) { + wg.Add(1) + err = pool.Submit(func() { + defer wg.Done() + + oi := &data.ObjectInfo{} + if node.IsDeleteMarker() { // delete marker does not match any object in FrostFS + oi.ID = node.OID + oi.Name = node.FilePath + oi.Owner = node.DeleteMarker.Owner + oi.Created = node.DeleteMarker.Created + oi.IsDeleteMarker = true + } else { + oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter) + if oi == nil { + // try to get object again + if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil { + // do not process object which are definitely missing in object service + return + } + } + } + + eoi := &data.ExtendedObjectInfo{ + ObjectInfo: oi, + NodeVersion: node, + } + + select { + case <-ctx.Done(): + case objCh <- eoi: + } + }) + if err != nil { + wg.Done() + reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) + } + }(node) + } + wg.Wait() + close(objCh) + pool.Release() + }() + + return objCh, nil +} + +func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) { + reqLog := n.reqLogger(ctx) + pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) + if err != nil { + return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) + } + objCh := make(chan *data.ObjectInfo) + + go func() { + var wg sync.WaitGroup + + LOOP: + for node := range input { + select { + case <-ctx.Done(): + break LOOP + default: + } + + // We have to make a copy of pointer to data.NodeVersion + // to get correct value in submitted task function. + func(node *data.NodeVersion) { + wg.Add(1) + err = pool.Submit(func() { + defer wg.Done() + oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter) + if oi == nil { + // try to get object again + if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil { + // do not process object which are definitely missing in object service + return + } + } + select { + case <-ctx.Done(): + case objCh <- oi: + } + }) + if err != nil { + wg.Done() + reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) + } + }(node) + } + wg.Wait() + close(objCh) + pool.Release() + }() + + return objCh, nil +} + func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) { var err error @@ -744,41 +1015,43 @@ func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, pr return nodeVersions, nil } -func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) { - nodeVersions, err := n.bucketNodeVersions(ctx, bkt, prefix) +func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (map[string][]*data.ExtendedObjectInfo, error) { + nodeVersions, err := n.bucketNodeVersions(ctx, p.BktInfo, p.Prefix) if err != nil { return nil, err } versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions)) - for _, nodeVersion := range nodeVersions { - oi := &data.ObjectInfo{} + sort.Slice(nodeVersions, func(i, j int) bool { + return nodeVersions[i].FilePath < nodeVersions[j].FilePath + }) - if nodeVersion.IsDeleteMarker() { // delete marker does not match any object in FrostFS - oi.ID = nodeVersion.OID - oi.Name = nodeVersion.FilePath - oi.Owner = nodeVersion.DeleteMarker.Owner - oi.Created = nodeVersion.DeleteMarker.Created - oi.IsDeleteMarker = true - } else { - if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, bkt, nodeVersion, prefix, delimiter); oi == nil { - continue - } - } + poolCtx, cancel := context.WithCancel(ctx) + defer cancel() - eoi := &data.ExtendedObjectInfo{ - ObjectInfo: oi, - NodeVersion: nodeVersion, - } + pp := allObjectParams{ + Bucket: p.BktInfo, + Delimiter: p.Delimiter, + Prefix: p.Prefix, + Marker: p.KeyMarker, + ContinuationToken: p.VersionIDMarker, + MaxKeys: p.MaxKeys, + } - objVersions, ok := versions[oi.Name] + objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, nodesGeneratorVersions(poolCtx, pp, nodeVersions)) + if err != nil { + return nil, err + } + + for eoi := range objOutCh { + objVersions, ok := versions[eoi.ObjectInfo.Name] if !ok { objVersions = []*data.ExtendedObjectInfo{eoi} - } else if !oi.IsDir { + } else if !eoi.ObjectInfo.IsDir { objVersions = append(objVersions, eoi) } - versions[oi.Name] = objVersions + versions[eoi.ObjectInfo.Name] = objVersions } return versions, nil @@ -819,6 +1092,32 @@ func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]st return false } +func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool { + filePath := node.FilePath + if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { + filePath = dirName + if _, ok := existed[filePath]; ok { + return true + } + } + + if filePath < p.Marker { + return true + } + + if p.ContinuationToken != "" { + if _, ok := existed[continuationToken]; !ok { + if p.ContinuationToken != node.OID.EncodeToString() { + return true + } + existed[continuationToken] = struct{}{} + } + } + + existed[filePath] = struct{}{} + return false +} + func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []*data.ObjectInfo) { for _, ov := range allObjects { if ov.IsDir { diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 2ecb76b9..2c556a56 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -3,6 +3,7 @@ package layer import ( "context" "fmt" + "io" "sort" "strings" @@ -10,6 +11,21 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) +type LatestVersionsByPrefixStreamMock struct { + result []*data.NodeVersion + offset int +} + +func (s *LatestVersionsByPrefixStreamMock) Next(context.Context) (*data.NodeVersion, error) { + if s.offset > len(s.result)-1 { + return nil, io.EOF + } + + res := s.result[s.offset] + s.offset++ + return res, nil +} + type TreeServiceMock struct { settings map[string]*data.BucketSettings versions map[string]map[string][]*data.NodeVersion @@ -196,6 +212,33 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo * return result, nil } +func (t *TreeServiceMock) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (LatestVersionsByPrefixStream, error) { + cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()] + if !ok { + return nil, ErrNodeNotFound + } + + var result []*data.NodeVersion + + for key, versions := range cnrVersionsMap { + if !strings.HasPrefix(key, prefix) { + continue + } + + sort.Slice(versions, func(i, j int) bool { + return versions[i].ID < versions[j].ID + }) + + if len(versions) != 0 { + result = append(result, versions[len(versions)-1]) + } + } + + return &LatestVersionsByPrefixStreamMock{ + result: result, + }, nil +} + func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) { cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()] if !ok { diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index cfdd769c..f17513e1 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -8,6 +8,10 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) +type LatestVersionsByPrefixStream interface { + Next(ctx context.Context) (*data.NodeVersion, error) +} + // TreeService provide interface to interact with tree service using s3 data models. type TreeService interface { // PutSettingsNode update or create new settings node in tree service. @@ -55,6 +59,7 @@ type TreeService interface { GetVersions(ctx context.Context, bktInfo *data.BucketInfo, objectName string) ([]*data.NodeVersion, error) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) + GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (LatestVersionsByPrefixStream, error) GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error) diff --git a/api/layer/versioning.go b/api/layer/versioning.go index 7e0c4a67..288072be 100644 --- a/api/layer/versioning.go +++ b/api/layer/versioning.go @@ -9,7 +9,7 @@ import ( ) func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { - versions, err := n.getAllObjectsVersions(ctx, p.BktInfo, p.Prefix, p.Delimiter) + versions, err := n.getAllObjectsVersions(ctx, p) if err != nil { return nil, err } diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go index 471de3a3..a7f9dcd5 100644 --- a/api/layer/versioning_test.go +++ b/api/layer/versioning_test.go @@ -288,6 +288,7 @@ func TestVersioningDeleteSpecificObjectVersion(t *testing.T) { tc.getObject(tc.obj, "", true) versions := tc.listVersions() + require.Len(t, versions.DeleteMarker, 1) for _, ver := range versions.DeleteMarker { if ver.IsLatest { tc.deleteObject(tc.obj, ver.ObjectInfo.VersionID(), settings) diff --git a/internal/frostfs/services/pool_wrapper.go b/internal/frostfs/services/pool_wrapper.go index e8388098..67f2b7ac 100644 --- a/internal/frostfs/services/pool_wrapper.go +++ b/internal/frostfs/services/pool_wrapper.go @@ -124,6 +124,39 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, return subtree, nil } +type SubTreeStreamImpl struct { + r *treepool.SubTreeReader +} + +func (s *SubTreeStreamImpl) Next() (tree.NodeResponse, error) { + node, err := s.r.Next() + if err != nil { + if err != io.EOF { + err = handleError(err) + } + return nil, fmt.Errorf("sub tree stream impl pool wrap: %w", err) + } + + return GetSubTreeResponseBodyWrapper{node}, nil +} + +func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (tree.SubTreeStream, error) { + poolPrm := treepool.GetSubTreeParams{ + CID: bktInfo.CID, + TreeID: treeID, + RootID: rootID, + Depth: depth, + BearerToken: getBearer(ctx, bktInfo), + } + + subTreeReader, err := w.p.GetSubTree(ctx, poolPrm) + if err != nil { + return nil, handleError(err) + } + + return &SubTreeStreamImpl{r: subTreeReader}, nil +} + func (w *PoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) { nodeID, err := w.p.AddNode(ctx, treepool.AddNodeParams{ CID: bktInfo.CID, diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 41c4d8cf..7c3ce2f8 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "sort" "strconv" "strings" @@ -29,12 +30,17 @@ type ( ServiceClient interface { GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error) + GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (SubTreeStream, error) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error } + SubTreeStream interface { + Next() (NodeResponse, error) + } + treeNode struct { ID uint64 ParentID uint64 @@ -639,6 +645,142 @@ func (c *Tree) GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.Buck return c.getVersionsByPrefix(ctx, bktInfo, prefix, true) } +type DummySubTreeStream struct { + data NodeResponse + read bool +} + +func (s *DummySubTreeStream) Next() (NodeResponse, error) { + if s.read { + return nil, io.EOF + } + + s.read = true + return s.data, nil +} + +type LatestVersionsByPrefixStreamImpl struct { + ctx context.Context + rootID uint64 + intermediateRootID uint64 + service ServiceClient + bktInfo *data.BucketInfo + mainStream SubTreeStream + innerStream SubTreeStream + headPrefix string + tailPrefix string + namesMap map[uint64]string + ended bool +} + +func (s *LatestVersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.NodeVersion, error) { + const latestOnly = true + + if s.ended { + return nil, io.EOF + } + + if s.innerStream == nil { + node, err := s.mainStream.Next() + if err != nil { + return nil, fmt.Errorf("main stream next: %w", err) + } + + if node.GetNodeID() == s.rootID || !strings.HasPrefix(getFilename(node), s.tailPrefix) { + return s.Next(ctx) + } + + if node.GetParentID() == s.rootID { + s.intermediateRootID = node.GetNodeID() + } + + if isIntermediate(node) { + s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth) + if err != nil { + return nil, fmt.Errorf("get sub tree node from main stream: %w", err) + } + } else { + s.innerStream = &DummySubTreeStream{data: node} + } + } + + node, err := s.innerStream.Next() + if err != nil { + if errors.Is(err, io.EOF) { + s.innerStream = nil + return s.Next(ctx) + } + return nil, fmt.Errorf("inner stream: %w", err) + } + + treeNode, fileName, err := parseTreeNode(node) + if err != nil { + return s.Next(ctx) + } + + var parentPrefix string + if s.headPrefix != "" { // The root of subTree can also have a parent + parentPrefix = strings.TrimSuffix(s.headPrefix, separator) + separator // To avoid 'foo//bar' + } + + var filepath string + if treeNode.ID != s.intermediateRootID { + if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil { + return nil, fmt.Errorf("invalid node order: %w", err) + } + } else { + filepath = parentPrefix + fileName + s.namesMap[treeNode.ID] = filepath + } + + if treeNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate but we still want to update namesMap + return s.Next(ctx) + } + + return newNodeVersionFromTreeNode(filepath, treeNode), nil +} + +func (c *Tree) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (layer.LatestVersionsByPrefixStream, error) { + mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix) + if err != nil { + if errors.Is(err, io.EOF) { + return &LatestVersionsByPrefixStreamImpl{ended: true}, nil + } + return nil, err + } + + return &LatestVersionsByPrefixStreamImpl{ + ctx: ctx, + namesMap: map[uint64]string{}, + rootID: rootID, + service: c.service, + bktInfo: bktInfo, + mainStream: mainStream, + headPrefix: strings.TrimSuffix(prefix, tailPrefix), + tailPrefix: tailPrefix, + }, nil +} + +func (c *Tree) getSubTreeByPrefixMainStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, string, uint64, error) { + rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix) + if err != nil { + if errors.Is(err, layer.ErrNodeNotFound) { + return nil, "", 0, io.EOF + } + return nil, "", 0, err + } + + subTree, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2) + if err != nil { + if errors.Is(err, layer.ErrNodeNotFound) { + return nil, "", 0, io.EOF + } + return nil, "", 0, err + } + + return subTree, tailPrefix, rootID, nil +} + func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (uint64, string, error) { var rootID uint64 path := strings.Split(prefix, separator) @@ -757,10 +899,18 @@ func isIntermediate(node NodeResponse) bool { return node.GetMeta()[0].GetKey() == FileNameKey } -func (c *Tree) getSubTreeVersions(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, parentFilePath string, latestOnly bool) ([]*data.NodeVersion, error) { - subTree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, nodeID, maxGetSubTreeDepth) - if err != nil { - return nil, err +func (c *Tree) getSubTreeVersionsOld(ctx context.Context, bktInfo *data.BucketInfo, node NodeResponse, parentFilePath string, latestOnly bool) ([]*data.NodeVersion, error) { + return c.getSubTreeVersions(ctx, bktInfo, node, parentFilePath, latestOnly, false) +} + +func (c *Tree) getSubTreeVersions(ctx context.Context, bktInfo *data.BucketInfo, node NodeResponse, parentFilePath string, latestOnly, skipLeafs bool) ([]*data.NodeVersion, error) { + var err error + subTree := []NodeResponse{node} + if !skipLeafs || isIntermediate(node) { + subTree, err = c.service.GetSubTree(ctx, bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth) + if err != nil { + return nil, err + } } var parentPrefix string @@ -847,7 +997,7 @@ func formLatestNodeKey(parentID uint64, fileName string) string { } func (c *Tree) GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) { - return c.getVersionsByPrefix(ctx, bktInfo, prefix, false) + return c.getVersionsByPrefixOld(ctx, bktInfo, prefix, false) } func (c *Tree) getVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]*data.NodeVersion, error) { @@ -858,7 +1008,25 @@ func (c *Tree) getVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo var result []*data.NodeVersion for _, node := range prefixNodes { - versions, err := c.getSubTreeVersions(ctx, bktInfo, node.GetNodeID(), headPrefix, latestOnly) + versions, err := c.getSubTreeVersions(ctx, bktInfo, node, headPrefix, latestOnly, true) + if err != nil { + return nil, err + } + result = append(result, versions...) + } + + return result, nil +} + +func (c *Tree) getVersionsByPrefixOld(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]*data.NodeVersion, error) { + prefixNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, versionTree, prefix, latestOnly) + if err != nil { + return nil, err + } + + var result []*data.NodeVersion + for _, node := range prefixNodes { + versions, err := c.getSubTreeVersionsOld(ctx, bktInfo, node, headPrefix, latestOnly) if err != nil { return nil, err } diff --git a/pkg/service/tree/tree_client_in_memory.go b/pkg/service/tree/tree_client_in_memory.go index 3c3128b1..c6a5ed06 100644 --- a/pkg/service/tree/tree_client_in_memory.go +++ b/pkg/service/tree/tree_client_in_memory.go @@ -3,6 +3,7 @@ package tree import ( "context" "fmt" + "io" "sort" "time" @@ -146,6 +147,7 @@ func (t *memoryTree) createPathIfNotExist(parent *treeNodeMemory, path []string) } t.idCounter++ parent.children = append(parent.children, node) + //sortNodes(parent.children) } return t.createPathIfNotExist(node, path[1:]) @@ -227,9 +229,45 @@ func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.Bucket return nil, ErrNodeNotFound } + // we depth-1 in case of uint32 and 0 as mark to get all subtree leads to overflow and depth is getting quite big to walk all tree levels return node.listNodes(nil, depth-1), nil } +type SubTreeStreamImpl struct { + res []NodeResponse + offset int +} + +func (s *SubTreeStreamImpl) Next() (NodeResponse, error) { + if s.offset > len(s.res)-1 { + return nil, io.EOF + } + s.offset++ + return s.res[s.offset-1], nil +} + +func (c *ServiceClientMemory) GetSubTreeStream(_ context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (SubTreeStream, error) { + cnr, ok := c.containers[bktInfo.CID.EncodeToString()] + if !ok { + return nil, nil + } + + tr, ok := cnr.trees[treeID] + if !ok { + return nil, ErrNodeNotFound + } + + node := tr.treeData.getNode(rootID) + if node == nil { + return nil, ErrNodeNotFound + } + + return &SubTreeStreamImpl{ + res: node.listNodes(nil, depth-1), + offset: 0, + }, nil +} + func newContainerInfo(bktInfo *data.BucketInfo, treeID string) containerInfo { return containerInfo{ bkt: bktInfo, @@ -257,7 +295,11 @@ func newMemoryTree() memoryTree { } } -func (c *ServiceClientMemory) AddNode(_ context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) { +func (c *ServiceClientMemory) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) { + return c.AddNodeBase(ctx, bktInfo, treeID, parent, meta, true) +} + +func (c *ServiceClientMemory) AddNodeBase(_ context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string, needSort bool) (uint64, error) { cnr, ok := c.containers[bktInfo.CID.EncodeToString()] if !ok { cnr = newContainerInfo(bktInfo, treeID) @@ -289,6 +331,9 @@ func (c *ServiceClientMemory) AddNode(_ context.Context, bktInfo *data.BucketInf } parentNode.children = append(parentNode.children, tn) + if needSort { + //sortNodes(parentNode.children) + } cnr.trees[treeID] = tr return newID, nil @@ -326,6 +371,7 @@ func (c *ServiceClientMemory) AddNodeByPath(_ context.Context, bktInfo *data.Buc } parentNode.children = append(parentNode.children, tn) + //sortNodes(parentNode.children) cnr.trees[treeID] = tr return newID, nil @@ -356,11 +402,38 @@ func (c *ServiceClientMemory) MoveNode(_ context.Context, bktInfo *data.BucketIn node.data.parentID = parentID newParent.children = append(newParent.children, node) + //sortNodes(newParent.children) node.parent.removeChild(nodeID) return nil } +func (c *ServiceClientMemory) Sort() { + for _, info := range c.containers { + for _, tree := range info.trees { + sortNode(tree.treeData) + } + } +} + +func sortNode(node *treeNodeMemory) { + if node == nil { + return + } + + sortNodes(node.children) + + for _, child := range node.children { + sortNode(child) + } +} + +func sortNodes(list []*treeNodeMemory) { + sort.Slice(list, func(i, j int) bool { + return list[i].data.getValue(FileNameKey) < list[j].data.getValue(FileNameKey) + }) +} + func (c *ServiceClientMemory) RemoveNode(_ context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error { cnr, ok := c.containers[bktInfo.CID.EncodeToString()] if !ok {