diff --git a/api/cache/listmultipart.go b/api/cache/listmultipart.go
new file mode 100644
index 0000000..cfa43ed
--- /dev/null
+++ b/api/cache/listmultipart.go
@@ -0,0 +1,109 @@
+package cache
+
+import (
+ "fmt"
+ "time"
+
+ "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
+ "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
+ cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+ "github.com/bluele/gcache"
+ "go.uber.org/zap"
+)
+
+type (
+	// ListMultipartSessionCache keeps paginated ListMultipartUploads sessions between requests.
+	ListMultipartSessionCache struct {
+		cache  gcache.Cache
+		logger *zap.Logger
+	}
+
+	// ListMultipartSessionKey identifies a ListMultipartSessionCache entry (container, prefix and pagination markers).
+	ListMultipartSessionKey struct {
+		cid      cid.ID
+		prefix   string
+		marker   string
+		uploadID string
+	}
+)
+
+const (
+	// DefaultListMultipartSessionCacheLifetime is the default lifetime of entries in the ListMultipartUploads session cache.
+	DefaultListMultipartSessionCacheLifetime = time.Minute
+	// DefaultListMultipartSessionCacheSize is the default number of entries in the ListMultipartUploads session cache.
+	DefaultListMultipartSessionCacheSize = 100
+)
+
+// DefaultListMultipartSessionConfig returns a Config with the default size and lifetime for the ListMultipartUploads session cache.
+func DefaultListMultipartSessionConfig(logger *zap.Logger) *Config {
+	return &Config{
+		Logger:   logger,
+		Size:     DefaultListMultipartSessionCacheSize,
+		Lifetime: DefaultListMultipartSessionCacheLifetime,
+	}
+}
+
+// String flattens the key; fields are separator-delimited so different (prefix, marker, uploadID) splits don't collide.
+func (k *ListMultipartSessionKey) String() string {
+	return k.cid.EncodeToString() + "/" + k.prefix + "/" + k.marker + "/" + k.uploadID
+}
+
+// NewListMultipartSessionCache is a constructor which creates an object of ListMultipartSessionCache with the given lifetime of entries.
+func NewListMultipartSessionCache(config *Config) *ListMultipartSessionCache {
+	gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).EvictedFunc(func(_ interface{}, val interface{}) {
+		session, ok := val.(*data.ListMultipartSession)
+		if !ok {
+			config.Logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", val)),
+				zap.String("expected", fmt.Sprintf("%T", session)))
+			return
+		}
+		if !session.Acquired.Load() {
+			session.Cancel()
+		}
+	}).Build()
+	return &ListMultipartSessionCache{cache: gc, logger: config.Logger}
+}
+
+// GetListMultipartSession returns the cached session of a ListMultipartUploads request, or nil if absent.
+func (l *ListMultipartSessionCache) GetListMultipartSession(key ListMultipartSessionKey) *data.ListMultipartSession {
+	entry, err := l.cache.Get(key)
+	if err != nil {
+		return nil
+	}
+
+	session, ok := entry.(*data.ListMultipartSession)
+	if !ok {
+		l.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
+			zap.String("expected", fmt.Sprintf("%T", session)))
+		return nil
+	}
+
+	return session
+}
+
+// PutListMultipartSession stores a ListMultipartUploads session info in the cache.
+func (l *ListMultipartSessionCache) PutListMultipartSession(key ListMultipartSessionKey, session *data.ListMultipartSession) error {
+	old := l.GetListMultipartSession(key)
+	if old != nil && old != session && !old.Acquired.Load() {
+		// cancel the superseded session so its context resources are released
+		old.Cancel()
+	}
+
+	return l.cache.Set(key, session)
+}
+
+// DeleteListMultipartSession removes the session entry for the given key from the cache.
+func (l *ListMultipartSessionCache) DeleteListMultipartSession(key ListMultipartSessionKey) {
+	l.cache.Remove(key)
+}
+
+// CreateListMultipartSessionCacheKey builds a ListMultipartSessionKey from the given CID, prefix, marker and uploadID.
+func CreateListMultipartSessionCacheKey(cnr cid.ID, prefix, marker, uploadID string) ListMultipartSessionKey {
+	key := ListMultipartSessionKey{
+		cid:      cnr,
+		prefix:   prefix,
+		marker:   marker,
+		uploadID: uploadID,
+	}
+
+	return key
+}
diff --git a/api/cache/listsession.go b/api/cache/listsession.go
index c8d0b68..a901d97 100644
--- a/api/cache/listsession.go
+++ b/api/cache/listsession.go
@@ -28,7 +28,7 @@ type (
const (
// DefaultListSessionCacheLifetime is a default lifetime of entries in cache of ListObjects.
- DefaultListSessionCacheLifetime = time.Second * 60
+ DefaultListSessionCacheLifetime = time.Minute
// DefaultListSessionCacheSize is a default size of cache of ListObjects.
DefaultListSessionCacheSize = 100
)
diff --git a/api/data/listsession.go b/api/data/listsession.go
index a13f2e4..c3498f6 100644
--- a/api/data/listsession.go
+++ b/api/data/listsession.go
@@ -9,11 +9,25 @@ type VersionsStream interface {
Next(ctx context.Context) (*NodeVersion, error)
}
-type ListSession struct {
- Next []*ExtendedNodeVersion
- Stream VersionsStream
- NamesMap map[string]struct{}
+type CommonSession struct {
Context context.Context
Cancel context.CancelFunc
Acquired atomic.Bool
}
+
+type ListSession struct {
+ CommonSession
+ Next []*ExtendedNodeVersion
+ Stream VersionsStream
+ NamesMap map[string]struct{}
+}
+
+type MultipartInfoStream interface {
+ Next(ctx context.Context) (*MultipartInfo, error)
+}
+
+type ListMultipartSession struct {
+ CommonSession
+ Next *MultipartInfo
+ Stream MultipartInfoStream
+}
diff --git a/api/data/tree.go b/api/data/tree.go
index c75d936..a30f3cd 100644
--- a/api/data/tree.go
+++ b/api/data/tree.go
@@ -207,3 +207,9 @@ func (l LockInfo) UntilDate() string {
func (l LockInfo) IsCompliance() bool {
return l.isCompliance
}
+
+type MultipartStreamParams struct {
+ Prefix string
+ KeyMarker string
+ UploadIDMarker string
+}
diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go
index 628bba7..319c6d7 100644
--- a/api/handler/handlers_test.go
+++ b/api/handler/handlers_test.go
@@ -244,6 +244,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
Buckets: minCacheCfg,
System: minCacheCfg,
AccessControl: minCacheCfg,
+ MultipartList: minCacheCfg,
NetworkInfo: &cache.NetworkInfoCacheConfig{Lifetime: minCacheCfg.Lifetime},
}
}
diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go
index bb10927..644bcfb 100644
--- a/api/handler/multipart_upload.go
+++ b/api/handler/multipart_upload.go
@@ -513,7 +513,7 @@ func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Req
if maxUploadsStr != "" {
val, err := strconv.Atoi(maxUploadsStr)
- if err != nil || val < 1 || val > 1000 {
+ if err != nil || val < 1 || val > maxObjectList {
h.logAndSendError(ctx, w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads))
return
}
diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go
index 2831b3e..5dcacd8 100644
--- a/api/handler/multipart_upload_test.go
+++ b/api/handler/multipart_upload_test.go
@@ -266,6 +266,36 @@ func TestListMultipartUploads(t *testing.T) {
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID)
})
+ t.Run("check delimiter", func(t *testing.T) {
+ t.Run("not truncated", func(t *testing.T) {
+ listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", -1)
+ require.Len(t, listUploads.Uploads, 0)
+ require.Len(t, listUploads.CommonPrefixes, 2)
+ require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
+ require.Equal(t, "/zzz/", listUploads.CommonPrefixes[1].Prefix)
+ })
+
+ t.Run("truncated", func(t *testing.T) {
+ listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", 1)
+ require.Len(t, listUploads.Uploads, 0)
+ require.Len(t, listUploads.CommonPrefixes, 1)
+ require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
+ require.True(t, listUploads.IsTruncated)
+
+ listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
+ require.Len(t, listUploads.Uploads, 0)
+ require.Len(t, listUploads.CommonPrefixes, 1)
+ require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
+ require.True(t, listUploads.IsTruncated)
+
+ listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
+ require.Len(t, listUploads.Uploads, 0)
+ require.Len(t, listUploads.CommonPrefixes, 1)
+ require.Equal(t, "/zzz/", listUploads.CommonPrefixes[0].Prefix)
+ require.False(t, listUploads.IsTruncated)
+ })
+ })
+
t.Run("check markers", func(t *testing.T) {
t.Run("check only key-marker", func(t *testing.T) {
listUploads := listMultipartUploads(hc, bktName, "", "", "", objName2, -1)
@@ -294,6 +324,58 @@ func TestListMultipartUploads(t *testing.T) {
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
})
})
+
+ t.Run("check next markers", func(t *testing.T) {
+ t.Run("check both next-key-marker and next-upload-id-marker", func(t *testing.T) {
+ listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1)
+ require.True(t, listUploads.IsTruncated)
+ require.Len(t, listUploads.Uploads, 1)
+ require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker)
+ require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker)
+ require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
+ require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key)
+
+ listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
+ require.True(t, listUploads.IsTruncated)
+ require.Len(t, listUploads.Uploads, 1)
+ require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker)
+ require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker)
+ require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID)
+ require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key)
+
+ listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
+ require.False(t, listUploads.IsTruncated)
+ require.Len(t, listUploads.Uploads, 1)
+ require.Empty(t, listUploads.NextUploadIDMarker)
+ require.Empty(t, listUploads.NextKeyMarker)
+ require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
+ })
+
+ t.Run("check only next-key-marker", func(t *testing.T) {
+ listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1)
+ require.True(t, listUploads.IsTruncated)
+ require.Len(t, listUploads.Uploads, 1)
+ require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker)
+ require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker)
+ require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
+ require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key)
+
+ listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1)
+ require.True(t, listUploads.IsTruncated)
+ require.Len(t, listUploads.Uploads, 1)
+ require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker)
+ require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker)
+ require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID)
+ require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key)
+
+ listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1)
+ require.False(t, listUploads.IsTruncated)
+ require.Len(t, listUploads.Uploads, 1)
+ require.Empty(t, listUploads.NextUploadIDMarker)
+ require.Empty(t, listUploads.NextKeyMarker)
+ require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
+ })
+ })
}
func TestMultipartUploadSize(t *testing.T) {
diff --git a/api/layer/cache.go b/api/layer/cache.go
index c3ceb7c..ecf5ac4 100644
--- a/api/layer/cache.go
+++ b/api/layer/cache.go
@@ -12,15 +12,16 @@ import (
)
type Cache struct {
- logger *zap.Logger
- listsCache *cache.ObjectsListCache
- sessionListCache *cache.ListSessionCache
- objCache *cache.ObjectsCache
- namesCache *cache.ObjectsNameCache
- bucketCache *cache.BucketCache
- systemCache *cache.SystemCache
- accessCache *cache.AccessControlCache
- networkInfoCache *cache.NetworkInfoCache
+ logger *zap.Logger
+ listsCache *cache.ObjectsListCache
+ sessionListCache *cache.ListSessionCache
+ objCache *cache.ObjectsCache
+ namesCache *cache.ObjectsNameCache
+ bucketCache *cache.BucketCache
+ systemCache *cache.SystemCache
+ accessCache *cache.AccessControlCache
+ networkInfoCache *cache.NetworkInfoCache
+ sessionMultipartCache *cache.ListMultipartSessionCache
}
// CachesConfig contains params for caches.
@@ -33,6 +34,7 @@ type CachesConfig struct {
Buckets *cache.Config
System *cache.Config
AccessControl *cache.Config
+ MultipartList *cache.Config
NetworkInfo *cache.NetworkInfoCacheConfig
}
@@ -48,20 +50,22 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
System: cache.DefaultSystemConfig(logger),
AccessControl: cache.DefaultAccessControlConfig(logger),
NetworkInfo: cache.DefaultNetworkInfoConfig(logger),
+ MultipartList: cache.DefaultListMultipartSessionConfig(logger),
}
}
func NewCache(cfg *CachesConfig) *Cache {
return &Cache{
- logger: cfg.Logger,
- listsCache: cache.NewObjectsListCache(cfg.ObjectsList),
- sessionListCache: cache.NewListSessionCache(cfg.SessionList),
- objCache: cache.New(cfg.Objects),
- namesCache: cache.NewObjectsNameCache(cfg.Names),
- bucketCache: cache.NewBucketCache(cfg.Buckets),
- systemCache: cache.NewSystemCache(cfg.System),
- accessCache: cache.NewAccessControlCache(cfg.AccessControl),
- networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo),
+ logger: cfg.Logger,
+ listsCache: cache.NewObjectsListCache(cfg.ObjectsList),
+ sessionListCache: cache.NewListSessionCache(cfg.SessionList),
+ objCache: cache.New(cfg.Objects),
+ namesCache: cache.NewObjectsNameCache(cfg.Names),
+ bucketCache: cache.NewBucketCache(cfg.Buckets),
+ systemCache: cache.NewSystemCache(cfg.System),
+ accessCache: cache.NewAccessControlCache(cfg.AccessControl),
+ networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo),
+ sessionMultipartCache: cache.NewListMultipartSessionCache(cfg.MultipartList),
}
}
@@ -161,6 +165,14 @@ func (c *Cache) GetListSession(owner user.ID, key cache.ListSessionKey) *data.Li
return c.sessionListCache.GetListSession(key)
}
+func (c *Cache) GetListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) *data.ListMultipartSession {
+ if !c.accessCache.Get(owner, key.String()) {
+ return nil
+ }
+
+ return c.sessionMultipartCache.GetListMultipartSession(key)
+}
+
func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) {
if err := c.sessionListCache.PutListSession(key, session); err != nil {
c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err))
@@ -176,6 +188,21 @@ func (c *Cache) DeleteListSession(owner user.ID, key cache.ListSessionKey) {
c.accessCache.Delete(owner, key.String())
}
+func (c *Cache) PutListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey, session *data.ListMultipartSession) {
+ if err := c.sessionMultipartCache.PutListMultipartSession(key, session); err != nil {
+ c.logger.Warn(logs.CouldntCacheListMultipartSession, zap.Error(err))
+ }
+
+ if err := c.accessCache.Put(owner, key.String()); err != nil {
+ c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
+ }
+}
+
+func (c *Cache) DeleteListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) {
+ c.sessionMultipartCache.DeleteListMultipartSession(key)
+ c.accessCache.Delete(owner, key.String())
+}
+
func (c *Cache) GetTagging(owner user.ID, key string) map[string]string {
if !c.accessCache.Get(owner, key) {
return nil
diff --git a/api/layer/listing.go b/api/layer/listing.go
index 790243b..630a15d 100644
--- a/api/layer/listing.go
+++ b/api/layer/listing.go
@@ -79,7 +79,8 @@ type (
Prefix string
MaxKeys int
Marker string
- Bookmark string
+ // Bookmark contains Marker or ContinuationToken and is used for pagination and as part of a cache key for list session.
+ Bookmark string
}
commonLatestVersionsListingParams struct {
@@ -193,11 +194,10 @@ func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
return nil, nil, nil
}
- session, err := n.getListLatestVersionsSession(ctx, p)
+ session, err := n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
if err != nil {
return nil, nil, err
}
-
generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session)
objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator)
if err != nil {
@@ -230,7 +230,7 @@ func (n *Layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListi
return nil, false, nil
}
- session, err := n.getListAllVersionsSession(ctx, p)
+ session, err := n.getListVersionsSession(ctx, p, false)
if err != nil {
return nil, false, err
}
@@ -301,48 +301,31 @@ func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int,
}
}
-func (n *Layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) {
- return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
-}
-
-func (n *Layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) {
- return n.getListVersionsSession(ctx, p, false)
-}
-
-func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) {
+func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
- session := n.cache.GetListSession(owner, cacheKey)
- if session == nil {
- return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
+ session = n.cache.GetListSession(owner, cacheKey)
+ if session == nil || session.Acquired.Swap(true) {
+ session = n.newSession(ctx)
+ session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
+ return session, err
}
- if session.Acquired.Swap(true) {
- return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
- }
-
- // after reading next object from stream in session
- // the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
-
return session, nil
}
-func (n *Layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
- session = &data.ListSession{NamesMap: make(map[string]struct{})}
+func (n *Layer) newSession(ctx context.Context) *data.ListSession {
+ session := &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
+ // save access box data for next requests
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd})
}
- session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
- if err != nil {
- return nil, err
- }
-
- return session, nil
+ return session
}
func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go
index ed7612c..2639f69 100644
--- a/api/layer/multipart_upload.go
+++ b/api/layer/multipart_upload.go
@@ -2,6 +2,7 @@ package layer
import (
"bytes"
+ "cmp"
"context"
"crypto/md5"
"encoding/base64"
@@ -10,17 +11,20 @@ import (
"errors"
"fmt"
"io"
+ "slices"
"sort"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
+ "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
+ "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@@ -499,47 +503,65 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
return &result, nil
}
- multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, p.Bkt, p.Prefix)
+ session, err := n.getListMultipartUploadsSession(ctx, p)
if err != nil {
return nil, err
}
- uploads := make([]*UploadInfo, 0, len(multipartInfos))
+ uploads := make([]*UploadInfo, 0, p.MaxUploads)
uniqDirs := make(map[string]struct{})
+ uploadsCount := 0
+ if session.Next != nil {
+ upload := uploadInfoFromMultipartInfo(session.Next, p.Prefix, p.Delimiter)
+ switch {
+ case upload.IsDir && isUniqDir(upload.Key, uniqDirs):
+ uniqDirs[upload.Key] = struct{}{}
+ fallthrough
+ case !upload.IsDir:
+ uploads = append(uploads, upload)
+ uploadsCount++
+ }
+ }
- for _, multipartInfo := range multipartInfos {
- info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter)
- if info != nil {
- if info.IsDir {
- if _, ok := uniqDirs[info.Key]; ok {
- continue
- }
- uniqDirs[info.Key] = struct{}{}
+ info := session.Next
+ for uploadsCount < p.MaxUploads {
+ info, err = session.Stream.Next(ctx)
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ break
}
- uploads = append(uploads, info)
+ n.reqLogger(ctx).Warn(logs.CouldNotGetMultipartUploadInfo, zap.Error(err))
+ continue
}
+ upload := uploadInfoFromMultipartInfo(info, p.Prefix, p.Delimiter)
+ if upload.IsDir {
+ if !isUniqDir(upload.Key, uniqDirs) {
+ continue
+ }
+ uniqDirs[upload.Key] = struct{}{}
+ }
+ uploads = append(uploads, upload)
+ uploadsCount++
}
- sort.Slice(uploads, func(i, j int) bool {
- if uploads[i].Key == uploads[j].Key {
- return uploads[i].UploadID < uploads[j].UploadID
- }
- return uploads[i].Key < uploads[j].Key
- })
-
- if p.KeyMarker != "" {
- if p.UploadIDMarker != "" {
- uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads)
+ isTruncated := true
+ next, err := session.Stream.Next(ctx)
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ isTruncated = false
} else {
- uploads = trimAfterUploadKey(p.KeyMarker, uploads)
+ return nil, err
}
}
- if len(uploads) > p.MaxUploads {
+ if isTruncated && info != nil {
+ // put to session redundant multipart upload which we read to check for EOF
+ session.Next = next
result.IsTruncated = true
- uploads = uploads[:p.MaxUploads]
- result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID
- result.NextKeyMarker = uploads[len(uploads)-1].Key
+ result.NextUploadIDMarker = info.UploadID
+ result.NextKeyMarker = info.Key
+ cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, info.Key, info.UploadID)
+ n.putListMultipartUploadsSession(ctx, session, cacheKey)
}
for _, ov := range uploads {
@@ -550,9 +572,62 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
}
}
+ slices.SortFunc(result.Uploads, func(a, b *UploadInfo) int {
+ keyCmp := cmp.Compare(a.Key, b.Key)
+ if keyCmp == 0 {
+ return cmp.Compare(a.UploadID, b.UploadID)
+ }
+
+ return keyCmp
+ })
+
return &result, nil
}
+func (n *Layer) putListMultipartUploadsSession(ctx context.Context, session *data.ListMultipartSession, cacheKey cache.ListMultipartSessionKey) {
+ session.Acquired.Store(false)
+ n.cache.PutListMultipartSession(n.BearerOwner(ctx), cacheKey, session)
+}
+
+func (n *Layer) getListMultipartUploadsSession(ctx context.Context, p *ListMultipartUploadsParams) (session *data.ListMultipartSession, err error) {
+ owner := n.BearerOwner(ctx)
+ cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, p.KeyMarker, p.UploadIDMarker)
+ session = n.cache.GetListMultipartSession(owner, cacheKey)
+ if session == nil || session.Acquired.Swap(true) {
+ session = newListMultipartSession(ctx)
+ params := data.MultipartStreamParams{
+ Prefix: p.Prefix,
+ KeyMarker: p.KeyMarker,
+ UploadIDMarker: p.UploadIDMarker,
+ }
+ session.Stream, err = n.treeService.GetMultipartUploadsByPrefix(session.Context, p.Bkt, params)
+ if err != nil {
+ return nil, err
+ }
+ }
+ // if after reading next object from stream in session the current cache value already
+ // doesn't match with next token in cache key
+ n.cache.DeleteListMultipartSession(owner, cacheKey)
+
+ return session, nil
+}
+
+func newListMultipartSession(ctx context.Context) *data.ListMultipartSession {
+ reqCtx, cancel := context.WithCancel(context.Background())
+ session := &data.ListMultipartSession{
+ CommonSession: data.CommonSession{
+ Context: reqCtx,
+ Cancel: cancel,
+ },
+ }
+
+ // save access box data for next requests
+ if bd, err := middleware.GetBoxData(ctx); err == nil {
+ session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd})
+ }
+ return session
+}
+
func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
multipartInfo, parts, err := n.getUploadParts(ctx, p)
if err != nil {
@@ -677,44 +752,10 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
return multipartInfo, res, nil
}
-func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo {
- var res []*UploadInfo
- if len(uploads) != 0 && uploads[len(uploads)-1].Key < key {
- return res
- }
-
- for _, obj := range uploads {
- if obj.Key >= key && obj.UploadID > id {
- res = append(res, obj)
- }
- }
-
- return res
-}
-
-func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo {
- var result []*UploadInfo
- if len(objects) != 0 && objects[len(objects)-1].Key <= key {
- return result
- }
- for i, obj := range objects {
- if obj.Key > key {
- result = objects[i:]
- break
- }
- }
-
- return result
-}
-
func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo {
var isDir bool
key := uploadInfo.Key
- if !strings.HasPrefix(key, prefix) {
- return nil
- }
-
if len(delimiter) > 0 {
tail := strings.TrimPrefix(key, prefix)
index := strings.Index(tail, delimiter)
@@ -732,3 +773,10 @@ func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimit
Created: uploadInfo.Created,
}
}
+
+func isUniqDir(key string, uniqDirs map[string]struct{}) bool {
+ if _, ok := uniqDirs[key]; ok {
+ return false
+ }
+ return true
+}
diff --git a/api/layer/multipart_upload_test.go b/api/layer/multipart_upload_test.go
deleted file mode 100644
index 5d21523..0000000
--- a/api/layer/multipart_upload_test.go
+++ /dev/null
@@ -1,108 +0,0 @@
-package layer
-
-import (
- "sort"
- "testing"
-
- "github.com/stretchr/testify/require"
-)
-
-func TestTrimAfterUploadIDAndKey(t *testing.T) {
- uploads := []*UploadInfo{
- {Key: "j", UploadID: "k"}, // key < id <
- {Key: "l", UploadID: "p"}, // key < id >
- {Key: "n", UploadID: "m"}, // key = id <
- {Key: "n", UploadID: "o"}, // pivot
- {Key: "n", UploadID: "q"}, // key = id >
- {Key: "p", UploadID: "h"}, // key > id <
- {Key: "q", UploadID: "r"}, // key > id >
- }
- expectedUploadsListsIndexes := [][]int{
- {1, 2, 3, 4, 6},
- {4, 6},
- {3, 4, 6},
- {4, 6},
- {6},
- {6},
- {},
- }
-
- sort.Slice(uploads, func(i, j int) bool {
- if uploads[i].Key == uploads[j].Key {
- return uploads[i].UploadID < uploads[j].UploadID
- }
- return uploads[i].Key < uploads[j].Key
- })
-
- length := len(uploads)
-
- t.Run("the last element's key is less, upload id is less", func(t *testing.T) {
- keys := trimAfterUploadIDAndKey("z", "a", uploads)
- require.Empty(t, keys)
- require.Len(t, uploads, length)
- })
-
- t.Run("the last element's key is less, upload id is greater", func(t *testing.T) {
- keys := trimAfterUploadIDAndKey("z", "a", uploads)
- require.Empty(t, keys)
- require.Len(t, uploads, length)
- })
-
- t.Run("check for uploads", func(t *testing.T) {
- for i, u := range uploads {
- list := trimAfterUploadIDAndKey(u.Key, u.UploadID, uploads)
- require.Equal(t, len(list), len(expectedUploadsListsIndexes[i]))
- for j, idx := range expectedUploadsListsIndexes[i] {
- require.Equal(t, list[j], uploads[idx])
- }
- }
- })
-}
-
-func TestTrimAfterUploadKey(t *testing.T) {
- var (
- uploadKeys = []string{"e", "f", "f", "g", "h", "i"}
- theSameKeyIdx = []int{1, 2}
- diffKeyIdx = []int{0, 3}
- lastIdx = len(uploadKeys) - 1
- )
-
- uploadsInfos := make([]*UploadInfo, 0, len(uploadKeys))
- for _, k := range uploadKeys {
- uploadsInfos = append(uploadsInfos, &UploadInfo{Key: k})
- }
-
- t.Run("empty list", func(t *testing.T) {
- keys := trimAfterUploadKey("f", []*UploadInfo{})
- require.Len(t, keys, 0)
- })
-
- t.Run("the last element is less than a key", func(t *testing.T) {
- keys := trimAfterUploadKey("j", uploadsInfos)
- require.Empty(t, keys)
- require.Len(t, uploadsInfos, len(uploadKeys))
- })
-
- t.Run("different keys in sequence", func(t *testing.T) {
- for _, i := range diffKeyIdx {
- keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos)
- require.Len(t, keys, len(uploadKeys)-i-1)
- require.Equal(t, keys, uploadsInfos[i+1:])
- require.Len(t, uploadsInfos, len(uploadKeys))
- }
- })
-
- t.Run("the same keys in the sequence first element", func(t *testing.T) {
- for _, i := range theSameKeyIdx {
- keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos)
- require.Len(t, keys, 3)
- require.Equal(t, keys, uploadsInfos[3:])
- require.Len(t, uploadsInfos, len(uploadKeys))
- }
- })
-
- t.Run("last element", func(t *testing.T) {
- keys := trimAfterUploadKey(uploadKeys[lastIdx], uploadsInfos)
- require.Empty(t, keys)
- })
-}
diff --git a/api/layer/tree/tree_service.go b/api/layer/tree/tree_service.go
index db079b1..94470fb 100644
--- a/api/layer/tree/tree_service.go
+++ b/api/layer/tree/tree_service.go
@@ -53,7 +53,7 @@ type Service interface {
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
- GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
+ GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error)
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
// AddPart puts a node to a system tree as a child of appropriate multipart upload
diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go
index 577f406..cd5a3ec 100644
--- a/api/layer/tree_mock.go
+++ b/api/layer/tree_mock.go
@@ -328,7 +328,7 @@ func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data
return nil
}
-func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, string) ([]*data.MultipartInfo, error) {
+func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, data.MultipartStreamParams) (data.MultipartInfoStream, error) {
panic("implement me")
}
diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go
index 1ac2b30..caa6810 100644
--- a/cmd/s3-gw/app.go
+++ b/cmd/s3-gw/app.go
@@ -1007,6 +1007,8 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
cacheCfg.NetworkInfo.Lifetime = fetchCacheLifetime(v, l, cfgNetworkInfoCacheLifetime, cacheCfg.NetworkInfo.Lifetime)
+ cacheCfg.MultipartList.Lifetime = fetchCacheLifetime(v, l, cfgMultipartListCacheLifetime, cacheCfg.MultipartList.Lifetime)
+ cacheCfg.MultipartList.Size = fetchCacheSize(v, l, cfgMultipartListCacheSize, cacheCfg.MultipartList.Size)
return cacheCfg
}
diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go
index 691e231..4309727 100644
--- a/cmd/s3-gw/app_settings.go
+++ b/cmd/s3-gw/app_settings.go
@@ -141,6 +141,8 @@ const ( // Settings.
cfgFrostfsIDCacheLifetime = "cache.frostfsid.lifetime"
cfgFrostfsIDCacheSize = "cache.frostfsid.size"
cfgNetworkInfoCacheLifetime = "cache.network_info.lifetime"
+ cfgMultipartListCacheLifetime = "cache.multipart_list_session.lifetime"
+ cfgMultipartListCacheSize = "cache.multipart_list_session.size"
cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"
diff --git a/config/config.env b/config/config.env
index ef6a27b..706d10f 100644
--- a/config/config.env
+++ b/config/config.env
@@ -101,9 +101,12 @@ S3_GW_CACHE_OBJECTS_SIZE=1000000
# Cache which keeps lists of objects in buckets
S3_GW_CACHE_LIST_LIFETIME=1m
S3_GW_CACHE_LIST_SIZE=100000
-# Cache which keeps listing session
+# Cache which keeps listing objects session
S3_GW_CACHE_LIST_SESSION_LIFETIME=1m
S3_GW_CACHE_LIST_SESSION_SIZE=100
+# Cache which keeps listing multipart uploads session
+S3_GW_CACHE_MULTIPART_LIST_SESSION_LIFETIME=1m
+S3_GW_CACHE_MULTIPART_LIST_SESSION_SIZE=100
# Cache which contains mapping of bucket name to bucket info
S3_GW_CACHE_BUCKETS_LIFETIME=1m
S3_GW_CACHE_BUCKETS_SIZE=1000
diff --git a/config/config.yaml b/config/config.yaml
index 051f5f7..539ec63 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -129,6 +129,9 @@ cache:
list_session:
lifetime: 1m
size: 100
+ multipart_list_session:
+ lifetime: 1m
+ size: 100
# Cache which contains mapping of nice name to object addresses
names:
lifetime: 1m
diff --git a/docs/configuration.md b/docs/configuration.md
index 70b2f33..6fbd5f1 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -426,6 +426,9 @@ cache:
list_session:
lifetime: 1m
size: 100
+ multipart_list_session:
+ lifetime: 1m
+ size: 100
names:
lifetime: 1m
size: 1000
@@ -452,19 +455,20 @@ cache:
lifetime: 1m
```
-| Parameter | Type | Default value | Description |
-|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------|
-| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 1000000` | Cache for objects (FrostFS headers). |
-| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100000` | Cache which keeps lists of objects in buckets. |
-| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100` | Cache which keeps listing session. |
-| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 10000` | Cache which contains mapping of nice name to object addresses. |
-| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
-| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 10000` | Cache for system objects in a bucket: bucket settings etc. |
-| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`
`size: 100` | Cache which stores access box with tokens by its address. |
-| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 100000` | Cache which stores owner to cache operation mapping. |
-| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores list of policy chains. |
-| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores FrostfsID subject info. |
-| `network_info` | [Cache config](#cache-subsection) | `lifetime: 1m` | Cache which stores network info. |
+| Parameter | Type | Default value | Description |
+|--------------------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------|
+| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 1000000` | Cache for objects (FrostFS headers). |
+| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100000` | Cache which keeps lists of objects in buckets. |
+| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100` | Cache which keeps listing objects session. |
+| `multipart_list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100` | Cache which keeps listing multipart uploads session. |
+| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 10000` | Cache which contains mapping of nice name to object addresses. |
+| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
+| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 10000` | Cache for system objects in a bucket: bucket settings etc. |
+| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`
`size: 100` | Cache which stores access box with tokens by its address. |
+| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 100000` | Cache which stores owner to cache operation mapping. |
+| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores list of policy chains. |
+| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores FrostfsID subject info. |
+| `network_info` | [Cache config](#cache-subsection) | `lifetime: 1m` | Cache which stores network info. |
#### `cache` subsection
diff --git a/internal/logs/logs.go b/internal/logs/logs.go
index 3da1255..ea77cad 100644
--- a/internal/logs/logs.go
+++ b/internal/logs/logs.go
@@ -8,6 +8,8 @@ const (
UpdateAccessCredObjectIntoFrostFS = "update access cred object into FrostFS" // Info in ../../authmate/authmate.go
MetricsAreDisabled = "metrics are disabled" // Warn in ../../metrics/app.go
FoundMoreThanOneUnversionedNode = "found more than one unversioned node" // Debug in ../../pkg/service/tree/tree.go
+ CouldNotParseTreeNode = "could not parse tree node" // Error in ../../pkg/service/tree/tree.go
+ CouldNotFormFilePath = "could not form file path" // Error in ../../pkg/service/tree/tree.go
ServiceIsRunning = "service is running" // Info in ../../cmd/s3-gw/service.go
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port" // Warn in ../../cmd/s3-gw/service.go
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../cmd/s3-gw/service.go
@@ -67,6 +69,7 @@ const (
CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go
CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go
MismatchedObjEncryptionInfo = "mismatched obj encryptionInfo" // Warn in ../../api/layer/multipart_upload.go
+ CouldNotGetMultipartUploadInfo = "could not get multipart upload info" // Warn in ../../api/layer/multipart_upload.go
UploadPart = "upload part" // Debug in ../../api/layer/multipart_upload.go
CouldntDeleteOldPartObject = "couldn't delete old part object" // Error in ../../api/layer/multipart_upload.go
CouldNotPutCompletedObject = "could not put a completed object (multipart upload)" // Error in ../../api/layer/multipart_upload.go
@@ -89,6 +92,7 @@ const (
CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go
CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go
CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go
+ CouldntCacheListMultipartSession = "couldn't cache list multipart session" // Warn in ../../api/layer/cache.go
CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go
diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go
index 438b129..4b3e538 100644
--- a/pkg/service/tree/tree.go
+++ b/pkg/service/tree/tree.go
@@ -84,7 +84,9 @@ var (
// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
ErrGatewayTimeout = frostfs.ErrGatewayTimeout
- errNodeDoesntContainFileName = fmt.Errorf("node doesn't contain FileName")
+ errNodeDoesntContainFileName = errors.New("node doesn't contain FileName")
+
+ errParentPathNotFound = errors.New("couldn't get parent path")
)
const (
@@ -1061,7 +1063,7 @@ func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *
var filepath string
if !s.intermediateRootID.Equal(trNode.ID) {
- if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil {
+ if filepath, err = formFilePath(trNode, fileName, s.namesMap); err != nil {
return nil, false, fmt.Errorf("invalid node order: %w", err)
}
} else {
@@ -1165,58 +1167,18 @@ func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, tr
return intermediateNodes, nil
}
-func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
- rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
+func (c *Tree) getSubTreeByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, []uint64, error) {
+ rootID, _, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) {
- return nil, "", nil
+ return nil, nil, nil
}
- return nil, "", err
+ return nil, nil, err
}
- subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2, false)
- if err != nil {
- if errors.Is(err, tree.ErrNodeNotFound) {
- return nil, "", nil
- }
- return nil, "", err
- }
+ stream, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
- nodesMap := make(map[string][]NodeResponse, len(subTree))
- for _, node := range subTree {
- if MultiID(rootID).Equal(node.GetNodeID()) {
- continue
- }
-
- fileName := getFilename(node)
- if !strings.HasPrefix(fileName, tailPrefix) {
- continue
- }
-
- nodes := nodesMap[fileName]
-
- // Add all nodes if flag latestOnly is false.
- // Add all intermediate nodes
- // and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
- if len(nodes) == 0 {
- nodes = []NodeResponse{node}
- } else if !latestOnly || isIntermediate(node) {
- nodes = append(nodes, node)
- } else if isIntermediate(nodes[0]) {
- nodes = append([]NodeResponse{node}, nodes...)
- } else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
- nodes[0] = node
- }
-
- nodesMap[fileName] = nodes
- }
-
- result := make([]NodeResponse, 0, len(subTree))
- for _, nodes := range nodesMap {
- result = append(result, nodes...)
- }
-
- return result, strings.TrimSuffix(prefix, tailPrefix), nil
+ return stream, rootID, err
}
func getFilename(node NodeResponse) string {
@@ -1237,20 +1199,19 @@ func isIntermediate(node NodeResponse) bool {
return node.GetMeta()[0].GetKey() == FileNameKey
}
-func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) {
- var filepath string
+func formFilePath(node *treeNode, fileName string, namesMap map[uint64]string) (string, error) {
+ var filePath string
- for i, id := range node.GetParentID() {
+ for i, id := range node.ParentID {
parentPath, ok := namesMap[id]
if !ok {
- return "", fmt.Errorf("couldn't get parent path")
+ return "", errParentPathNotFound
}
-
- filepath = parentPath + separator + fileName
- namesMap[node.GetNodeID()[i]] = filepath
+ filePath = parentPath + separator + fileName
+ namesMap[node.ID[i]] = filePath
}
- return filepath, nil
+ return filePath, nil
}
func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
@@ -1267,10 +1228,6 @@ func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
return tNode, fileName, nil
}
-func formLatestNodeKey(parentID uint64, fileName string) string {
- return strconv.FormatUint(parentID, 10) + "." + fileName
-}
-
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
}
@@ -1313,84 +1270,137 @@ func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketIn
return err
}
-func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
- subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
- if err != nil {
- return nil, err
- }
-
- var result []*data.MultipartInfo
- for _, node := range subTreeNodes {
- multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix)
- if err != nil {
- return nil, err
- }
- result = append(result, multipartUploads...)
- }
-
- return result, nil
+type multipartInfoStream struct {
+ log *zap.Logger
+ nodePaths map[uint64]string
+ rootID MultiID
+ // childStream stream of children nodes of prefix node.
+ childStream SubTreeStream
+ // currentStream stream of children's nodes with max depth.
+ currentStream SubTreeStream
+ treeService ServiceClient
+ bktInfo *data.BucketInfo
+ uploadID string
+ keyMarker string
+ headPrefix string
+ prefix string
}
-func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
- // sorting in getSubTree leads to skipping nodes that doesn't have FileName attribute
- // so when we are only interested in multipart nodes, we can set this flag
- // (despite we sort multiparts in above layer anyway)
- // to skip its children (parts) that don't have FileName
- subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth, true)
+func (m *multipartInfoStream) Next(ctx context.Context) (*data.MultipartInfo, error) {
+ var tNode *treeNode
+ var filePath string
+
+ if m.currentStream == nil {
+ var err error
+ if m.currentStream, err = m.openNewStream(ctx); err != nil {
+ return nil, err
+ }
+ }
+ for {
+ var err error
+ tNode, err = getTreeNodeFromStream(m.currentStream)
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ if m.currentStream, err = m.openNewStream(ctx); err != nil {
+ return nil, err
+ }
+ continue
+ }
+ return nil, err
+ }
+ var ok bool
+ if filePath, ok = m.checkTreeNode(tNode); ok {
+ break
+ }
+ }
+
+ return newMultipartInfoFromTreeNode(m.log, filePath, tNode)
+}
+
+// openNewStream creates a subtree stream from the childStream's next node.
+func (m *multipartInfoStream) openNewStream(ctx context.Context) (SubTreeStream, error) {
+ node, err := getTreeNodeFromStream(m.childStream)
if err != nil {
return nil, err
}
-
- var parentPrefix string
- if parentFilePath != "" { // The root of subTree can also have a parent
- parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
+ if m.rootID.Equal(node.ID) {
+ // skip root node
+ return m.openNewStream(ctx)
}
+ stream, err := m.treeService.GetSubTreeStream(ctx, m.bktInfo, systemTree, node.ID, maxGetSubTreeDepth)
+ if err != nil {
+ return nil, err
+ }
+ return stream, nil
+}
- var filepath string
- namesMap := make(map[uint64]string, len(subTree))
- multiparts := make(map[string][]*data.MultipartInfo, len(subTree))
+func getTreeNodeFromStream(stream SubTreeStream) (*treeNode, error) {
+ node, err := stream.Next()
+ if err != nil {
+ return nil, err
+ }
+ tNode, err := newTreeNode(node)
+ if err != nil {
+ return nil, err
+ }
+ return tNode, nil
+}
- for i, node := range subTree {
- tNode, fileName, err := parseTreeNode(node)
- if err != nil {
- continue
- }
+func (m *multipartInfoStream) checkTreeNode(tNode *treeNode) (string, bool) {
+ var ok bool
+ var err error
- if i != 0 {
- if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
- return nil, fmt.Errorf("invalid node order: %w", err)
- }
- } else {
- filepath = parentPrefix + fileName
- for _, id := range tNode.ID {
- namesMap[id] = filepath
- }
- }
-
- multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, tNode)
- if err != nil || multipartInfo.Finished {
- continue
- }
-
- for _, id := range node.GetParentID() {
- key := formLatestNodeKey(id, fileName)
- multipartInfos, ok := multiparts[key]
- if !ok {
- multipartInfos = []*data.MultipartInfo{multipartInfo}
- } else {
- multipartInfos = append(multipartInfos, multipartInfo)
- }
-
- multiparts[key] = multipartInfos
+ if tNode.IsSplit() {
+ return "", false
+ }
+ fileName, ok := tNode.FileName()
+ if !ok {
+ return "", false
+ }
+ filePath, err := formFilePath(tNode, fileName, m.nodePaths)
+ if err != nil {
+ filePath = fileName
+ m.nodePaths[tNode.ID[0]] = filePath
+ }
+ filePath = m.headPrefix + filePath
+ if !strings.HasPrefix(filePath, m.prefix) {
+ return "", false
+ }
+ if _, ok = tNode.Meta[finishedKV]; ok {
+ return "", false
+ }
+ if id, ok := tNode.Meta[uploadIDKV]; ok {
+ if filePath > m.keyMarker || (filePath == m.keyMarker && m.uploadID != "" && id > m.uploadID) {
+ return filePath, true
}
}
- result := make([]*data.MultipartInfo, 0, len(multiparts))
- for _, multipartInfo := range multiparts {
- result = append(result, multipartInfo...)
+ return "", false
+}
+
+func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error) {
+ stream, rootID, err := c.getSubTreeByPrefixStream(ctx, bktInfo, systemTree, params.Prefix)
+ if err != nil {
+ if errors.Is(err, tree.ErrNodeNotFound) {
+ return nil, nil
+ }
+ return nil, err
}
- return result, nil
+ return &multipartInfoStream{
+ log: c.reqLogger(ctx),
+ rootID: rootID,
+ childStream: stream,
+ nodePaths: make(map[uint64]string),
+ treeService: c.service,
+ bktInfo: bktInfo,
+ uploadID: params.UploadIDMarker,
+ keyMarker: params.KeyMarker,
+ prefix: params.Prefix,
+ headPrefix: strings.TrimRightFunc(params.Prefix, func(r rune) bool {
+ return r != '/'
+ }),
+ }, nil
}
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
diff --git a/pkg/service/tree/tree_test.go b/pkg/service/tree/tree_test.go
index 058e46a..c9f4e94 100644
--- a/pkg/service/tree/tree_test.go
+++ b/pkg/service/tree/tree_test.go
@@ -12,6 +12,7 @@ import (
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
+ "go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
@@ -359,3 +360,127 @@ func TestSplitTreeMultiparts(t *testing.T) {
require.NoError(t, err)
require.Len(t, parts, 1)
}
+
+func TestCheckTreeNode(t *testing.T) {
+ treeNodes := []*treeNode{
+ // foo/
+ {
+ ID: []uint64{1},
+ ParentID: []uint64{0},
+ TimeStamp: []uint64{1},
+ Meta: map[string]string{
+ "FileName": "foo",
+ },
+ },
+ // foo/ant
+ {
+ ID: []uint64{2},
+ ParentID: []uint64{1},
+ TimeStamp: []uint64{1},
+ Meta: map[string]string{
+ "FileName": "ant",
+ "UploadId": "d",
+ },
+ },
+ // foo/bar
+ {
+ ID: []uint64{3},
+ ParentID: []uint64{1},
+ TimeStamp: []uint64{1},
+ Meta: map[string]string{
+ "FileName": "bar",
+ "UploadId": "c",
+ },
+ },
+ // foo/finished
+ {
+ ID: []uint64{4},
+ ParentID: []uint64{1},
+ TimeStamp: []uint64{1},
+ Meta: map[string]string{
+ "FileName": "finished",
+ "UploadId": "e",
+ "Finished": "True",
+ },
+ },
+ // hello/
+ {
+ ID: []uint64{5},
+ ParentID: []uint64{0},
+ TimeStamp: []uint64{1},
+ Meta: map[string]string{
+ "FileName": "hello",
+ },
+ },
+ // hello/world
+ {
+ ID: []uint64{6},
+ ParentID: []uint64{5},
+ TimeStamp: []uint64{1},
+ Meta: map[string]string{
+ "FileName": "world",
+ "UploadId": "a",
+ },
+ },
+ // hello/world
+ {
+ ID: []uint64{7},
+ ParentID: []uint64{5},
+ TimeStamp: []uint64{1},
+ Meta: map[string]string{
+ "FileName": "world",
+ "UploadId": "b",
+ },
+ },
+ }
+
+ info := multipartInfoStream{
+ log: zap.NewNop(),
+ rootID: []uint64{0},
+ }
+
+ t.Run("without markers", func(t *testing.T) {
+ info.nodePaths = make(map[uint64]string)
+ results := make([]bool, 0, len(treeNodes))
+ for _, node := range treeNodes {
+ _, valid := info.checkTreeNode(node)
+ results = append(results, valid)
+ }
+ require.Equal(t, []bool{false, true, true, false, false, true, true}, results)
+ })
+
+ t.Run("with prefix", func(t *testing.T) {
+ info.nodePaths = make(map[uint64]string)
+ info.prefix = "hello"
+ info.headPrefix = ""
+ results := make([]bool, 0, len(treeNodes))
+ for _, node := range treeNodes {
+ _, valid := info.checkTreeNode(node)
+ results = append(results, valid)
+ }
+ require.Equal(t, []bool{false, false, false, false, false, true, true}, results)
+ })
+
+ t.Run("with key marker", func(t *testing.T) {
+ info.nodePaths = make(map[uint64]string)
+ info.keyMarker = "foo/bar"
+ results := make([]bool, 0, len(treeNodes))
+ for _, node := range treeNodes {
+ _, valid := info.checkTreeNode(node)
+ results = append(results, valid)
+ }
+ require.Equal(t, []bool{false, false, false, false, false, true, true}, results)
+ })
+
+ t.Run("with key and upload id markers", func(t *testing.T) {
+ info.nodePaths = make(map[uint64]string)
+ info.keyMarker = "hello/world"
+ info.uploadID = "a"
+ results := make([]bool, 0, len(treeNodes))
+ for _, node := range treeNodes {
+ _, valid := info.checkTreeNode(node)
+ results = append(results, valid)
+ }
+ require.Equal(t, []bool{false, false, false, false, false, false, true}, results)
+ })
+}