forked from TrueCloudLab/frostfs-s3-gw
[#625] Limit listing generator with maxKeys
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
3824151699
commit
6aab3936d9
1 changed files with 28 additions and 11 deletions
|
@ -528,12 +528,6 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
|
||||||
objects = make([]*data.ObjectInfo, 0, p.MaxKeys)
|
objects = make([]*data.ObjectInfo, 0, p.MaxKeys)
|
||||||
|
|
||||||
for obj := range objOutCh {
|
for obj := range objOutCh {
|
||||||
// TODO (@kirillovdenis) : #625 #612, #525 reconsider stop condition
|
|
||||||
// currently we handle 3 more items to reduce the likelihood of missing the last object in batch
|
|
||||||
// (potentially we can miss it because of pool of workers)
|
|
||||||
if len(objects) == p.MaxKeys+3 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
objects = append(objects, obj)
|
objects = append(objects, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -554,6 +548,7 @@ func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data
|
||||||
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
|
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
var generated int
|
||||||
LOOP:
|
LOOP:
|
||||||
for _, node := range nodeVersions {
|
for _, node := range nodeVersions {
|
||||||
if shouldSkip(node, p, existed) {
|
if shouldSkip(node, p, existed) {
|
||||||
|
@ -564,6 +559,10 @@ func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
break LOOP
|
break LOOP
|
||||||
case nodeCh <- node:
|
case nodeCh <- node:
|
||||||
|
generated++
|
||||||
|
if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
|
||||||
|
break LOOP
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
close(nodeCh)
|
close(nodeCh)
|
||||||
|
@ -596,12 +595,18 @@ func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams,
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
err = pool.Submit(func() {
|
err = pool.Submit(func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if oi := n.objectInfoFromObjectsCacheOrNeoFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi != nil {
|
oi := n.objectInfoFromObjectsCacheOrNeoFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter)
|
||||||
|
if oi == nil {
|
||||||
|
// try to get object again
|
||||||
|
if oi = n.objectInfoFromObjectsCacheOrNeoFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil {
|
||||||
|
// form object info with data that the tree node contains
|
||||||
|
oi = getPartialObjectInfo(p.Bucket, node)
|
||||||
|
}
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
case objCh <- oi:
|
case objCh <- oi:
|
||||||
}
|
}
|
||||||
}
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
@ -617,6 +622,18 @@ func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams,
|
||||||
return objCh, nil
|
return objCh, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getPartialObjectInfo form data.ObjectInfo using data available in data.NodeVersion.
|
||||||
|
func getPartialObjectInfo(bktInfo *data.BucketInfo, node *data.NodeVersion) *data.ObjectInfo {
|
||||||
|
return &data.ObjectInfo{
|
||||||
|
ID: node.OID,
|
||||||
|
CID: bktInfo.CID,
|
||||||
|
Bucket: bktInfo.Name,
|
||||||
|
Name: node.FilePath,
|
||||||
|
Size: node.Size,
|
||||||
|
HashSum: node.ETag,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
|
func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -756,7 +773,7 @@ func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *
|
||||||
n.log.Warn("couldn't cache an object", zap.Error(err))
|
n.log.Warn("couldn't cache an object", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return processObjectInfoName(oi, prefix, delimiter)
|
return oi
|
||||||
}
|
}
|
||||||
|
|
||||||
func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo {
|
func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo {
|
||||||
|
|
Loading…
Reference in a new issue