[#525] Parallelize listing

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-06-16 12:32:58 +03:00 committed by Alex Vanin
parent 6ad7c988e6
commit ad95d1745c
3 changed files with 105 additions and 23 deletions

View file

@ -11,6 +11,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/nspcc-dev/neofs-s3-gw/api"
@ -21,6 +22,7 @@ import (
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
@ -408,13 +410,19 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
return &result, nil
}
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) ([]*data.ObjectInfo, *data.ObjectInfo, error) {
type logWrapper struct {
log *zap.Logger
}
func (l *logWrapper) Printf(format string, args ...interface{}) {
l.log.Info(fmt.Sprintf(format, args...))
}
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) {
if p.MaxKeys == 0 {
return nil, nil, nil
}
var err error
cacheKey := cache.CreateObjectsListCacheKey(&p.Bucket.CID, p.Prefix, true)
nodeVersions := n.listsCache.GetVersions(cacheKey)
@ -432,26 +440,96 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
return nodeVersions[i].FilePath < nodeVersions[j].FilePath
})
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
objects := make([]*data.ObjectInfo, 0, p.MaxKeys)
for _, node := range nodeVersions {
if shouldSkip(node, p, existed) {
continue
}
if len(objects) == p.MaxKeys {
return objects, &data.ObjectInfo{ID: node.OID, Name: node.FilePath}, nil
}
if obj := n.objectFromObjectsCacheOrNeoFS(ctx, p.Bucket, node.OID); obj != nil {
if oi := objectInfoFromMeta(p.Bucket, obj, p.Prefix, p.Delimiter); oi != nil {
objects = append(objects, oi)
}
}
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
objOutCh, err := n.initWorkerPool(poolCtx, 2, p, nodesGenerator(poolCtx, p, nodeVersions))
if err != nil {
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
}
return objects, nil, nil
objects = make([]*data.ObjectInfo, 0, p.MaxKeys)
for obj := range objOutCh {
if len(objects) == p.MaxKeys { // todo reconsider stop condition
next = obj
break
}
objects = append(objects, obj)
}
sort.Slice(objects, func(i, j int) bool {
return objects[i].Name < objects[j].Name
})
return
}
func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion {
nodeCh := make(chan *data.NodeVersion)
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
go func() {
LOOP:
for _, node := range nodeVersions {
if shouldSkip(node, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- node:
}
}
close(nodeCh)
}()
return nodeCh
}
func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) {
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{n.log}))
if err != nil {
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
}
objCh := make(chan *data.ObjectInfo)
go func() {
var wg sync.WaitGroup
LOOP:
for node := range input {
select {
case <-ctx.Done():
break LOOP
default:
}
func(node *data.NodeVersion) {
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
if obj := n.objectFromObjectsCacheOrNeoFS(ctx, p.Bucket, node.OID); obj != nil {
if oi := objectInfoFromMeta(p.Bucket, obj, p.Prefix, p.Delimiter); oi != nil {
select {
case <-ctx.Done():
case objCh <- oi:
}
}
}
})
if err != nil {
wg.Done()
n.log.Warn("failed to submit task to pool", zap.Error(err))
}
}(node)
}
wg.Wait()
close(objCh)
pool.Release()
}()
return objCh, nil
}
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) {