From 0ae49eaab066d8c47a239c077c60026f6271ec4c Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Sun, 21 Jan 2024 00:30:25 +0300 Subject: [PATCH] [#165] Generalize allObjectListingParams Signed-off-by: Denis Kirillov --- api/layer/listing.go | 101 ++++++++++++++++++++++--------------------- 1 file changed, 51 insertions(+), 50 deletions(-) diff --git a/api/layer/listing.go b/api/layer/listing.go index f827b9af9..6f0b305d7 100644 --- a/api/layer/listing.go +++ b/api/layer/listing.go @@ -73,13 +73,13 @@ type ( VersionIDMarker string } - allObjectParams struct { - Bucket *data.BucketInfo - Delimiter string - Prefix string - MaxKeys int - Marker string - ContinuationToken string + allObjectListingParams struct { + BktInfo *data.BucketInfo + Delimiter string + Prefix string + MaxKeys int + Marker string + Bookmark string } ) @@ -87,12 +87,13 @@ type ( func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { var result ListObjectsInfoV1 - prm := allObjectParams{ - Bucket: p.BktInfo, + prm := allObjectListingParams{ + BktInfo: p.BktInfo, Delimiter: p.Delimiter, Prefix: p.Prefix, MaxKeys: p.MaxKeys, Marker: p.Marker, + Bookmark: p.Marker, } objects, next, err := n.getLatestObjectsVersions(ctx, prm) @@ -114,13 +115,13 @@ func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*Lis func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) { var result ListObjectsInfoV2 - prm := allObjectParams{ - Bucket: p.BktInfo, - Delimiter: p.Delimiter, - Prefix: p.Prefix, - MaxKeys: p.MaxKeys, - Marker: p.StartAfter, - ContinuationToken: p.ContinuationToken, + prm := allObjectListingParams{ + BktInfo: p.BktInfo, + Delimiter: p.Delimiter, + Prefix: p.Prefix, + MaxKeys: p.MaxKeys, + Marker: p.StartAfter, + Bookmark: p.ContinuationToken, } objects, next, err := n.getLatestObjectsVersions(ctx, prm) @@ -139,7 +140,16 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis } func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { - objects, isTruncated, err := n.getAllObjectsVersions(ctx, p) + prm := allObjectListingParams{ + BktInfo: p.BktInfo, + Delimiter: p.Delimiter, + Prefix: p.Prefix, + MaxKeys: p.MaxKeys, + Marker: p.KeyMarker, + Bookmark: p.VersionIDMarker, + } + + objects, isTruncated, err := n.getAllObjectsVersions(ctx, prm) if err != nil { return nil, err } @@ -160,13 +170,13 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar return res, nil } -func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.NodeVersion, next *data.NodeVersion, err error) { +func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectListingParams) (objects []*data.NodeVersion, next *data.NodeVersion, err error) { if p.MaxKeys == 0 { return nil, nil, nil } owner := n.BearerOwner(ctx) - cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken) // todo redo for listv1 + cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark) // todo redo for listv1 session := n.cache.GetListSession(owner, cacheKey) if session != nil { // after reading next object from stream in session @@ -180,7 +190,7 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) session.Context = middleware.SetBoxData(session.Context, bd) } - session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.Bucket, p.Prefix) + session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix) if err != nil { return nil, nil, err } @@ -213,13 +223,13 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) if next != nil { session.Next = []*data.NodeVersion{next} - n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.OID.EncodeToString()), session) + n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, next.OID.EncodeToString()), session) } return } -func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, bool, error) { +func (n *layer) getAllObjectsVersions(ctx context.Context, p allObjectListingParams) ([]*data.ExtendedNodeVersion, bool, error) { if p.MaxKeys == 0 { return nil, false, nil } @@ -229,17 +239,8 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersions return nil, false, err } - pp := allObjectParams{ - Bucket: p.BktInfo, - Delimiter: p.Delimiter, - Prefix: p.Prefix, - Marker: p.KeyMarker, - ContinuationToken: p.VersionIDMarker, - MaxKeys: p.MaxKeys, - } - - generator, errorCh := nodesGeneratorVersions(ctx, pp, session) - objOutCh, err := n.initWorkerPoolVersions(ctx, 2, pp, generator) + generator, errorCh := nodesGeneratorVersions(ctx, p, session) + objOutCh, err := n.initWorkerPoolVersions(ctx, 2, p, generator) if err != nil { return nil, false, err } @@ -298,10 +299,10 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersions return allObjects, isTruncated, nil } -func (n *layer) getListVersionsSession(ctx context.Context, p *ListObjectVersionsParams) (*data.ListSession, error) { +func (n *layer) getListVersionsSession(ctx context.Context, p allObjectListingParams) (*data.ListSession, error) { owner := n.BearerOwner(ctx) - cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker) + cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark) session := n.cache.GetListSession(owner, cacheKey) if session == nil { return n.initNewListVersionsSession(ctx, p) @@ -319,7 +320,7 @@ func (n *layer) getListVersionsSession(ctx context.Context, p *ListObjectVersion return session, nil } -func (n *layer) initNewListVersionsSession(ctx context.Context, p *ListObjectVersionsParams) (session *data.ListSession, err error) { +func (n *layer) initNewListVersionsSession(ctx context.Context, p allObjectListingParams) (session *data.ListSession, err error) { session = &data.ListSession{NamesMap: make(map[string]struct{})} session.Context, session.Cancel = context.WithCancel(context.Background()) @@ -335,7 +336,7 @@ func (n *layer) initNewListVersionsSession(ctx context.Context, p *ListObjectVer return session, nil } -func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { +func nodesGeneratorStream(ctx context.Context, p allObjectListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { nodeCh := make(chan *data.NodeVersion, 1000) errCh := make(chan error, 1) //existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories @@ -386,7 +387,7 @@ func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.L return nodeCh, errCh } -func nodesGeneratorVersions(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { +func nodesGeneratorVersions(ctx context.Context, p allObjectListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { nodeCh := make(chan *data.NodeVersion, 1000) errCh := make(chan error, 1) existed := stream.NamesMap @@ -439,7 +440,7 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectParams, stream *data return nodeCh, errCh } -func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) { +func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectListingParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) { reqLog := n.reqLogger(ctx) pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) if err != nil { @@ -473,10 +474,10 @@ func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectP wg.Add(1) err = pool.Submit(func() { defer wg.Done() - oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node) + oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node) if oi == nil { // try to get object again - if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node); oi == nil { + if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node); oi == nil { // do not process object which are definitely missing in object service return } @@ -504,7 +505,7 @@ func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectP return objCh, nil } -func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedNodeVersion, error) { +func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectListingParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedNodeVersion, error) { reqLog := n.reqLogger(ctx) pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) if err != nil { @@ -536,10 +537,10 @@ func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjec err = pool.Submit(func() { defer wg.Done() - oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node) + oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node) if oi == nil { // try to get object again - if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node); oi == nil { + if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node); oi == nil { // do not process object which are definitely missing in object service return } @@ -567,7 +568,7 @@ func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjec return objCh, nil } -func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool { +func shouldSkip(node *data.NodeVersion, p allObjectListingParams, existed map[string]struct{}) bool { if node.IsDeleteMarker { return true } @@ -584,9 +585,9 @@ func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]st return true } - if p.ContinuationToken != "" { + if p.Bookmark != "" { if _, ok := existed[continuationToken]; !ok { - if p.ContinuationToken != node.OID.EncodeToString() { + if p.Bookmark != node.OID.EncodeToString() { return true } existed[continuationToken] = struct{}{} @@ -597,7 +598,7 @@ func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]st return false } -func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool { +func shouldSkipVersions(node *data.NodeVersion, p allObjectListingParams, existed map[string]struct{}) bool { filePath := node.FilePath if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { filePath = dirName @@ -610,9 +611,9 @@ func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[s return true } - if p.ContinuationToken != "" { + if p.Bookmark != "" { if _, ok := existed[continuationToken]; !ok { - if p.ContinuationToken != node.OID.EncodeToString() { + if p.Bookmark != node.OID.EncodeToString() { return true } existed[continuationToken] = struct{}{}