From 6aab3936d9e9424918cb60cf6ea27cad147e6491 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Sat, 13 Aug 2022 13:07:57 +0300 Subject: [PATCH] [#625] Limit listing generator with maxKeys Signed-off-by: Denis Kirillov --- api/layer/object.go | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/api/layer/object.go b/api/layer/object.go index 09ca9948..2b294bf4 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -528,12 +528,6 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) objects = make([]*data.ObjectInfo, 0, p.MaxKeys) for obj := range objOutCh { - // TODO (@kirillovdenis) : #625 #612, #525 reconsider stop condition - // currently we handle 3 more items to reduce the likelihood of missing the last object in batch - // (potentially we can miss it because of pool of workers) - if len(objects) == p.MaxKeys+3 { - break - } objects = append(objects, obj) } @@ -554,6 +548,7 @@ func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories go func() { + var generated int LOOP: for _, node := range nodeVersions { if shouldSkip(node, p, existed) { @@ -564,6 +559,10 @@ func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data case <-ctx.Done(): break LOOP case nodeCh <- node: + generated++ + if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken + break LOOP + } } } close(nodeCh) @@ -596,12 +595,18 @@ func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, wg.Add(1) err = pool.Submit(func() { defer wg.Done() - if oi := n.objectInfoFromObjectsCacheOrNeoFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi != nil { - select { - case <-ctx.Done(): - case objCh <- oi: + oi := n.objectInfoFromObjectsCacheOrNeoFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter) + if oi == nil { + // try to get object again + if oi = n.objectInfoFromObjectsCacheOrNeoFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil { + // form object info with data that the tree node contains + oi = getPartialObjectInfo(p.Bucket, node) } } + select { + case <-ctx.Done(): + case objCh <- oi: + } }) if err != nil { wg.Done() @@ -617,6 +622,18 @@ func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, return objCh, nil } +// getPartialObjectInfo form data.ObjectInfo using data available in data.NodeVersion. +func getPartialObjectInfo(bktInfo *data.BucketInfo, node *data.NodeVersion) *data.ObjectInfo { + return &data.ObjectInfo{ + ID: node.OID, + CID: bktInfo.CID, + Bucket: bktInfo.Name, + Name: node.FilePath, + Size: node.Size, + HashSum: node.ETag, + } +} + func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) { var err error @@ -756,7 +773,7 @@ func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo * n.log.Warn("couldn't cache an object", zap.Error(err)) } - return processObjectInfoName(oi, prefix, delimiter) + return oi } func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo {