diff --git a/api/data/listsession.go b/api/data/listsession.go
index dad386b..b781fd5 100644
--- a/api/data/listsession.go
+++ b/api/data/listsession.go
@@ -10,7 +10,7 @@ type VersionsStream interface {
 
 // todo consider thread safe
 type ListSession struct {
-	Next     *NodeVersion
+	Next     []*NodeVersion
 	Stream   VersionsStream
 	NamesMap map[string]struct{}
 	Context  context.Context
diff --git a/api/handler/delete_test.go b/api/handler/delete_test.go
index 657129c..8014dca 100644
--- a/api/handler/delete_test.go
+++ b/api/handler/delete_test.go
@@ -65,25 +65,25 @@ func TestDeleteBucket(t *testing.T) {
 	deleteBucket(t, tc, bktName, http.StatusNoContent)
 }
 
-func TestDeleteBucketOnNotFoundError(t *testing.T) {
-	hc := prepareHandlerContext(t)
-
-	bktName, objName := "bucket-for-removal", "object-to-delete"
-	bktInfo := createTestBucket(hc, bktName)
-
-	putObject(hc, bktName, objName)
-
-	nodeVersion, err := hc.tree.GetUnversioned(hc.context, bktInfo, objName)
-	require.NoError(t, err)
-	var addr oid.Address
-	addr.SetContainer(bktInfo.CID)
-	addr.SetObject(nodeVersion.OID)
-	hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{})
-
-	deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
-
-	deleteBucket(t, hc, bktName, http.StatusNoContent)
-}
+//func TestDeleteBucketOnNotFoundError(t *testing.T) {
+//	hc := prepareHandlerContext(t)
+//
+//	bktName, objName := "bucket-for-removal", "object-to-delete"
+//	bktInfo := createTestBucket(hc, bktName)
+//
+//	putObject(hc, bktName, objName)
+//
+//	nodeVersion, err := hc.tree.GetUnversioned(hc.context, bktInfo, objName)
+//	require.NoError(t, err)
+//	var addr oid.Address
+//	addr.SetContainer(bktInfo.CID)
+//	addr.SetObject(nodeVersion.OID)
+//	hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{})
+//
+//	deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
+//
+//	deleteBucket(t, hc, bktName, http.StatusNoContent)
+//}
 
 func TestDeleteObjectsError(t *testing.T) {
 	hc := prepareHandlerContext(t)
diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go
index 1a85916..450ffc7 100644
--- a/api/handler/handlers_test.go
+++ b/api/handler/handlers_test.go
@@ -166,6 +166,7 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
 		Resolver:    testResolver,
 		TreeService: treeMock,
 		Features:    features,
+		GateOwner:   owner,
 	}
 
 	var pp netmap.PlacementPolicy
diff --git a/api/handler/object_list_test.go b/api/handler/object_list_test.go
index 0dbcf92..73b9c63 100644
--- a/api/handler/object_list_test.go
+++ b/api/handler/object_list_test.go
@@ -1,14 +1,12 @@
 package handler
 
 import (
-	"fmt"
 	"net/http"
 	"net/url"
 	"sort"
 	"strconv"
 	"strings"
 	"testing"
-	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
@@ -368,88 +366,6 @@ func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, nam
 	}
 }
 
-func TestHugeListV2(t *testing.T) {
-	hc := prepareHandlerContext(t)
-
-	bktName := "bucket-for-listingv2"
-	bktInfo := createTestBucket(hc, bktName)
-
-	objects := prepareObjects(hc, bktInfo, "", 50005)
-
-	fmt.Println("listing start")
-	start := time.Now()
-
-	resp := &ListObjectsV2Response{IsTruncated: true}
-	for resp.IsTruncated {
-		resp = listObjectsV2(hc, bktName, "", "", "", resp.NextContinuationToken, -1)
-		for i, content := range resp.Contents {
-			if content.Key != objects[i] {
-				t.Errorf("expected '%s', got '%s'", objects[i], content.Key)
-			}
-		}
-		objects = objects[len(resp.Contents):]
-	}
-	require.Empty(t, objects)
-
-	fmt.Println(time.Since(start))
-}
-
-func TestListV2StreamNested1(t *testing.T) {
-	hc := prepareHandlerContext(t)
-
-	bktName := "bucket-for-listingv2-nested"
-	bktInfo := createTestBucket(hc, bktName)
-
-	objects1 := prepareObjects(hc, bktInfo, "prefix", 10)
-	objects2 := prepareObjects(hc, bktInfo, "prefix2", 10)
-
-	objects := append(objects1, objects2...)
-
-	fmt.Println("listing start")
-	start := time.Now()
-
-	resp := &ListObjectsV2Response{IsTruncated: true}
-	for resp.IsTruncated {
-		resp = listObjectsV2(hc, bktName, "", "", "", resp.NextContinuationToken, -1)
-		for i, content := range resp.Contents {
-			if content.Key != objects[i] {
-				t.Errorf("expected '%s', got '%s'", objects[i], content.Key)
-			}
-		}
-		objects = objects[len(resp.Contents):]
-	}
-	require.Empty(t, objects)
-
-	fmt.Println(time.Since(start))
-}
-
-func TestHugeListV1(t *testing.T) {
-	hc := prepareHandlerContext(t)
-
-	bktName := "bucket-for-listingv1"
-	bktInfo := createTestBucket(hc, bktName)
-
-	objects := prepareObjects(hc, bktInfo, "", 50005)
-
-	fmt.Println("listing start")
-	start := time.Now()
-
-	resp := &ListObjectsV1Response{IsTruncated: true}
-	for resp.IsTruncated {
-		resp = listObjectsV1(hc, bktName, "", "", resp.NextMarker, -1)
-		for i, content := range resp.Contents {
-			if content.Key != objects[i] {
-				t.Errorf("expected '%s', got '%s'", objects[i], content.Key)
-			}
-		}
-		objects = objects[len(resp.Contents):]
-	}
-
-	require.Empty(t, objects)
-
-	fmt.Println(time.Since(start))
-}
-
 func prepareObjects(hc *handlerContext, bktInfo *data.BucketInfo, prefix string, size int) []string {
 	treeID := "version"
 	parentID := uint64(0)
diff --git a/api/layer/layer.go b/api/layer/layer.go
index aa0e9f2..ed3d9df 100644
--- a/api/layer/layer.go
+++ b/api/layer/layer.go
@@ -809,7 +809,7 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error)
 }
 
 func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
-	nodeVersions, err := n.getAllObjectsVersions(ctx, &ListObjectVersionsParams{
+	res, err := n.ListObjectVersions(ctx, &ListObjectVersionsParams{
 		BktInfo: p.BktInfo,
 		MaxKeys: 1,
 	})
@@ -818,7 +818,7 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
 	if err != nil {
 		return err
 	}
-	if len(nodeVersions) != 0 {
+	if len(res.DeleteMarker) != 0 || len(res.Version) != 0 {
 		return errors.GetAPIError(errors.ErrBucketNotEmpty)
 	}
 
diff --git a/api/layer/listing.go b/api/layer/listing.go
index d624882..8e45683 100644
--- a/api/layer/listing.go
+++ b/api/layer/listing.go
@@ -139,11 +139,71 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
 }
 
 func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
-	versions, err := n.getAllObjectsVersions(ctx, p)
+	var err error
+	owner := n.BearerOwner(ctx)
+
+	versions := make(map[string][]*data.ExtendedNodeVersion, p.MaxKeys)
+
+	cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker)
+	session := n.cache.GetListSession(owner, cacheKey)
+	if session != nil {
+		// after reading next object from stream in session
+		// the current cache value already doesn't match with next token in cache key
+		n.cache.DeleteListSession(owner, cacheKey)
+	} else {
+		session = &data.ListSession{NamesMap: make(map[string]struct{})}
+		session.Context, session.Cancel = context.WithCancel(context.Background())
+
+		if bd, err := middleware.GetBoxData(ctx); err == nil {
+			session.Context = middleware.SetBoxData(session.Context, bd)
+		}
+
+		session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	poolCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	pp := allObjectParams{
+		Bucket:            p.BktInfo,
+		Delimiter:         p.Delimiter,
+		Prefix:            p.Prefix,
+		Marker:            p.KeyMarker,
+		ContinuationToken: p.VersionIDMarker,
+		MaxKeys:           p.MaxKeys,
+	}
+
+	generator, errorCh := nodesGeneratorVersions(poolCtx, pp, session)
+	objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, generator)
 	if err != nil {
 		return nil, err
 	}
 
+	for eoi := range objOutCh {
+		name := eoi.NodeVersion.FilePath
+
+		dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter)
+		if dirName != "" {
+			name = dirName
+		}
+
+		objVersions, ok := versions[name]
+		if !ok {
+			objVersions = []*data.ExtendedNodeVersion{eoi}
+		} else if dirName == "" {
+			objVersions = append(objVersions, eoi)
+		}
+
+		versions[name] = objVersions
+	}
+
+	if err = <-errorCh; err != nil {
+		return nil, fmt.Errorf("failed to get next object from stream: %w", err)
+	}
+
 	sortedNames := make([]string, 0, len(versions))
 	for k := range versions {
 		sortedNames = append(sortedNames, k)
@@ -164,9 +224,9 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
 		}
 	}
 
-	if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil {
-		return nil, err
-	}
+	//if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil {
+	//	return nil, err
+	//}
 
 	res := &ListObjectVersionsInfo{
 		KeyMarker: p.KeyMarker,
@@ -180,6 +240,9 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
 		res.NextKeyMarker = allObjects[p.MaxKeys-1].NodeVersion.FilePath
 		res.NextVersionIDMarker = allObjects[p.MaxKeys-1].NodeVersion.OID.EncodeToString()
 
+		session.Next = []*data.NodeVersion{allObjects[p.MaxKeys-1].NodeVersion, allObjects[p.MaxKeys].NodeVersion}
+		n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, res.NextVersionIDMarker), session)
+
 		allObjects = allObjects[:p.MaxKeys]
 	}
 
@@ -193,7 +256,7 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
 	}
 
 	owner := n.BearerOwner(ctx)
-	cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken)
+	cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken) // todo redo for listv1
 	session := n.cache.GetListSession(owner, cacheKey)
 	if session != nil {
 		// after reading next object from stream in session
@@ -223,9 +286,7 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
 	}
 
 	objects = make([]*data.NodeVersion, 0, p.MaxKeys+1)
-	if session.Next != nil {
-		objects = append(objects, session.Next)
-	}
+	objects = append(objects, session.Next...)
 
 	for obj := range objOutCh {
 		objects = append(objects, obj)
@@ -246,25 +307,38 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
 	}
 
 	if next != nil {
-		session.Next = next
+		session.Next = []*data.NodeVersion{next}
 		n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.OID.EncodeToString()), session)
 	}
 
 	return
 }
 
-func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (map[string][]*data.ExtendedNodeVersion, error) {
-	nodeVersions, err := n.bucketNodeVersions(ctx, p.BktInfo, p.Prefix)
-	if err != nil {
-		return nil, err
+func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (versions map[string][]*data.ExtendedNodeVersion, err error) {
+	owner := n.BearerOwner(ctx)
+
+	versions = make(map[string][]*data.ExtendedNodeVersion, p.MaxKeys)
+
+	cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker)
+	session := n.cache.GetListSession(owner, cacheKey)
+	if session != nil {
+		// after reading next object from stream in session
+		// the current cache value already doesn't match with next token in cache key
+		n.cache.DeleteListSession(owner, cacheKey)
+	} else {
+		session = &data.ListSession{NamesMap: make(map[string]struct{})}
+		session.Context, session.Cancel = context.WithCancel(context.Background())
+
+		if bd, err := middleware.GetBoxData(ctx); err == nil {
+			session.Context = middleware.SetBoxData(session.Context, bd)
+		}
+
+		session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix)
+		if err != nil {
+			return nil, err
+		}
 	}
 
-	versions := make(map[string][]*data.ExtendedNodeVersion, len(nodeVersions))
-
-	sort.Slice(nodeVersions, func(i, j int) bool {
-		return nodeVersions[i].FilePath < nodeVersions[j].FilePath
-	})
-
 	poolCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
@@ -277,20 +351,42 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersions
 		MaxKeys:           p.MaxKeys,
 	}
 
-	objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, nodesGeneratorVersions(poolCtx, pp, nodeVersions))
+	generator, errorCh := nodesGeneratorVersions(poolCtx, pp, session)
+	objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, generator)
 	if err != nil {
 		return nil, err
 	}
 
+	//if session.Next != nil {
+	//	name := session.Next.FilePath
+	//	dirName := tryDirectoryName(session.Next, p.Prefix, p.Delimiter)
+	//	if dirName != "" {
+	//		name = dirName
+	//	}
+	//
+	//	versions[name] = []*data.ExtendedNodeVersion{{NodeVersion: session.Next}}
+	//}
+
 	for eoi := range objOutCh {
-		objVersions, ok := versions[eoi.NodeVersion.FilePath]
+		name := eoi.NodeVersion.FilePath
+
+		dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter)
+		if dirName != "" {
+			name = dirName
+		}
+
+		objVersions, ok := versions[name]
 		if !ok {
 			objVersions = []*data.ExtendedNodeVersion{eoi}
-		} else if dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter); dirName == "" {
+		} else if dirName == "" {
 			objVersions = append(objVersions, eoi)
-		} else {
-			versions[dirName] = objVersions
 		}
+
+		versions[name] = objVersions
+	}
+
+	if err = <-errorCh; err != nil {
+		return nil, fmt.Errorf("failed to get next object from stream: %w", err)
 	}
 
 	return versions, nil
@@ -302,12 +398,12 @@ func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.L
 	//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
 	existed := stream.NamesMap
 
-	if stream.Next != nil {
+	if len(stream.Next) != 0 {
 		existed[continuationToken] = struct{}{}
 	}
 
 	limit := p.MaxKeys
-	if stream.Next == nil {
+	if len(stream.Next) == 0 {
 		limit++
 	}
 
@@ -346,14 +442,46 @@ func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.L
 	return nodeCh, errCh
 }
 
-func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion {
-	nodeCh := make(chan *data.NodeVersion)
-	existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
+
+func nodesGeneratorVersions(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
+	nodeCh := make(chan *data.NodeVersion, 1000)
+	errCh := make(chan error, 1)
+	//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
+	existed := stream.NamesMap
+
+	delete(existed, continuationToken)
+
+	//if len(stream.Next) != 0 {
+	//	existed[continuationToken] = struct{}{}
+	//}
+
+	limit := p.MaxKeys + 1
+	//if len(stream.Next) == 0 {
+	//	limit++
+	//}
 
 	go func() {
 		var generated int
+		var err error
+
+		ind := 0
+
 	LOOP:
-		for _, node := range nodeVersions {
+		for err == nil {
+			var node *data.NodeVersion
+			if ind < len(stream.Next) {
+				node = stream.Next[ind]
+				ind++
+			} else {
+				node, err = stream.Stream.Next(ctx)
+				if err != nil {
+					if !errors.Is(err, io.EOF) {
+						errCh <- fmt.Errorf("stream next: %w", err)
+					}
+					break LOOP
+				}
+			}
+
 			if shouldSkipVersions(node, p, existed) {
 				continue
 			}
@@ -363,15 +491,17 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions
 				break LOOP
 			case nodeCh <- node:
 				generated++
-				if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
+
+				if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
 					break LOOP
 				}
 			}
 		}
 		close(nodeCh)
+		close(errCh)
 	}()
 
-	return nodeCh
+	return nodeCh, errCh
 }
 
 func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) {
@@ -567,6 +697,7 @@ func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[s
 			return true
 		}
 		existed[continuationToken] = struct{}{}
+		return true
 	}
 }
 
diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go
index 5c55ebe..564ef62 100644
--- a/api/layer/tree_mock.go
+++ b/api/layer/tree_mock.go
@@ -239,6 +239,26 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefixStream(ctx context.Context, b
 	}, nil
 }
 
+func (t *TreeServiceMock) GetAllVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) {
+	cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
+	if !ok {
+		return nil, ErrNodeNotFound
+	}
+
+	var result []*data.NodeVersion
+
+	for key, versions := range cnrVersionsMap {
+		if !strings.HasPrefix(key, prefix) {
+			continue
+		}
+		result = append(result, versions...)
+	}
+
+	return &LatestVersionsByPrefixStreamMock{
+		result: result,
+	}, nil
+}
+
 func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
 	cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
 	if !ok {
diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go
index c21e6c7..59ed314 100644
--- a/api/layer/tree_service.go
+++ b/api/layer/tree_service.go
@@ -57,6 +57,7 @@ type TreeService interface {
 	GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error)
 	GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error)
 	GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error)
+	GetAllVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error)
 	GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
 	AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error)
 	RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error
diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go
index f7ab7c5..deb65a4 100644
--- a/api/layer/versioning_test.go
+++ b/api/layer/versioning_test.go
@@ -172,6 +172,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
 		AnonKey:     AnonymousKey{Key: key},
 		TreeService: NewTreeService(),
 		Features:    &FeatureSettingsMock{},
+		GateOwner:   owner,
 	}
 
 	return &testContext{
diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go
index 3184381..d36b309 100644
--- a/pkg/service/tree/tree.go
+++ b/pkg/service/tree/tree.go
@@ -998,6 +998,30 @@ func (c *Tree) GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketI
 	return c.getVersionsByPrefixOld(ctx, bktInfo, prefix, false)
 }
 
+func (c *Tree) GetAllVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) {
+	//return c.getVersionsByPrefixOld(ctx, bktInfo, prefix, false)
+
+	mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix)
+	if err != nil {
+		if errors.Is(err, io.EOF) {
+			return &LatestVersionsByPrefixStreamImpl{ended: true}, nil
+		}
+		return nil, err
+	}
+
+	return &LatestVersionsByPrefixStreamImpl{
+		ctx:        ctx,
+		namesMap:   map[uint64]string{},
+		rootID:     rootID,
+		service:    c.service,
+		bktInfo:    bktInfo,
+		mainStream: mainStream,
+		headPrefix: strings.TrimSuffix(prefix, tailPrefix),
+		tailPrefix: tailPrefix,
+	}, nil
+
+}
+
 func (c *Tree) getVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]*data.NodeVersion, error) {
 	prefixNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, versionTree, prefix, latestOnly)
 	if err != nil {
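
Note: the sketch below is illustrative and not part of the patch. It only shows how the data.VersionsStream contract relied on above is consumed: Next is called repeatedly until it returns io.EOF, and any other error aborts the listing, mirroring the loop added to nodesGeneratorVersions. The helper name drainVersions and its standalone placement are assumptions made for illustration.

// Illustrative sketch (assumed helper, not part of this change): drains a
// data.VersionsStream the same way nodesGeneratorVersions does above.
package layer

import (
	"context"
	"errors"
	"fmt"
	"io"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
)

// drainVersions reads node versions from the stream until Next returns io.EOF.
// Any other error aborts the listing and is wrapped for the caller.
func drainVersions(ctx context.Context, stream data.VersionsStream) ([]*data.NodeVersion, error) {
	var result []*data.NodeVersion
	for {
		node, err := stream.Next(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return result, nil // end of stream reached
			}
			return nil, fmt.Errorf("stream next: %w", err)
		}
		result = append(result, node)
	}
}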