From ad95d1745ccc5a8251e33e983e77758df11306d8 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Thu, 16 Jun 2022 12:32:58 +0300 Subject: [PATCH] [#525] Parallelize listing Signed-off-by: Denis Kirillov --- api/layer/object.go | 120 ++++++++++++++++++++++++++++++++++++-------- go.mod | 3 +- go.sum | 5 +- 3 files changed, 105 insertions(+), 23 deletions(-) diff --git a/api/layer/object.go b/api/layer/object.go index 23df295c..67377ef4 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -11,6 +11,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "github.com/nspcc-dev/neofs-s3-gw/api" @@ -21,6 +22,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -408,13 +410,19 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis return &result, nil } -func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) ([]*data.ObjectInfo, *data.ObjectInfo, error) { +type logWrapper struct { + log *zap.Logger +} + +func (l *logWrapper) Printf(format string, args ...interface{}) { + l.log.Info(fmt.Sprintf(format, args...)) +} + +func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) { if p.MaxKeys == 0 { return nil, nil, nil } - var err error - cacheKey := cache.CreateObjectsListCacheKey(&p.Bucket.CID, p.Prefix, true) nodeVersions := n.listsCache.GetVersions(cacheKey) @@ -432,26 +440,96 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) return nodeVersions[i].FilePath < nodeVersions[j].FilePath }) - existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories - objects := make([]*data.ObjectInfo, 0, p.MaxKeys) - - for _, node := range nodeVersions { - if shouldSkip(node, p, existed) { - continue - } - - if len(objects) == p.MaxKeys { - return objects, &data.ObjectInfo{ID: node.OID, Name: node.FilePath}, nil - } - - if obj := n.objectFromObjectsCacheOrNeoFS(ctx, p.Bucket, node.OID); obj != nil { - if oi := objectInfoFromMeta(p.Bucket, obj, p.Prefix, p.Delimiter); oi != nil { - objects = append(objects, oi) - } - } + poolCtx, cancel := context.WithCancel(ctx) + defer cancel() + objOutCh, err := n.initWorkerPool(poolCtx, 2, p, nodesGenerator(poolCtx, p, nodeVersions)) + if err != nil { + return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) } - return objects, nil, nil + objects = make([]*data.ObjectInfo, 0, p.MaxKeys) + + for obj := range objOutCh { + if len(objects) == p.MaxKeys { // todo reconsider stop condition + next = obj + break + } + objects = append(objects, obj) + } + + sort.Slice(objects, func(i, j int) bool { + return objects[i].Name < objects[j].Name + }) + + return +} + +func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion { + nodeCh := make(chan *data.NodeVersion) + existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories + + go func() { + LOOP: + for _, node := range nodeVersions { + if shouldSkip(node, p, existed) { + continue + } + + select { + case <-ctx.Done(): + break LOOP + case nodeCh <- node: + } + } + close(nodeCh) + }() + + return nodeCh +} + +func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) { + pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{n.log})) + if err != nil { + return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) + } + objCh := make(chan *data.ObjectInfo) + + go func() { + var wg sync.WaitGroup + + LOOP: + for node := range input { + select { + case <-ctx.Done(): + break LOOP + default: + } + + func(node *data.NodeVersion) { + wg.Add(1) + err = pool.Submit(func() { + defer wg.Done() + if obj := n.objectFromObjectsCacheOrNeoFS(ctx, p.Bucket, node.OID); obj != nil { + if oi := objectInfoFromMeta(p.Bucket, obj, p.Prefix, p.Delimiter); oi != nil { + select { + case <-ctx.Done(): + case objCh <- oi: + } + } + } + }) + if err != nil { + wg.Done() + n.log.Warn("failed to submit task to pool", zap.Error(err)) + } + }(node) + } + wg.Wait() + close(objCh) + pool.Release() + }() + + return objCh, nil } func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) { diff --git a/go.mod b/go.mod index ae77d611..d9747d97 100644 --- a/go.mod +++ b/go.mod @@ -11,10 +11,11 @@ require ( github.com/nspcc-dev/neo-go v0.98.2 github.com/nspcc-dev/neofs-api-go/v2 v2.13.0 github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.5 + github.com/panjf2000/ants/v2 v2.5.0 github.com/prometheus/client_golang v1.11.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.8.1 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.7.1 github.com/urfave/cli/v2 v2.3.0 go.uber.org/zap v1.18.1 golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd diff --git a/go.sum b/go.sum index 1729c153..3c27106e 100644 --- a/go.sum +++ b/go.sum @@ -376,6 +376,8 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/panjf2000/ants/v2 v2.5.0 h1:1rWGWSnxCsQBga+nQbA4/iY6VMeNoOIAM0ZWh9u3q2Q= +github.com/panjf2000/ants/v2 v2.5.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ= github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= @@ -446,8 +448,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/goleveldb v0.0.0-20180307113352-169b1b37be73/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=