From 2d7973b3f1f19ead73e2e6a9a11acb4fe87b207f Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Sun, 21 Jan 2024 01:13:35 +0300 Subject: [PATCH] [#165] Refactor list versions Signed-off-by: Denis Kirillov --- api/layer/layer.go | 5 +- api/layer/listing.go | 266 +++++++++++++++++++++----------------- api/layer/tree_mock.go | 58 ++------- api/layer/tree_service.go | 5 +- pkg/service/tree/tree.go | 148 +-------------------- 5 files changed, 169 insertions(+), 313 deletions(-) diff --git a/api/layer/layer.go b/api/layer/layer.go index ed3d9dfa8..ecf1df596 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -809,16 +809,15 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error) } func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error { - res, err := n.ListObjectVersions(ctx, &ListObjectVersionsParams{ + res, _, err := n.getAllObjectsVersions(ctx, commonVersionsListingParams{ BktInfo: p.BktInfo, MaxKeys: 1, }) - //todo fix ^ if err != nil { return err } - if len(res.DeleteMarker) != 0 || len(res.Version) != 0 { + if len(res) != 0 { return errors.GetAPIError(errors.ErrBucketNotEmpty) } diff --git a/api/layer/listing.go b/api/layer/listing.go index e70391075..6099008c7 100644 --- a/api/layer/listing.go +++ b/api/layer/listing.go @@ -73,29 +73,42 @@ type ( VersionIDMarker string } - allObjectListingParams struct { - BktInfo *data.BucketInfo - Delimiter string - Prefix string - MaxKeys int - Marker string - Bookmark string - VersionAPi string + commonVersionsListingParams struct { + BktInfo *data.BucketInfo + Delimiter string + Prefix string + MaxKeys int + Marker string + Bookmark string } + + commonLatestVersionsListingParams struct { + commonVersionsListingParams + ListType ListType + } +) + +type ListType int + +const ( + ListObjectsV1Type ListType = iota + 1 + ListObjectsV2Type ListType = iota + 1 ) // ListObjectsV1 returns objects in a bucket for requests of Version 1. func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { var result ListObjectsInfoV1 - prm := allObjectListingParams{ - BktInfo: p.BktInfo, - Delimiter: p.Delimiter, - Prefix: p.Prefix, - MaxKeys: p.MaxKeys, - Marker: p.Marker, - Bookmark: p.Marker, - VersionAPi: "v1", + prm := commonLatestVersionsListingParams{ + commonVersionsListingParams: commonVersionsListingParams{ + BktInfo: p.BktInfo, + Delimiter: p.Delimiter, + Prefix: p.Prefix, + MaxKeys: p.MaxKeys, + Marker: p.Marker, + Bookmark: p.Marker, + }, + ListType: ListObjectsV1Type, } objects, next, err := n.getLatestObjectsVersions(ctx, prm) @@ -117,14 +130,16 @@ func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*Lis func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) { var result ListObjectsInfoV2 - prm := allObjectListingParams{ - BktInfo: p.BktInfo, - Delimiter: p.Delimiter, - Prefix: p.Prefix, - MaxKeys: p.MaxKeys, - Marker: p.StartAfter, - Bookmark: p.ContinuationToken, - VersionAPi: "v2", + prm := commonLatestVersionsListingParams{ + commonVersionsListingParams: commonVersionsListingParams{ + BktInfo: p.BktInfo, + Delimiter: p.Delimiter, + Prefix: p.Prefix, + MaxKeys: p.MaxKeys, + Marker: p.StartAfter, + Bookmark: p.ContinuationToken, + }, + ListType: ListObjectsV2Type, } objects, next, err := n.getLatestObjectsVersions(ctx, prm) @@ -143,14 +158,13 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis } func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { - prm := allObjectListingParams{ - BktInfo: p.BktInfo, - Delimiter: p.Delimiter, - Prefix: p.Prefix, - MaxKeys: p.MaxKeys, - Marker: p.KeyMarker, - Bookmark: p.VersionIDMarker, - VersionAPi: "vs", + prm := commonVersionsListingParams{ + BktInfo: p.BktInfo, + Delimiter: p.Delimiter, + Prefix: p.Prefix, + MaxKeys: p.MaxKeys, + Marker: p.KeyMarker, + Bookmark: p.VersionIDMarker, } objects, isTruncated, err := n.getAllObjectsVersions(ctx, prm) @@ -174,44 +188,24 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar return res, nil } -func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectListingParams) (objects []*data.NodeVersion, next *data.NodeVersion, err error) { +func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVersionsListingParams) (objects []*data.NodeVersion, next *data.NodeVersion, err error) { if p.MaxKeys == 0 { return nil, nil, nil } - owner := n.BearerOwner(ctx) - cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark) // todo redo for listv1 - session := n.cache.GetListSession(owner, cacheKey) - if session != nil { - // after reading next object from stream in session - // the current cache value already doesn't match with next token in cache key - n.cache.DeleteListSession(owner, cacheKey) - } else { - session = &data.ListSession{NamesMap: make(map[string]struct{})} - session.Context, session.Cancel = context.WithCancel(context.Background()) - - if bd, err := middleware.GetBoxData(ctx); err == nil { - session.Context = middleware.SetBoxData(session.Context, bd) - } - - session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix) - if err != nil { - return nil, nil, err - } + session, err := n.getListLatestVersionsSession(ctx, p) + if err != nil { + return nil, nil, err } - poolCtx, cancel := context.WithCancel(ctx) - defer cancel() - - generator, errorCh := nodesGeneratorStream(poolCtx, p, session) - objOutCh, err := n.initWorkerPoolStream(poolCtx, 2, p, generator) + generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session) + objOutCh, err := n.initWorkerPoolStream(ctx, 2, p.commonVersionsListingParams, generator) if err != nil { return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) } objects = make([]*data.NodeVersion, 0, p.MaxKeys+1) objects = append(objects, session.Next...) - for obj := range objOutCh { objects = append(objects, obj) } @@ -222,27 +216,19 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectListing if len(objects) > p.MaxKeys { next = objects[p.MaxKeys] + n.putListLatestVersionsSession(ctx, p, session, objects) objects = objects[:p.MaxKeys] } - if next != nil { - session.Next = []*data.NodeVersion{next} - if p.VersionAPi == "v1" { - n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, objects[len(objects)-1].FilePath), session) - } else { - n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, next.OID.EncodeToString()), session) - } - } - return } -func (n *layer) getAllObjectsVersions(ctx context.Context, p allObjectListingParams) ([]*data.ExtendedNodeVersion, bool, error) { +func (n *layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListingParams) ([]*data.ExtendedNodeVersion, bool, error) { if p.MaxKeys == 0 { return nil, false, nil } - session, err := n.getListVersionsSession(ctx, p) + session, err := n.getListAllVersionsSession(ctx, p) if err != nil { return nil, false, err } @@ -253,8 +239,26 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p allObjectListingPar return nil, false, err } + allObjects := handleGeneratedVersions(objOutCh, p, session) + + if err = <-errorCh; err != nil { + return nil, false, fmt.Errorf("failed to get next object from stream: %w", err) + } + + var isTruncated bool + if len(allObjects) > p.MaxKeys { + isTruncated = true + n.putListAllVersionsSession(ctx, p, session, allObjects) + allObjects = allObjects[:p.MaxKeys] + } + + return allObjects, isTruncated, nil +} + +func handleGeneratedVersions(objOutCh <-chan *data.ExtendedNodeVersion, p commonVersionsListingParams, session *data.ListSession) []*data.ExtendedNodeVersion { var lastName string - groupedVersions := make([][]*data.ExtendedNodeVersion, 0, p.MaxKeys) + var listRowStartIndex int + allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys) for eoi := range objOutCh { name := eoi.NodeVersion.FilePath @@ -264,61 +268,53 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p allObjectListingPar } if lastName != name { - groupedVersions = append(groupedVersions, []*data.ExtendedNodeVersion{eoi}) + formVersionsListRow(allObjects, listRowStartIndex, session) + listRowStartIndex = len(allObjects) + allObjects = append(allObjects, eoi) } else if dirName == "" { - groupedVersions[len(groupedVersions)-1] = append(groupedVersions[len(groupedVersions)-1], eoi) + allObjects = append(allObjects, eoi) } lastName = name } - if err = <-errorCh; err != nil { - return nil, false, fmt.Errorf("failed to get next object from stream: %w", err) - } + formVersionsListRow(allObjects, listRowStartIndex, session) - allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys) - - for _, versions := range groupedVersions { - sort.Slice(versions, func(i, j int) bool { - return versions[j].NodeVersion.Timestamp < versions[i].NodeVersion.Timestamp // sort in reverse order - }) - - for i, version := range versions { - version.IsLatest = i == 0 && (session.Next == nil || session.Next[0].FilePath != versions[0].NodeVersion.FilePath) - allObjects = append(allObjects, version) - } - } - - var isTruncated bool - if len(allObjects) > p.MaxKeys { - isTruncated = true - session.Next = make([]*data.NodeVersion, len(allObjects)-p.MaxKeys+1) - session.Next[0] = allObjects[p.MaxKeys-1].NodeVersion - - for i, node := range allObjects[p.MaxKeys:] { - session.Next[i+1] = node.NodeVersion - } - - session.Acquired.Store(false) - n.cache.PutListSession(n.BearerOwner(ctx), cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, session.Next[0].OID.EncodeToString()), session) - - allObjects = allObjects[:p.MaxKeys] - } - - return allObjects, isTruncated, nil + return allObjects } -func (n *layer) getListVersionsSession(ctx context.Context, p allObjectListingParams) (*data.ListSession, error) { +func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int, session *data.ListSession) { + if len(objects) == 0 { + return + } + + prevVersions := objects[rowStartIndex:] + sort.Slice(prevVersions, func(i, j int) bool { + return prevVersions[j].NodeVersion.Timestamp < prevVersions[i].NodeVersion.Timestamp // sort in reverse order to have last added first + }) + + objects[rowStartIndex].IsLatest = len(session.Next) == 0 || session.Next[0].FilePath != objects[rowStartIndex].NodeVersion.FilePath +} + +func (n *layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) { + return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true) +} + +func (n *layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) { + return n.getListVersionsSession(ctx, p, false) +} + +func (n *layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) { owner := n.BearerOwner(ctx) cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark) session := n.cache.GetListSession(owner, cacheKey) if session == nil { - return n.initNewListVersionsSession(ctx, p) + return n.initNewVersionsByPrefixSession(ctx, p, latestOnly) } if session.Acquired.Swap(true) { - return n.initNewListVersionsSession(ctx, p) + return n.initNewVersionsByPrefixSession(ctx, p, latestOnly) } // after reading next object from stream in session @@ -328,7 +324,7 @@ func (n *layer) getListVersionsSession(ctx context.Context, p allObjectListingPa return session, nil } -func (n *layer) initNewListVersionsSession(ctx context.Context, p allObjectListingParams) (session *data.ListSession, err error) { +func (n *layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) { session = &data.ListSession{NamesMap: make(map[string]struct{})} session.Context, session.Cancel = context.WithCancel(context.Background()) @@ -336,7 +332,7 @@ func (n *layer) initNewListVersionsSession(ctx context.Context, p allObjectListi session.Context = middleware.SetBoxData(session.Context, bd) } - session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix) + session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly) if err != nil { return nil, err } @@ -344,7 +340,45 @@ func (n *layer) initNewListVersionsSession(ctx context.Context, p allObjectListi return session, nil } -func nodesGeneratorStream(ctx context.Context, p allObjectListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { +func (n *layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.NodeVersion) { + if len(allObjects) <= p.MaxKeys { + return + } + + var cacheKey cache.ListSessionKey + switch p.ListType { + case ListObjectsV1Type: + cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys-1].FilePath) + case ListObjectsV2Type: + cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys].OID.EncodeToString()) + default: + // should never happen + panic("invalid list type") + } + + session.Acquired.Store(false) + session.Next = []*data.NodeVersion{allObjects[p.MaxKeys]} + n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session) +} + +func (n *layer) putListAllVersionsSession(ctx context.Context, p commonVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) { + if len(allObjects) <= p.MaxKeys { + return + } + + session.Acquired.Store(false) + + session.Next = make([]*data.NodeVersion, len(allObjects)-p.MaxKeys+1) + session.Next[0] = allObjects[p.MaxKeys-1].NodeVersion + for i, node := range allObjects[p.MaxKeys:] { + session.Next[i+1] = node.NodeVersion + } + + cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, session.Next[0].OID.EncodeToString()) + n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session) +} + +func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { nodeCh := make(chan *data.NodeVersion, 1000) errCh := make(chan error, 1) //existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories @@ -395,7 +429,7 @@ func nodesGeneratorStream(ctx context.Context, p allObjectListingParams, stream return nodeCh, errCh } -func nodesGeneratorVersions(ctx context.Context, p allObjectListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { +func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { nodeCh := make(chan *data.NodeVersion, 1000) errCh := make(chan error, 1) existed := stream.NamesMap @@ -448,7 +482,7 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectListingParams, strea return nodeCh, errCh } -func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectListingParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) { +func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) { reqLog := n.reqLogger(ctx) pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) if err != nil { @@ -513,7 +547,7 @@ func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectL return objCh, nil } -func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectListingParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedNodeVersion, error) { +func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedNodeVersion, error) { reqLog := n.reqLogger(ctx) pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) if err != nil { @@ -576,7 +610,7 @@ func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjec return objCh, nil } -func shouldSkip(node *data.NodeVersion, p allObjectListingParams, existed map[string]struct{}) bool { +func shouldSkip(node *data.NodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool { if node.IsDeleteMarker { return true } @@ -606,7 +640,7 @@ func shouldSkip(node *data.NodeVersion, p allObjectListingParams, existed map[st return false } -func shouldSkipVersions(node *data.NodeVersion, p allObjectListingParams, existed map[string]struct{}) bool { +func shouldSkipVersions(node *data.NodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool { filePath := node.FilePath if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { filePath = dirName diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 564ef6210..3a02e4faa 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -11,12 +11,12 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) -type LatestVersionsByPrefixStreamMock struct { +type VersionsByPrefixStreamMock struct { result []*data.NodeVersion offset int } -func (s *LatestVersionsByPrefixStreamMock) Next(context.Context) (*data.NodeVersion, error) { +func (s *VersionsByPrefixStreamMock) Next(context.Context) (*data.NodeVersion, error) { if s.offset > len(s.result)-1 { return nil, io.EOF } @@ -187,7 +187,7 @@ func (t *TreeServiceMock) GetLatestVersion(_ context.Context, bktInfo *data.Buck return nil, ErrNodeNotFound } -func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) { +func (t *TreeServiceMock) InitVersionsByPrefixStream(_ context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) { cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()] if !ok { return nil, ErrNodeNotFound @@ -200,6 +200,11 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo * continue } + if !latestOnly { + result = append(result, versions...) + continue + } + sort.Slice(versions, func(i, j int) bool { return versions[i].ID < versions[j].ID }) @@ -209,52 +214,7 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo * } } - return result, nil -} - -func (t *TreeServiceMock) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) { - cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()] - if !ok { - return nil, ErrNodeNotFound - } - - var result []*data.NodeVersion - - for key, versions := range cnrVersionsMap { - if !strings.HasPrefix(key, prefix) { - continue - } - - sort.Slice(versions, func(i, j int) bool { - return versions[i].ID < versions[j].ID - }) - - if len(versions) != 0 { - result = append(result, versions[len(versions)-1]) - } - } - - return &LatestVersionsByPrefixStreamMock{ - result: result, - }, nil -} - -func (t *TreeServiceMock) GetAllVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) { - cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()] - if !ok { - return nil, ErrNodeNotFound - } - - var result []*data.NodeVersion - - for key, versions := range cnrVersionsMap { - if !strings.HasPrefix(key, prefix) { - continue - } - result = append(result, versions...) - } - - return &LatestVersionsByPrefixStreamMock{ + return &VersionsByPrefixStreamMock{ result: result, }, nil } diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 59ed314d8..1ec59c76c 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -54,10 +54,7 @@ type TreeService interface { GetVersions(ctx context.Context, bktInfo *data.BucketInfo, objectName string) ([]*data.NodeVersion, error) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) - GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) - GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) - GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) - GetAllVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) + InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index a21fd3c46..8436fc39f 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -638,10 +638,6 @@ func pathFromName(objectName string) []string { return strings.Split(objectName, separator) } -func (c *Tree) GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) { - return c.getVersionsByPrefix(ctx, bktInfo, prefix, true) -} - type DummySubTreeStream struct { data NodeResponse read bool @@ -656,7 +652,7 @@ func (s *DummySubTreeStream) Next() (NodeResponse, error) { return s.data, nil } -type LatestVersionsByPrefixStreamImpl struct { +type VersionsByPrefixStreamImpl struct { ctx context.Context rootID uint64 intermediateRootID uint64 @@ -672,7 +668,8 @@ type LatestVersionsByPrefixStreamImpl struct { currentLatest *data.NodeVersion } -func (s *LatestVersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.NodeVersion, error) { +// Next todo remove recursion +func (s *VersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.NodeVersion, error) { if s.ended { return nil, io.EOF } @@ -771,16 +768,16 @@ func (s *LatestVersionsByPrefixStreamImpl) Next(ctx context.Context) (*data.Node return nodeVersion, nil } -func (c *Tree) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) { +func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) { mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix) if err != nil { if errors.Is(err, io.EOF) { - return &LatestVersionsByPrefixStreamImpl{ended: true}, nil + return &VersionsByPrefixStreamImpl{ended: true}, nil } return nil, err } - return &LatestVersionsByPrefixStreamImpl{ + return &VersionsByPrefixStreamImpl{ ctx: ctx, namesMap: map[uint64]string{}, rootID: rootID, @@ -789,7 +786,7 @@ func (c *Tree) GetLatestVersionsByPrefixStream(ctx context.Context, bktInfo *dat mainStream: mainStream, headPrefix: strings.TrimSuffix(prefix, tailPrefix), tailPrefix: tailPrefix, - latestOnly: true, + latestOnly: latestOnly, }, nil } @@ -931,73 +928,6 @@ func isIntermediate(node NodeResponse) bool { return node.GetMeta()[0].GetKey() == FileNameKey } -func (c *Tree) getSubTreeVersionsOld(ctx context.Context, bktInfo *data.BucketInfo, node NodeResponse, parentFilePath string, latestOnly bool) ([]*data.NodeVersion, error) { - return c.getSubTreeVersions(ctx, bktInfo, node, parentFilePath, latestOnly, false) -} - -func (c *Tree) getSubTreeVersions(ctx context.Context, bktInfo *data.BucketInfo, node NodeResponse, parentFilePath string, latestOnly, skipLeafs bool) ([]*data.NodeVersion, error) { - var err error - subTree := []NodeResponse{node} - if !skipLeafs || isIntermediate(node) { - subTree, err = c.service.GetSubTree(ctx, bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth) - if err != nil { - return nil, err - } - } - - var parentPrefix string - if parentFilePath != "" { // The root of subTree can also have a parent - parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar' - } - - var emptyOID oid.ID - var filepath string - namesMap := make(map[uint64]string, len(subTree)) - versions := make(map[string][]*data.NodeVersion, len(subTree)) - - for i, node := range subTree { - treeNode, fileName, err := parseTreeNode(node) - if err != nil { - continue - } - - if i != 0 { - if filepath, err = formFilePath(node, fileName, namesMap); err != nil { - return nil, fmt.Errorf("invalid node order: %w", err) - } - } else { - filepath = parentPrefix + fileName - namesMap[treeNode.ID] = filepath - } - - if treeNode.ObjID.Equals(emptyOID) { // The node can be intermediate but we still want to update namesMap - continue - } - - key := formLatestNodeKey(node.GetParentID(), fileName) - versionNodes, ok := versions[key] - if !ok { - versionNodes = []*data.NodeVersion{newNodeVersionFromTreeNode(filepath, treeNode)} - } else if !latestOnly { - versionNodes = append(versionNodes, newNodeVersionFromTreeNode(filepath, treeNode)) - } else if versionNodes[0].Timestamp <= treeNode.TimeStamp { - versionNodes[0] = newNodeVersionFromTreeNode(filepath, treeNode) - } - - versions[key] = versionNodes - } - - result := make([]*data.NodeVersion, 0, len(versions)) // consider use len(subTree) - for _, version := range versions { - if latestOnly && version[0].IsDeleteMarker { - continue - } - result = append(result, version...) - } - - return result, nil -} - func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) { parentPath, ok := namesMap[node.GetParentID()] if !ok { @@ -1028,70 +958,6 @@ func formLatestNodeKey(parentID uint64, fileName string) string { return strconv.FormatUint(parentID, 10) + "." + fileName } -func (c *Tree) GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) { - return c.getVersionsByPrefixOld(ctx, bktInfo, prefix, false) -} - -func (c *Tree) GetAllVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string) (data.VersionsStream, error) { - //return c.getVersionsByPrefixOld(ctx, bktInfo, prefix, false) - - mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix) - if err != nil { - if errors.Is(err, io.EOF) { - return &LatestVersionsByPrefixStreamImpl{ended: true}, nil - } - return nil, err - } - - return &LatestVersionsByPrefixStreamImpl{ - ctx: ctx, - namesMap: map[uint64]string{}, - rootID: rootID, - service: c.service, - bktInfo: bktInfo, - mainStream: mainStream, - headPrefix: strings.TrimSuffix(prefix, tailPrefix), - tailPrefix: tailPrefix, - }, nil - -} - -func (c *Tree) getVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]*data.NodeVersion, error) { - prefixNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, versionTree, prefix, latestOnly) - if err != nil { - return nil, err - } - - var result []*data.NodeVersion - for _, node := range prefixNodes { - versions, err := c.getSubTreeVersions(ctx, bktInfo, node, headPrefix, latestOnly, true) - if err != nil { - return nil, err - } - result = append(result, versions...) - } - - return result, nil -} - -func (c *Tree) getVersionsByPrefixOld(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]*data.NodeVersion, error) { - prefixNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, versionTree, prefix, latestOnly) - if err != nil { - return nil, err - } - - var result []*data.NodeVersion - for _, node := range prefixNodes { - versions, err := c.getSubTreeVersionsOld(ctx, bktInfo, node, headPrefix, latestOnly) - if err != nil { - return nil, err - } - result = append(result, versions...) - } - - return result, nil -} - func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) { return c.getUnversioned(ctx, bktInfo, versionTree, filepath) }