From 093de13f54e61039b0e8439d1e1c3c5d9be2de20 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Fri, 19 Jan 2024 12:53:53 +0300 Subject: [PATCH] [#165] Add stream listing tests Signed-off-by: Denis Kirillov --- api/data/listsession.go | 3 +- api/handler/delete_test.go | 13 +- api/handler/object_list_test.go | 73 +++++- api/layer/listing.go | 277 ++++++++-------------- pkg/service/tree/tree.go | 3 + pkg/service/tree/tree_client_in_memory.go | 6 +- 6 files changed, 189 insertions(+), 186 deletions(-) diff --git a/api/data/listsession.go b/api/data/listsession.go index b781fd54..1a15ab9c 100644 --- a/api/data/listsession.go +++ b/api/data/listsession.go @@ -2,17 +2,18 @@ package data import ( "context" + "sync/atomic" ) type VersionsStream interface { Next(ctx context.Context) (*NodeVersion, error) } -// todo consider thread safe type ListSession struct { Next []*NodeVersion Stream VersionsStream NamesMap map[string]struct{} Context context.Context Cancel context.CancelFunc + Acquired atomic.Bool } diff --git a/api/handler/delete_test.go b/api/handler/delete_test.go index 8014dca2..e97f5853 100644 --- a/api/handler/delete_test.go +++ b/api/handler/delete_test.go @@ -435,16 +435,19 @@ func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.B } func createVersionedBucketAndObject(t *testing.T, tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) { - createTestBucket(tc, bktName) - bktInfo, err := tc.Layer().GetBucketInfo(tc.Context(), bktName) - require.NoError(t, err) - putBucketVersioning(t, tc, bktName, true) - + bktInfo := createVersionedBucket(tc, bktName) objInfo := createTestObject(tc, bktInfo, objName, encryption.Params{}) return bktInfo, objInfo } +func createVersionedBucket(hc *handlerContext, bktName string) *data.BucketInfo { + bktInfo := createTestBucket(hc, bktName) + putBucketVersioning(hc.t, hc, bktName, true) + + return bktInfo +} + func putBucketVersioning(t *testing.T, tc *handlerContext, bktName string, enabled bool) { cfg := &VersioningConfiguration{Status: "Suspended"} if enabled { diff --git a/api/handler/object_list_test.go b/api/handler/object_list_test.go index 0e9d9b2c..5ef31c9e 100644 --- a/api/handler/object_list_test.go +++ b/api/handler/object_list_test.go @@ -98,7 +98,7 @@ func TestListObjectsLatestVersions(t *testing.T) { }) } -func TestListObjectsPaging(t *testing.T) { +func TestListObjectsVersionsPaging(t *testing.T) { hc := prepareHandlerContext(t) bktName := "bucket-versioning-enabled" @@ -128,6 +128,65 @@ func TestListObjectsPaging(t *testing.T) { require.Empty(t, objects) } +func TestListObjectsVersionsCorrectIsLatestFlag(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName := "bucket-versioning-enabled" + createVersionedBucket(hc, bktName) + + objName1, objName2 := "obj1", "obj2" + + n := 9 + listSize := 3 + headers := make([]http.Header, n) + // objects uploaded: ["obj1"-v1, "obj1"-v2, "obj1"-v3, "obj2"-v1, "obj2"-v2, "obj2"-v3, "obj2"-v4, "obj2"-v5, "obj2"-v6] + for i := 0; i < n; i++ { + objName := objName1 + if i >= listSize { + objName = objName2 + } + headers[i] = putObjectContent(hc, bktName, objName, fmt.Sprintf("content/%d", i)) + } + + versions := listObjectsVersions(hc, bktName, "", "", "", "", listSize) + // expected objects: ["obj1"-v3, "obj1"-v2, "obj1"-v1] + checkListVersionsParts(t, versions, formReverseVersionResponse(objName1, headers[:listSize], true)) + + versions = listObjectsVersions(hc, bktName, "", "", versions.NextKeyMarker, versions.NextVersionIDMarker, listSize) + // 
expected objects: ["obj2"-v6, "obj2"-v5, "obj2"-v4] + checkListVersionsParts(t, versions, formReverseVersionResponse(objName2, headers[2*listSize:], true)) + + versions = listObjectsVersions(hc, bktName, "", "", versions.NextKeyMarker, versions.NextVersionIDMarker, listSize) + // expected objects: ["obj2"-v3, "obj2"-v2, "obj2"-v1] + checkListVersionsParts(t, versions, formReverseVersionResponse(objName2, headers[listSize:2*listSize], false)) +} + +func formReverseVersionResponse(objName string, headers []http.Header, isLatest bool) []ObjectVersionResponse { + res := make([]ObjectVersionResponse, len(headers)) + + for i, h := range headers { + ind := len(headers) - 1 - i + res[ind] = ObjectVersionResponse{ + ETag: h.Get(api.ETag), + IsLatest: isLatest && ind == 0, + Key: objName, + VersionID: h.Get(api.AmzVersionID), + } + } + + return res +} + +func checkListVersionsParts(t *testing.T, versions *ListObjectsVersionsResponse, expected []ObjectVersionResponse) { + require.Len(t, versions.Version, len(expected)) + for i, res := range versions.Version { + require.Equal(t, expected[i].Key, res.Key) + require.Equal(t, expected[i].ETag, res.ETag) + require.Equal(t, expected[i].VersionID, res.VersionID) + require.Equal(t, expected[i].IsLatest, res.IsLatest) + } +} + func TestS3CompatibilityBucketListV2BothContinuationTokenStartAfter(t *testing.T) { tc := prepareHandlerContext(t) @@ -202,6 +261,18 @@ func TestS3BucketListDelimiterBasic(t *testing.T) { require.Equal(t, "quux/", listV1Response.CommonPrefixes[1].Prefix) } +func TestS3BucketListEmpty(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName := "bucket-for-listing" + createTestBucket(hc, bktName) + + versions := listObjectsVersions(hc, bktName, "", "", "", "", -1) + require.Empty(t, versions.Version) + require.Empty(t, versions.DeleteMarker) + require.Empty(t, versions.CommonPrefixes) +} + func TestS3BucketListV2PrefixAlt(t *testing.T) { hc := prepareHandlerContext(t) diff --git a/api/layer/listing.go b/api/layer/listing.go index ba2b6f33..f827b9af 100644 --- a/api/layer/listing.go +++ b/api/layer/listing.go @@ -139,114 +139,24 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis } func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { - var err error - owner := n.BearerOwner(ctx) - - versions := make(map[string][]*data.ExtendedNodeVersion, p.MaxKeys) - - cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker) - session := n.cache.GetListSession(owner, cacheKey) - if session != nil { - // after reading next object from stream in session - // the current cache value already doesn't match with next token in cache key - n.cache.DeleteListSession(owner, cacheKey) - } else { - session = &data.ListSession{NamesMap: make(map[string]struct{})} - session.Context, session.Cancel = context.WithCancel(context.Background()) - - if bd, err := middleware.GetBoxData(ctx); err == nil { - session.Context = middleware.SetBoxData(session.Context, bd) - } - - session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix) - if err != nil { - return nil, err - } - } - - poolCtx, cancel := context.WithCancel(ctx) - defer cancel() - - pp := allObjectParams{ - Bucket: p.BktInfo, - Delimiter: p.Delimiter, - Prefix: p.Prefix, - Marker: p.KeyMarker, - ContinuationToken: p.VersionIDMarker, - MaxKeys: p.MaxKeys, - } - - generator, errorCh := nodesGeneratorVersions(poolCtx, pp, session) - objOutCh, 
err := n.initWorkerPoolVersions(poolCtx, 2, pp, generator) + objects, isTruncated, err := n.getAllObjectsVersions(ctx, p) if err != nil { return nil, err } - for eoi := range objOutCh { - name := eoi.NodeVersion.FilePath - - dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter) - if dirName != "" { - name = dirName - } - - objVersions, ok := versions[name] - if !ok { - objVersions = []*data.ExtendedNodeVersion{eoi} - } else if dirName == "" { - objVersions = append(objVersions, eoi) - } - - versions[name] = objVersions - } - - if err = <-errorCh; err != nil { - return nil, fmt.Errorf("failed to get next object from stream: %w", err) - } - - sortedNames := make([]string, 0, len(versions)) - for k := range versions { - sortedNames = append(sortedNames, k) - } - sort.Strings(sortedNames) - - allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys) - - for _, name := range sortedNames { - sortedVersions := versions[name] - sort.Slice(sortedVersions, func(i, j int) bool { - return sortedVersions[j].NodeVersion.Timestamp < sortedVersions[i].NodeVersion.Timestamp // sort in reverse order - }) - - for i, version := range sortedVersions { - version.IsLatest = i == 0 - allObjects = append(allObjects, version) - } - } - - //if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil { - // return nil, err - //} - res := &ListObjectVersionsInfo{ KeyMarker: p.KeyMarker, VersionIDMarker: p.VersionIDMarker, + IsTruncated: isTruncated, } - res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects, p.Prefix, p.Delimiter) - - if len(allObjects) > p.MaxKeys { - res.IsTruncated = true - res.NextKeyMarker = allObjects[p.MaxKeys-1].NodeVersion.FilePath - res.NextVersionIDMarker = allObjects[p.MaxKeys-1].NodeVersion.OID.EncodeToString() - - session.Next = []*data.NodeVersion{allObjects[p.MaxKeys-1].NodeVersion, allObjects[p.MaxKeys].NodeVersion} - n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, res.NextVersionIDMarker), session) - - allObjects = allObjects[:p.MaxKeys] + if res.IsTruncated { + res.NextKeyMarker = objects[p.MaxKeys-1].NodeVersion.FilePath + res.NextVersionIDMarker = objects[p.MaxKeys-1].NodeVersion.OID.EncodeToString() } - res.Version, res.DeleteMarker = triageVersions(allObjects) + res.CommonPrefixes, objects = triageExtendedObjects(objects, p.Prefix, p.Delimiter) + res.Version, res.DeleteMarker = triageVersions(objects) return res, nil } @@ -309,33 +219,15 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) return } -func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (versions map[string][]*data.ExtendedNodeVersion, err error) { - owner := n.BearerOwner(ctx) - - versions = make(map[string][]*data.ExtendedNodeVersion, p.MaxKeys) - - cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker) - session := n.cache.GetListSession(owner, cacheKey) - if session != nil { - // after reading next object from stream in session - // the current cache value already doesn't match with next token in cache key - n.cache.DeleteListSession(owner, cacheKey) - } else { - session = &data.ListSession{NamesMap: make(map[string]struct{})} - session.Context, session.Cancel = context.WithCancel(context.Background()) - - if bd, err := middleware.GetBoxData(ctx); err == nil { - session.Context = middleware.SetBoxData(session.Context, bd) - } - - session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix) - if 
err != nil { - return nil, err - } +func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, bool, error) { + if p.MaxKeys == 0 { + return nil, false, nil } - poolCtx, cancel := context.WithCancel(ctx) - defer cancel() + session, err := n.getListVersionsSession(ctx, p) + if err != nil { + return nil, false, err + } pp := allObjectParams{ Bucket: p.BktInfo, @@ -346,22 +238,14 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersions MaxKeys: p.MaxKeys, } - generator, errorCh := nodesGeneratorVersions(poolCtx, pp, session) - objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, generator) + generator, errorCh := nodesGeneratorVersions(ctx, pp, session) + objOutCh, err := n.initWorkerPoolVersions(ctx, 2, pp, generator) if err != nil { - return nil, err + return nil, false, err } - //if session.Next != nil { - // name := session.Next.FilePath - // dirName := tryDirectoryName(session.Next, p.Prefix, p.Delimiter) - // if dirName != "" { - // name = dirName - // } - // - // versions[name] = []*data.ExtendedNodeVersion{{NodeVersion: session.Next}} - //} - + var lastName string + groupedVersions := make([][]*data.ExtendedNodeVersion, 0, p.MaxKeys) for eoi := range objOutCh { name := eoi.NodeVersion.FilePath @@ -370,21 +254,85 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersions name = dirName } - objVersions, ok := versions[name] - if !ok { - objVersions = []*data.ExtendedNodeVersion{eoi} + if lastName != name { + groupedVersions = append(groupedVersions, []*data.ExtendedNodeVersion{eoi}) } else if dirName == "" { - objVersions = append(objVersions, eoi) + groupedVersions[len(groupedVersions)-1] = append(groupedVersions[len(groupedVersions)-1], eoi) } - - versions[name] = objVersions + lastName = name } if err = <-errorCh; err != nil { - return nil, fmt.Errorf("failed to get next object from stream: %w", err) + return nil, false, fmt.Errorf("failed to get next object from stream: %w", err) } - return versions, nil + allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys) + + for _, versions := range groupedVersions { + sort.Slice(versions, func(i, j int) bool { + return versions[j].NodeVersion.Timestamp < versions[i].NodeVersion.Timestamp // sort in reverse order + }) + + for i, version := range versions { + version.IsLatest = i == 0 && (session.Next == nil || session.Next[0].FilePath != versions[0].NodeVersion.FilePath) + allObjects = append(allObjects, version) + } + } + + var isTruncated bool + if len(allObjects) > p.MaxKeys { + isTruncated = true + session.Next = make([]*data.NodeVersion, len(allObjects)-p.MaxKeys+1) + session.Next[0] = allObjects[p.MaxKeys-1].NodeVersion + + for i, node := range allObjects[p.MaxKeys:] { + session.Next[i+1] = node.NodeVersion + } + + session.Acquired.Store(false) + n.cache.PutListSession(n.BearerOwner(ctx), cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, session.Next[0].OID.EncodeToString()), session) + + allObjects = allObjects[:p.MaxKeys] + } + + return allObjects, isTruncated, nil +} + +func (n *layer) getListVersionsSession(ctx context.Context, p *ListObjectVersionsParams) (*data.ListSession, error) { + owner := n.BearerOwner(ctx) + + cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker) + session := n.cache.GetListSession(owner, cacheKey) + if session == nil { + return n.initNewListVersionsSession(ctx, p) + + } + + if session.Acquired.Swap(true) { + return 
n.initNewListVersionsSession(ctx, p) + } + + // after reading next object from stream in session + // the current cache value already doesn't match with next token in cache key + n.cache.DeleteListSession(owner, cacheKey) + + return session, nil +} + +func (n *layer) initNewListVersionsSession(ctx context.Context, p *ListObjectVersionsParams) (session *data.ListSession, err error) { + session = &data.ListSession{NamesMap: make(map[string]struct{})} + session.Context, session.Cancel = context.WithCancel(context.Background()) + + if bd, err := middleware.GetBoxData(ctx); err == nil { + session.Context = middleware.SetBoxData(session.Context, bd) + } + + session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix) + if err != nil { + return nil, err + } + + return session, nil } func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { @@ -441,25 +389,17 @@ func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.L func nodesGeneratorVersions(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { nodeCh := make(chan *data.NodeVersion, 1000) errCh := make(chan error, 1) - //existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories existed := stream.NamesMap delete(existed, continuationToken) - //if len(stream.Next) != 0 { - // existed[continuationToken] = struct{}{} - //} - - limit := p.MaxKeys + 1 - //if len(stream.Next) == 0 { - // limit++ - //} - go func() { - var generated int - var err error - - ind := 0 + var ( + generated int + ind int + err error + lastName string + ) LOOP: for err == nil { @@ -486,10 +426,10 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectParams, stream *data break LOOP case nodeCh <- node: generated++ - - if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken + if generated > p.MaxKeys && node.FilePath != lastName { break LOOP } + lastName = node.FilePath } } close(nodeCh) @@ -627,25 +567,6 @@ func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjec return objCh, nil } -func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) { - var err error - - owner := n.BearerOwner(ctx) - cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix, false) - nodeVersions := n.cache.GetList(owner, cacheKey) - - if nodeVersions == nil { - nodeVersions, err = n.treeService.GetAllVersionsByPrefix(ctx, bkt, prefix) - if err != nil { - return nil, fmt.Errorf("get all versions from tree service: %w", err) - } - - n.cache.PutList(owner, cacheKey, nodeVersions) - } - - return nodeVersions, nil -} - func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool { if node.IsDeleteMarker { return true diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 268e4135..a21fd3c4 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -680,6 +680,9 @@ func (s *LatestVersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.Node if s.innerStream == nil { node, err := s.mainStream.Next() if err != nil { + if errors.Is(err, ErrNodeNotFound) { + return nil, io.EOF + } if errors.Is(err, io.EOF) { s.ended = true if s.latestOnly && s.currentLatest != nil { diff --git a/pkg/service/tree/tree_client_in_memory.go b/pkg/service/tree/tree_client_in_memory.go index c6a5ed06..36e4e18c 
100644 --- a/pkg/service/tree/tree_client_in_memory.go +++ b/pkg/service/tree/tree_client_in_memory.go @@ -236,9 +236,13 @@ func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.Bucket type SubTreeStreamImpl struct { res []NodeResponse offset int + err error } func (s *SubTreeStreamImpl) Next() (NodeResponse, error) { + if s.err != nil { + return nil, s.err + } if s.offset > len(s.res)-1 { return nil, io.EOF } @@ -249,7 +253,7 @@ func (s *SubTreeStreamImpl) Next() (NodeResponse, error) { func (c *ServiceClientMemory) GetSubTreeStream(_ context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (SubTreeStream, error) { cnr, ok := c.containers[bktInfo.CID.EncodeToString()] if !ok { - return nil, nil + return &SubTreeStreamImpl{err: ErrNodeNotFound}, nil } tr, ok := cnr.trees[treeID]
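Note on the session-reuse guard added above: the new `Acquired atomic.Bool` field on `data.ListSession` lets `getListVersionsSession` hand a cached, partially consumed listing stream to at most one request; a concurrent request that loses the `Swap(true)` race falls back to `initNewListVersionsSession`, and the flag is reset with `Store(false)` before the session is put back into the cache. A minimal, self-contained sketch of that pattern follows (hypothetical `listSession`/`acquire` names, not part of the gateway code):

package main

import (
	"fmt"
	"sync/atomic"
)

// listSession mirrors the idea behind data.ListSession: a cached, partially
// consumed listing stream plus an acquired flag guarding concurrent reuse.
type listSession struct {
	acquired atomic.Bool
	next     []string // stand-in for the cached []*data.NodeVersion tail
}

// acquire hands the cached session to the first caller that claims it;
// any concurrent caller gets nil and must build a fresh session (new stream).
func acquire(s *listSession) *listSession {
	if s == nil || s.acquired.Swap(true) {
		return nil
	}
	return s
}

func main() {
	s := &listSession{next: []string{"obj1", "obj2"}}
	fmt.Println(acquire(s) != nil) // true: first request adopts the cached session
	fmt.Println(acquire(s) != nil) // false: already in use, start a new one
}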