diff --git a/api/cache/listmultipart.go b/api/cache/listmultipart.go new file mode 100644 index 0000000..cfa43ed --- /dev/null +++ b/api/cache/listmultipart.go @@ -0,0 +1,109 @@ +package cache + +import ( + "fmt" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "github.com/bluele/gcache" + "go.uber.org/zap" +) + +type ( + // ListMultipartSessionCache contains cache for list multiparts session (during pagination). + ListMultipartSessionCache struct { + cache gcache.Cache + logger *zap.Logger + } + + // ListMultipartSessionKey is a key to find a ListMultipartSessionCache's entry. + ListMultipartSessionKey struct { + cid cid.ID + prefix string + marker string + uploadID string + } +) + +const ( + // DefaultListMultipartSessionCacheLifetime is a default lifetime of entries in cache of ListMultipartUploads. + DefaultListMultipartSessionCacheLifetime = time.Minute + // DefaultListMultipartSessionCacheSize is a default size of cache of ListMultipartUploads. + DefaultListMultipartSessionCacheSize = 100 +) + +// DefaultListMultipartSessionConfig returns new default cache expiration values. +func DefaultListMultipartSessionConfig(logger *zap.Logger) *Config { + return &Config{ + Size: DefaultListMultipartSessionCacheSize, + Lifetime: DefaultListMultipartSessionCacheLifetime, + Logger: logger, + } +} + +func (k *ListMultipartSessionKey) String() string { + return k.cid.EncodeToString() + k.prefix + k.marker + k.uploadID +} + +// NewListMultipartSessionCache is a constructor which creates an object of ListObjectsCache with the given lifetime of entries. +func NewListMultipartSessionCache(config *Config) *ListMultipartSessionCache { + gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).EvictedFunc(func(_ interface{}, val interface{}) { + session, ok := val.(*data.ListMultipartSession) + if !ok { + config.Logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", val)), + zap.String("expected", fmt.Sprintf("%T", session))) + } + + if !session.Acquired.Load() { + session.Cancel() + } + }).Build() + return &ListMultipartSessionCache{cache: gc, logger: config.Logger} +} + +// GetListMultipartSession returns a session of ListMultipartUploads request. +func (l *ListMultipartSessionCache) GetListMultipartSession(key ListMultipartSessionKey) *data.ListMultipartSession { + entry, err := l.cache.Get(key) + if err != nil { + return nil + } + + result, ok := entry.(*data.ListMultipartSession) + if !ok { + l.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)), + zap.String("expected", fmt.Sprintf("%T", result))) + return nil + } + + return result +} + +// PutListMultipartSession puts a ListMultipartUploads session info to cache. +func (l *ListMultipartSessionCache) PutListMultipartSession(key ListMultipartSessionKey, session *data.ListMultipartSession) error { + s := l.GetListMultipartSession(key) + if s != nil && s != session { + if !s.Acquired.Load() { + s.Cancel() + } + } + return l.cache.Set(key, session) +} + +// DeleteListMultipartSession removes key from cache. +func (l *ListMultipartSessionCache) DeleteListMultipartSession(key ListMultipartSessionKey) { + l.cache.Remove(key) +} + +// CreateListMultipartSessionCacheKey returns ListMultipartSessionKey with the given CID, prefix, marker and uploadID. +func CreateListMultipartSessionCacheKey(cnr cid.ID, prefix, marker, uploadID string) ListMultipartSessionKey { + p := ListMultipartSessionKey{ + cid: cnr, + prefix: prefix, + marker: marker, + uploadID: uploadID, + } + + return p +} diff --git a/api/cache/listsession.go b/api/cache/listsession.go index c8d0b68..a901d97 100644 --- a/api/cache/listsession.go +++ b/api/cache/listsession.go @@ -28,7 +28,7 @@ type ( const ( // DefaultListSessionCacheLifetime is a default lifetime of entries in cache of ListObjects. - DefaultListSessionCacheLifetime = time.Second * 60 + DefaultListSessionCacheLifetime = time.Minute // DefaultListSessionCacheSize is a default size of cache of ListObjects. DefaultListSessionCacheSize = 100 ) diff --git a/api/data/listsession.go b/api/data/listsession.go index a13f2e4..c3498f6 100644 --- a/api/data/listsession.go +++ b/api/data/listsession.go @@ -9,11 +9,25 @@ type VersionsStream interface { Next(ctx context.Context) (*NodeVersion, error) } -type ListSession struct { - Next []*ExtendedNodeVersion - Stream VersionsStream - NamesMap map[string]struct{} +type CommonSession struct { Context context.Context Cancel context.CancelFunc Acquired atomic.Bool } + +type ListSession struct { + CommonSession + Next []*ExtendedNodeVersion + Stream VersionsStream + NamesMap map[string]struct{} +} + +type MultipartInfoStream interface { + Next(ctx context.Context) (*MultipartInfo, error) +} + +type ListMultipartSession struct { + CommonSession + Next *MultipartInfo + Stream MultipartInfoStream +} diff --git a/api/data/tree.go b/api/data/tree.go index c75d936..a30f3cd 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -207,3 +207,9 @@ func (l LockInfo) UntilDate() string { func (l LockInfo) IsCompliance() bool { return l.isCompliance } + +type MultipartStreamParams struct { + Prefix string + KeyMarker string + UploadIDMarker string +} diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go index 628bba7..319c6d7 100644 --- a/api/handler/handlers_test.go +++ b/api/handler/handlers_test.go @@ -244,6 +244,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig { Buckets: minCacheCfg, System: minCacheCfg, AccessControl: minCacheCfg, + MultipartList: minCacheCfg, NetworkInfo: &cache.NetworkInfoCacheConfig{Lifetime: minCacheCfg.Lifetime}, } } diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index bb10927..644bcfb 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -513,7 +513,7 @@ func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Req if maxUploadsStr != "" { val, err := strconv.Atoi(maxUploadsStr) - if err != nil || val < 1 || val > 1000 { + if err != nil || val < 1 || val > maxObjectList { h.logAndSendError(ctx, w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads)) return } diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go index 2831b3e..5dcacd8 100644 --- a/api/handler/multipart_upload_test.go +++ b/api/handler/multipart_upload_test.go @@ -266,6 +266,36 @@ func TestListMultipartUploads(t *testing.T) { require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID) }) + t.Run("check delimiter", func(t *testing.T) { + t.Run("not truncated", func(t *testing.T) { + listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", -1) + require.Len(t, listUploads.Uploads, 0) + require.Len(t, listUploads.CommonPrefixes, 2) + require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix) + require.Equal(t, "/zzz/", listUploads.CommonPrefixes[1].Prefix) + }) + + t.Run("truncated", func(t *testing.T) { + listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", 1) + require.Len(t, listUploads.Uploads, 0) + require.Len(t, listUploads.CommonPrefixes, 1) + require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix) + require.True(t, listUploads.IsTruncated) + + listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1) + require.Len(t, listUploads.Uploads, 0) + require.Len(t, listUploads.CommonPrefixes, 1) + require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix) + require.True(t, listUploads.IsTruncated) + + listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1) + require.Len(t, listUploads.Uploads, 0) + require.Len(t, listUploads.CommonPrefixes, 1) + require.Equal(t, "/zzz/", listUploads.CommonPrefixes[0].Prefix) + require.False(t, listUploads.IsTruncated) + }) + }) + t.Run("check markers", func(t *testing.T) { t.Run("check only key-marker", func(t *testing.T) { listUploads := listMultipartUploads(hc, bktName, "", "", "", objName2, -1) @@ -294,6 +324,58 @@ func TestListMultipartUploads(t *testing.T) { require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID) }) }) + + t.Run("check next markers", func(t *testing.T) { + t.Run("check both next-key-marker and next-upload-id-marker", func(t *testing.T) { + listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1) + require.True(t, listUploads.IsTruncated) + require.Len(t, listUploads.Uploads, 1) + require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker) + require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker) + require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID) + require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key) + + listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1) + require.True(t, listUploads.IsTruncated) + require.Len(t, listUploads.Uploads, 1) + require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker) + require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker) + require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID) + require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key) + + listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1) + require.False(t, listUploads.IsTruncated) + require.Len(t, listUploads.Uploads, 1) + require.Empty(t, listUploads.NextUploadIDMarker) + require.Empty(t, listUploads.NextKeyMarker) + require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID) + }) + + t.Run("check only next-key-marker", func(t *testing.T) { + listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1) + require.True(t, listUploads.IsTruncated) + require.Len(t, listUploads.Uploads, 1) + require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker) + require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker) + require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID) + require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key) + + listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1) + require.True(t, listUploads.IsTruncated) + require.Len(t, listUploads.Uploads, 1) + require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker) + require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker) + require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID) + require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key) + + listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1) + require.False(t, listUploads.IsTruncated) + require.Len(t, listUploads.Uploads, 1) + require.Empty(t, listUploads.NextUploadIDMarker) + require.Empty(t, listUploads.NextKeyMarker) + require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID) + }) + }) } func TestMultipartUploadSize(t *testing.T) { diff --git a/api/layer/cache.go b/api/layer/cache.go index c3ceb7c..ecf5ac4 100644 --- a/api/layer/cache.go +++ b/api/layer/cache.go @@ -12,15 +12,16 @@ import ( ) type Cache struct { - logger *zap.Logger - listsCache *cache.ObjectsListCache - sessionListCache *cache.ListSessionCache - objCache *cache.ObjectsCache - namesCache *cache.ObjectsNameCache - bucketCache *cache.BucketCache - systemCache *cache.SystemCache - accessCache *cache.AccessControlCache - networkInfoCache *cache.NetworkInfoCache + logger *zap.Logger + listsCache *cache.ObjectsListCache + sessionListCache *cache.ListSessionCache + objCache *cache.ObjectsCache + namesCache *cache.ObjectsNameCache + bucketCache *cache.BucketCache + systemCache *cache.SystemCache + accessCache *cache.AccessControlCache + networkInfoCache *cache.NetworkInfoCache + sessionMultipartCache *cache.ListMultipartSessionCache } // CachesConfig contains params for caches. @@ -33,6 +34,7 @@ type CachesConfig struct { Buckets *cache.Config System *cache.Config AccessControl *cache.Config + MultipartList *cache.Config NetworkInfo *cache.NetworkInfoCacheConfig } @@ -48,20 +50,22 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig { System: cache.DefaultSystemConfig(logger), AccessControl: cache.DefaultAccessControlConfig(logger), NetworkInfo: cache.DefaultNetworkInfoConfig(logger), + MultipartList: cache.DefaultListMultipartSessionConfig(logger), } } func NewCache(cfg *CachesConfig) *Cache { return &Cache{ - logger: cfg.Logger, - listsCache: cache.NewObjectsListCache(cfg.ObjectsList), - sessionListCache: cache.NewListSessionCache(cfg.SessionList), - objCache: cache.New(cfg.Objects), - namesCache: cache.NewObjectsNameCache(cfg.Names), - bucketCache: cache.NewBucketCache(cfg.Buckets), - systemCache: cache.NewSystemCache(cfg.System), - accessCache: cache.NewAccessControlCache(cfg.AccessControl), - networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo), + logger: cfg.Logger, + listsCache: cache.NewObjectsListCache(cfg.ObjectsList), + sessionListCache: cache.NewListSessionCache(cfg.SessionList), + objCache: cache.New(cfg.Objects), + namesCache: cache.NewObjectsNameCache(cfg.Names), + bucketCache: cache.NewBucketCache(cfg.Buckets), + systemCache: cache.NewSystemCache(cfg.System), + accessCache: cache.NewAccessControlCache(cfg.AccessControl), + networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo), + sessionMultipartCache: cache.NewListMultipartSessionCache(cfg.MultipartList), } } @@ -161,6 +165,14 @@ func (c *Cache) GetListSession(owner user.ID, key cache.ListSessionKey) *data.Li return c.sessionListCache.GetListSession(key) } +func (c *Cache) GetListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) *data.ListMultipartSession { + if !c.accessCache.Get(owner, key.String()) { + return nil + } + + return c.sessionMultipartCache.GetListMultipartSession(key) +} + func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) { if err := c.sessionListCache.PutListSession(key, session); err != nil { c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err)) @@ -176,6 +188,21 @@ func (c *Cache) DeleteListSession(owner user.ID, key cache.ListSessionKey) { c.accessCache.Delete(owner, key.String()) } +func (c *Cache) PutListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey, session *data.ListMultipartSession) { + if err := c.sessionMultipartCache.PutListMultipartSession(key, session); err != nil { + c.logger.Warn(logs.CouldntCacheListMultipartSession, zap.Error(err)) + } + + if err := c.accessCache.Put(owner, key.String()); err != nil { + c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err)) + } +} + +func (c *Cache) DeleteListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) { + c.sessionMultipartCache.DeleteListMultipartSession(key) + c.accessCache.Delete(owner, key.String()) +} + func (c *Cache) GetTagging(owner user.ID, key string) map[string]string { if !c.accessCache.Get(owner, key) { return nil diff --git a/api/layer/listing.go b/api/layer/listing.go index 790243b..630a15d 100644 --- a/api/layer/listing.go +++ b/api/layer/listing.go @@ -79,7 +79,8 @@ type ( Prefix string MaxKeys int Marker string - Bookmark string + // Bookmark contains Marker or ContinuationToken and is used for pagination and as part of a cache key for list session. + Bookmark string } commonLatestVersionsListingParams struct { @@ -193,11 +194,10 @@ func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers return nil, nil, nil } - session, err := n.getListLatestVersionsSession(ctx, p) + session, err := n.getListVersionsSession(ctx, p.commonVersionsListingParams, true) if err != nil { return nil, nil, err } - generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session) objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator) if err != nil { @@ -230,7 +230,7 @@ func (n *Layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListi return nil, false, nil } - session, err := n.getListAllVersionsSession(ctx, p) + session, err := n.getListVersionsSession(ctx, p, false) if err != nil { return nil, false, err } @@ -301,48 +301,31 @@ func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int, } } -func (n *Layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) { - return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true) -} - -func (n *Layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) { - return n.getListVersionsSession(ctx, p, false) -} - -func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) { +func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) { owner := n.BearerOwner(ctx) cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark) - session := n.cache.GetListSession(owner, cacheKey) - if session == nil { - return n.initNewVersionsByPrefixSession(ctx, p, latestOnly) + session = n.cache.GetListSession(owner, cacheKey) + if session == nil || session.Acquired.Swap(true) { + session = n.newSession(ctx) + session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly) + return session, err } - if session.Acquired.Swap(true) { - return n.initNewVersionsByPrefixSession(ctx, p, latestOnly) - } - - // after reading next object from stream in session - // the current cache value already doesn't match with next token in cache key n.cache.DeleteListSession(owner, cacheKey) - return session, nil } -func (n *Layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) { - session = &data.ListSession{NamesMap: make(map[string]struct{})} +func (n *Layer) newSession(ctx context.Context) *data.ListSession { + session := &data.ListSession{NamesMap: make(map[string]struct{})} session.Context, session.Cancel = context.WithCancel(context.Background()) + // save access box data for next requests if bd, err := middleware.GetBoxData(ctx); err == nil { session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd}) } - session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly) - if err != nil { - return nil, err - } - - return session, nil + return session } func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) { diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index ed7612c..2639f69 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -2,6 +2,7 @@ package layer import ( "bytes" + "cmp" "context" "crypto/md5" "encoding/base64" @@ -10,17 +11,20 @@ import ( "errors" "fmt" "io" + "slices" "sort" "strconv" "strings" "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" @@ -499,47 +503,65 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload return &result, nil } - multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, p.Bkt, p.Prefix) + session, err := n.getListMultipartUploadsSession(ctx, p) if err != nil { return nil, err } - uploads := make([]*UploadInfo, 0, len(multipartInfos)) + uploads := make([]*UploadInfo, 0, p.MaxUploads) uniqDirs := make(map[string]struct{}) + uploadsCount := 0 + if session.Next != nil { + upload := uploadInfoFromMultipartInfo(session.Next, p.Prefix, p.Delimiter) + switch { + case upload.IsDir && isUniqDir(upload.Key, uniqDirs): + uniqDirs[upload.Key] = struct{}{} + fallthrough + case !upload.IsDir: + uploads = append(uploads, upload) + uploadsCount++ + } + } - for _, multipartInfo := range multipartInfos { - info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter) - if info != nil { - if info.IsDir { - if _, ok := uniqDirs[info.Key]; ok { - continue - } - uniqDirs[info.Key] = struct{}{} + info := session.Next + for uploadsCount < p.MaxUploads { + info, err = session.Stream.Next(ctx) + if err != nil { + if errors.Is(err, io.EOF) { + break } - uploads = append(uploads, info) + n.reqLogger(ctx).Warn(logs.CouldNotGetMultipartUploadInfo, zap.Error(err)) + continue } + upload := uploadInfoFromMultipartInfo(info, p.Prefix, p.Delimiter) + if upload.IsDir { + if !isUniqDir(upload.Key, uniqDirs) { + continue + } + uniqDirs[upload.Key] = struct{}{} + } + uploads = append(uploads, upload) + uploadsCount++ } - sort.Slice(uploads, func(i, j int) bool { - if uploads[i].Key == uploads[j].Key { - return uploads[i].UploadID < uploads[j].UploadID - } - return uploads[i].Key < uploads[j].Key - }) - - if p.KeyMarker != "" { - if p.UploadIDMarker != "" { - uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads) + isTruncated := true + next, err := session.Stream.Next(ctx) + if err != nil { + if errors.Is(err, io.EOF) { + isTruncated = false } else { - uploads = trimAfterUploadKey(p.KeyMarker, uploads) + return nil, err } } - if len(uploads) > p.MaxUploads { + if isTruncated && info != nil { + // put to session redundant multipart upload which we read to check for EOF + session.Next = next result.IsTruncated = true - uploads = uploads[:p.MaxUploads] - result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID - result.NextKeyMarker = uploads[len(uploads)-1].Key + result.NextUploadIDMarker = info.UploadID + result.NextKeyMarker = info.Key + cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, info.Key, info.UploadID) + n.putListMultipartUploadsSession(ctx, session, cacheKey) } for _, ov := range uploads { @@ -550,9 +572,62 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload } } + slices.SortFunc(result.Uploads, func(a, b *UploadInfo) int { + keyCmp := cmp.Compare(a.Key, b.Key) + if keyCmp == 0 { + return cmp.Compare(a.UploadID, b.UploadID) + } + + return keyCmp + }) + return &result, nil } +func (n *Layer) putListMultipartUploadsSession(ctx context.Context, session *data.ListMultipartSession, cacheKey cache.ListMultipartSessionKey) { + session.Acquired.Store(false) + n.cache.PutListMultipartSession(n.BearerOwner(ctx), cacheKey, session) +} + +func (n *Layer) getListMultipartUploadsSession(ctx context.Context, p *ListMultipartUploadsParams) (session *data.ListMultipartSession, err error) { + owner := n.BearerOwner(ctx) + cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, p.KeyMarker, p.UploadIDMarker) + session = n.cache.GetListMultipartSession(owner, cacheKey) + if session == nil || session.Acquired.Swap(true) { + session = newListMultipartSession(ctx) + params := data.MultipartStreamParams{ + Prefix: p.Prefix, + KeyMarker: p.KeyMarker, + UploadIDMarker: p.UploadIDMarker, + } + session.Stream, err = n.treeService.GetMultipartUploadsByPrefix(session.Context, p.Bkt, params) + if err != nil { + return nil, err + } + } + // if after reading next object from stream in session the current cache value already + // doesn't match with next token in cache key + n.cache.DeleteListMultipartSession(owner, cacheKey) + + return session, nil +} + +func newListMultipartSession(ctx context.Context) *data.ListMultipartSession { + reqCtx, cancel := context.WithCancel(context.Background()) + session := &data.ListMultipartSession{ + CommonSession: data.CommonSession{ + Context: reqCtx, + Cancel: cancel, + }, + } + + // save access box data for next requests + if bd, err := middleware.GetBoxData(ctx); err == nil { + session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd}) + } + return session +} + func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error { multipartInfo, parts, err := n.getUploadParts(ctx, p) if err != nil { @@ -677,44 +752,10 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data. return multipartInfo, res, nil } -func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo { - var res []*UploadInfo - if len(uploads) != 0 && uploads[len(uploads)-1].Key < key { - return res - } - - for _, obj := range uploads { - if obj.Key >= key && obj.UploadID > id { - res = append(res, obj) - } - } - - return res -} - -func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo { - var result []*UploadInfo - if len(objects) != 0 && objects[len(objects)-1].Key <= key { - return result - } - for i, obj := range objects { - if obj.Key > key { - result = objects[i:] - break - } - } - - return result -} - func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo { var isDir bool key := uploadInfo.Key - if !strings.HasPrefix(key, prefix) { - return nil - } - if len(delimiter) > 0 { tail := strings.TrimPrefix(key, prefix) index := strings.Index(tail, delimiter) @@ -732,3 +773,10 @@ func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimit Created: uploadInfo.Created, } } + +func isUniqDir(key string, uniqDirs map[string]struct{}) bool { + if _, ok := uniqDirs[key]; ok { + return false + } + return true +} diff --git a/api/layer/multipart_upload_test.go b/api/layer/multipart_upload_test.go deleted file mode 100644 index 5d21523..0000000 --- a/api/layer/multipart_upload_test.go +++ /dev/null @@ -1,108 +0,0 @@ -package layer - -import ( - "sort" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestTrimAfterUploadIDAndKey(t *testing.T) { - uploads := []*UploadInfo{ - {Key: "j", UploadID: "k"}, // key < id < - {Key: "l", UploadID: "p"}, // key < id > - {Key: "n", UploadID: "m"}, // key = id < - {Key: "n", UploadID: "o"}, // pivot - {Key: "n", UploadID: "q"}, // key = id > - {Key: "p", UploadID: "h"}, // key > id < - {Key: "q", UploadID: "r"}, // key > id > - } - expectedUploadsListsIndexes := [][]int{ - {1, 2, 3, 4, 6}, - {4, 6}, - {3, 4, 6}, - {4, 6}, - {6}, - {6}, - {}, - } - - sort.Slice(uploads, func(i, j int) bool { - if uploads[i].Key == uploads[j].Key { - return uploads[i].UploadID < uploads[j].UploadID - } - return uploads[i].Key < uploads[j].Key - }) - - length := len(uploads) - - t.Run("the last element's key is less, upload id is less", func(t *testing.T) { - keys := trimAfterUploadIDAndKey("z", "a", uploads) - require.Empty(t, keys) - require.Len(t, uploads, length) - }) - - t.Run("the last element's key is less, upload id is greater", func(t *testing.T) { - keys := trimAfterUploadIDAndKey("z", "a", uploads) - require.Empty(t, keys) - require.Len(t, uploads, length) - }) - - t.Run("check for uploads", func(t *testing.T) { - for i, u := range uploads { - list := trimAfterUploadIDAndKey(u.Key, u.UploadID, uploads) - require.Equal(t, len(list), len(expectedUploadsListsIndexes[i])) - for j, idx := range expectedUploadsListsIndexes[i] { - require.Equal(t, list[j], uploads[idx]) - } - } - }) -} - -func TestTrimAfterUploadKey(t *testing.T) { - var ( - uploadKeys = []string{"e", "f", "f", "g", "h", "i"} - theSameKeyIdx = []int{1, 2} - diffKeyIdx = []int{0, 3} - lastIdx = len(uploadKeys) - 1 - ) - - uploadsInfos := make([]*UploadInfo, 0, len(uploadKeys)) - for _, k := range uploadKeys { - uploadsInfos = append(uploadsInfos, &UploadInfo{Key: k}) - } - - t.Run("empty list", func(t *testing.T) { - keys := trimAfterUploadKey("f", []*UploadInfo{}) - require.Len(t, keys, 0) - }) - - t.Run("the last element is less than a key", func(t *testing.T) { - keys := trimAfterUploadKey("j", uploadsInfos) - require.Empty(t, keys) - require.Len(t, uploadsInfos, len(uploadKeys)) - }) - - t.Run("different keys in sequence", func(t *testing.T) { - for _, i := range diffKeyIdx { - keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos) - require.Len(t, keys, len(uploadKeys)-i-1) - require.Equal(t, keys, uploadsInfos[i+1:]) - require.Len(t, uploadsInfos, len(uploadKeys)) - } - }) - - t.Run("the same keys in the sequence first element", func(t *testing.T) { - for _, i := range theSameKeyIdx { - keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos) - require.Len(t, keys, 3) - require.Equal(t, keys, uploadsInfos[3:]) - require.Len(t, uploadsInfos, len(uploadKeys)) - } - }) - - t.Run("last element", func(t *testing.T) { - keys := trimAfterUploadKey(uploadKeys[lastIdx], uploadsInfos) - require.Empty(t, keys) - }) -} diff --git a/api/layer/tree/tree_service.go b/api/layer/tree/tree_service.go index db079b1..94470fb 100644 --- a/api/layer/tree/tree_service.go +++ b/api/layer/tree/tree_service.go @@ -53,7 +53,7 @@ type Service interface { CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error - GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) + GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) // AddPart puts a node to a system tree as a child of appropriate multipart upload diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 577f406..cd5a3ec 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -328,7 +328,7 @@ func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data return nil } -func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, string) ([]*data.MultipartInfo, error) { +func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, data.MultipartStreamParams) (data.MultipartInfoStream, error) { panic("implement me") } diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 1ac2b30..caa6810 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -1007,6 +1007,8 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig { cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size) cacheCfg.NetworkInfo.Lifetime = fetchCacheLifetime(v, l, cfgNetworkInfoCacheLifetime, cacheCfg.NetworkInfo.Lifetime) + cacheCfg.MultipartList.Lifetime = fetchCacheLifetime(v, l, cfgMultipartListCacheLifetime, cacheCfg.MultipartList.Lifetime) + cacheCfg.MultipartList.Size = fetchCacheSize(v, l, cfgMultipartListCacheSize, cacheCfg.MultipartList.Size) return cacheCfg } diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 691e231..4309727 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -141,6 +141,8 @@ const ( // Settings. cfgFrostfsIDCacheLifetime = "cache.frostfsid.lifetime" cfgFrostfsIDCacheSize = "cache.frostfsid.size" cfgNetworkInfoCacheLifetime = "cache.network_info.lifetime" + cfgMultipartListCacheLifetime = "cache.multipart_list_session.lifetime" + cfgMultipartListCacheSize = "cache.multipart_list_session.size" cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval" diff --git a/config/config.env b/config/config.env index ef6a27b..706d10f 100644 --- a/config/config.env +++ b/config/config.env @@ -101,9 +101,12 @@ S3_GW_CACHE_OBJECTS_SIZE=1000000 # Cache which keeps lists of objects in buckets S3_GW_CACHE_LIST_LIFETIME=1m S3_GW_CACHE_LIST_SIZE=100000 -# Cache which keeps listing session +# Cache which keeps listing objects session S3_GW_CACHE_LIST_SESSION_LIFETIME=1m S3_GW_CACHE_LIST_SESSION_SIZE=100 +# Cache which keeps listing multipart uploads session +S3_GW_CACHE_MULTIPART_LIST_SESSION_LIFETIME=1m +S3_GW_CACHE_MULTIPART_LIST_SESSION_SIZE=100 # Cache which contains mapping of bucket name to bucket info S3_GW_CACHE_BUCKETS_LIFETIME=1m S3_GW_CACHE_BUCKETS_SIZE=1000 diff --git a/config/config.yaml b/config/config.yaml index 051f5f7..539ec63 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -129,6 +129,9 @@ cache: list_session: lifetime: 1m size: 100 + multipart_list_session: + lifetime: 1m + size: 100 # Cache which contains mapping of nice name to object addresses names: lifetime: 1m diff --git a/docs/configuration.md b/docs/configuration.md index 70b2f33..6fbd5f1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -426,6 +426,9 @@ cache: list_session: lifetime: 1m size: 100 + multipart_list_session: + lifetime: 1m + size: 10000 names: lifetime: 1m size: 1000 @@ -452,19 +455,20 @@ cache: lifetime: 1m ``` -| Parameter | Type | Default value | Description | -|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------| -| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 1000000` | Cache for objects (FrostFS headers). | -| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100000` | Cache which keeps lists of objects in buckets. | -| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100` | Cache which keeps listing session. | -| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 10000` | Cache which contains mapping of nice name to object addresses. | -| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 1000` | Cache which contains mapping of bucket name to bucket info. | -| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 10000` | Cache for system objects in a bucket: bucket settings etc. | -| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`
`size: 100` | Cache which stores access box with tokens by its address. | -| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 100000` | Cache which stores owner to cache operation mapping. | -| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores list of policy chains. | -| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores FrostfsID subject info. | -| `network_info` | [Cache config](#cache-subsection) | `lifetime: 1m` | Cache which stores network info. | +| Parameter | Type | Default value | Description | +|--------------------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------| +| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 1000000` | Cache for objects (FrostFS headers). | +| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100000` | Cache which keeps lists of objects in buckets. | +| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100` | Cache which keeps listing session. | +| `multipart_list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 100` | Cache which keeps listing of multipart uploads. | +| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 10000` | Cache which contains mapping of nice name to object addresses. | +| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`
`size: 1000` | Cache which contains mapping of bucket name to bucket info. | +| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`
`size: 10000` | Cache for system objects in a bucket: bucket settings etc. | +| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`
`size: 100` | Cache which stores access box with tokens by its address. | +| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 100000` | Cache which stores owner to cache operation mapping. | +| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores list of policy chains. | +| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`
`size: 10000` | Cache which stores FrostfsID subject info. | +| `network_info` | [Cache config](#cache-subsection) | `lifetime: 1m` | Cache which stores network info. | #### `cache` subsection diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 3da1255..ea77cad 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -8,6 +8,8 @@ const ( UpdateAccessCredObjectIntoFrostFS = "update access cred object into FrostFS" // Info in ../../authmate/authmate.go MetricsAreDisabled = "metrics are disabled" // Warn in ../../metrics/app.go FoundMoreThanOneUnversionedNode = "found more than one unversioned node" // Debug in ../../pkg/service/tree/tree.go + CouldNotParseTreeNode = "could not parse tree node" // Error in ../../pkg/service/tree/tree.go + CouldNotFormFilePath = "could not form file path" // Error in ../../pkg/service/tree/tree.go ServiceIsRunning = "service is running" // Info in ../../cmd/s3-gw/service.go ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port" // Warn in ../../cmd/s3-gw/service.go ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../cmd/s3-gw/service.go @@ -67,6 +69,7 @@ const ( CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go MismatchedObjEncryptionInfo = "mismatched obj encryptionInfo" // Warn in ../../api/layer/multipart_upload.go + CouldNotGetMultipartUploadInfo = "could not get multipart upload info" // Warn in ../../api/layer/multipart_upload.go UploadPart = "upload part" // Debug in ../../api/layer/multipart_upload.go CouldntDeleteOldPartObject = "couldn't delete old part object" // Error in ../../api/layer/multipart_upload.go CouldNotPutCompletedObject = "could not put a completed object (multipart upload)" // Error in ../../api/layer/multipart_upload.go @@ -89,6 +92,7 @@ const ( CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go + CouldntCacheListMultipartSession = "couldn't cache list multipart session" // Warn in ../../api/layer/cache.go CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 438b129..4b3e538 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -84,7 +84,9 @@ var ( // ErrGatewayTimeout is returned from ServiceClient service in case of timeout error. ErrGatewayTimeout = frostfs.ErrGatewayTimeout - errNodeDoesntContainFileName = fmt.Errorf("node doesn't contain FileName") + errNodeDoesntContainFileName = errors.New("node doesn't contain FileName") + + errParentPathNotFound = errors.New("couldn't get parent path") ) const ( @@ -1061,7 +1063,7 @@ func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res * var filepath string if !s.intermediateRootID.Equal(trNode.ID) { - if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil { + if filepath, err = formFilePath(trNode, fileName, s.namesMap); err != nil { return nil, false, fmt.Errorf("invalid node order: %w", err) } } else { @@ -1165,58 +1167,18 @@ func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, tr return intermediateNodes, nil } -func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) { - rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix) +func (c *Tree) getSubTreeByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, []uint64, error) { + rootID, _, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix) if err != nil { if errors.Is(err, tree.ErrNodeNotFound) { - return nil, "", nil + return nil, nil, nil } - return nil, "", err + return nil, nil, err } - subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2, false) - if err != nil { - if errors.Is(err, tree.ErrNodeNotFound) { - return nil, "", nil - } - return nil, "", err - } + stream, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2) - nodesMap := make(map[string][]NodeResponse, len(subTree)) - for _, node := range subTree { - if MultiID(rootID).Equal(node.GetNodeID()) { - continue - } - - fileName := getFilename(node) - if !strings.HasPrefix(fileName, tailPrefix) { - continue - } - - nodes := nodesMap[fileName] - - // Add all nodes if flag latestOnly is false. - // Add all intermediate nodes - // and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0] - if len(nodes) == 0 { - nodes = []NodeResponse{node} - } else if !latestOnly || isIntermediate(node) { - nodes = append(nodes, node) - } else if isIntermediate(nodes[0]) { - nodes = append([]NodeResponse{node}, nodes...) - } else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) { - nodes[0] = node - } - - nodesMap[fileName] = nodes - } - - result := make([]NodeResponse, 0, len(subTree)) - for _, nodes := range nodesMap { - result = append(result, nodes...) - } - - return result, strings.TrimSuffix(prefix, tailPrefix), nil + return stream, rootID, err } func getFilename(node NodeResponse) string { @@ -1237,20 +1199,19 @@ func isIntermediate(node NodeResponse) bool { return node.GetMeta()[0].GetKey() == FileNameKey } -func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) { - var filepath string +func formFilePath(node *treeNode, fileName string, namesMap map[uint64]string) (string, error) { + var filePath string - for i, id := range node.GetParentID() { + for i, id := range node.ParentID { parentPath, ok := namesMap[id] if !ok { - return "", fmt.Errorf("couldn't get parent path") + return "", errParentPathNotFound } - - filepath = parentPath + separator + fileName - namesMap[node.GetNodeID()[i]] = filepath + filePath = parentPath + separator + fileName + namesMap[node.ID[i]] = filePath } - return filepath, nil + return filePath, nil } func parseTreeNode(node NodeResponse) (*treeNode, string, error) { @@ -1267,10 +1228,6 @@ func parseTreeNode(node NodeResponse) (*treeNode, string, error) { return tNode, fileName, nil } -func formLatestNodeKey(parentID uint64, fileName string) string { - return strconv.FormatUint(parentID, 10) + "." + fileName -} - func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) { return c.getUnversioned(ctx, bktInfo, versionTree, filepath) } @@ -1313,84 +1270,137 @@ func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketIn return err } -func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) { - subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false) - if err != nil { - return nil, err - } - - var result []*data.MultipartInfo - for _, node := range subTreeNodes { - multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix) - if err != nil { - return nil, err - } - result = append(result, multipartUploads...) - } - - return result, nil +type multipartInfoStream struct { + log *zap.Logger + nodePaths map[uint64]string + rootID MultiID + // childStream stream of children nodes of prefix node. + childStream SubTreeStream + // currentStream stream of children's nodes with max depth. + currentStream SubTreeStream + treeService ServiceClient + bktInfo *data.BucketInfo + uploadID string + keyMarker string + headPrefix string + prefix string } -func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) { - // sorting in getSubTree leads to skipping nodes that doesn't have FileName attribute - // so when we are only interested in multipart nodes, we can set this flag - // (despite we sort multiparts in above layer anyway) - // to skip its children (parts) that don't have FileName - subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth, true) +func (m *multipartInfoStream) Next(ctx context.Context) (*data.MultipartInfo, error) { + var tNode *treeNode + var filePath string + + if m.currentStream == nil { + var err error + if m.currentStream, err = m.openNewStream(ctx); err != nil { + return nil, err + } + } + for { + var err error + tNode, err = getTreeNodeFromStream(m.currentStream) + if err != nil { + if errors.Is(err, io.EOF) { + if m.currentStream, err = m.openNewStream(ctx); err != nil { + return nil, err + } + continue + } + return nil, err + } + var ok bool + if filePath, ok = m.checkTreeNode(tNode); ok { + break + } + } + + return newMultipartInfoFromTreeNode(m.log, filePath, tNode) +} + +// openNewStream creates subtree stream from childStream`s node. +func (m *multipartInfoStream) openNewStream(ctx context.Context) (SubTreeStream, error) { + node, err := getTreeNodeFromStream(m.childStream) if err != nil { return nil, err } - - var parentPrefix string - if parentFilePath != "" { // The root of subTree can also have a parent - parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar' + if m.rootID.Equal(node.ID) { + // skip root node + return m.openNewStream(ctx) } + stream, err := m.treeService.GetSubTreeStream(ctx, m.bktInfo, systemTree, node.ID, maxGetSubTreeDepth) + if err != nil { + return nil, err + } + return stream, nil +} - var filepath string - namesMap := make(map[uint64]string, len(subTree)) - multiparts := make(map[string][]*data.MultipartInfo, len(subTree)) +func getTreeNodeFromStream(stream SubTreeStream) (*treeNode, error) { + node, err := stream.Next() + if err != nil { + return nil, err + } + tNode, err := newTreeNode(node) + if err != nil { + return nil, err + } + return tNode, nil +} - for i, node := range subTree { - tNode, fileName, err := parseTreeNode(node) - if err != nil { - continue - } +func (m *multipartInfoStream) checkTreeNode(tNode *treeNode) (string, bool) { + var ok bool + var err error - if i != 0 { - if filepath, err = formFilePath(node, fileName, namesMap); err != nil { - return nil, fmt.Errorf("invalid node order: %w", err) - } - } else { - filepath = parentPrefix + fileName - for _, id := range tNode.ID { - namesMap[id] = filepath - } - } - - multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, tNode) - if err != nil || multipartInfo.Finished { - continue - } - - for _, id := range node.GetParentID() { - key := formLatestNodeKey(id, fileName) - multipartInfos, ok := multiparts[key] - if !ok { - multipartInfos = []*data.MultipartInfo{multipartInfo} - } else { - multipartInfos = append(multipartInfos, multipartInfo) - } - - multiparts[key] = multipartInfos + if tNode.IsSplit() { + return "", false + } + fileName, ok := tNode.FileName() + if !ok { + return "", false + } + filePath, err := formFilePath(tNode, fileName, m.nodePaths) + if err != nil { + filePath = fileName + m.nodePaths[tNode.ID[0]] = filePath + } + filePath = m.headPrefix + filePath + if !strings.HasPrefix(filePath, m.prefix) { + return "", false + } + if _, ok = tNode.Meta[finishedKV]; ok { + return "", false + } + if id, ok := tNode.Meta[uploadIDKV]; ok { + if filePath > m.keyMarker || (filePath == m.keyMarker && m.uploadID != "" && id > m.uploadID) { + return filePath, true } } - result := make([]*data.MultipartInfo, 0, len(multiparts)) - for _, multipartInfo := range multiparts { - result = append(result, multipartInfo...) + return "", false +} + +func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error) { + stream, rootID, err := c.getSubTreeByPrefixStream(ctx, bktInfo, systemTree, params.Prefix) + if err != nil { + if errors.Is(err, tree.ErrNodeNotFound) { + return nil, nil + } + return nil, err } - return result, nil + return &multipartInfoStream{ + log: c.reqLogger(ctx), + rootID: rootID, + childStream: stream, + nodePaths: make(map[uint64]string), + treeService: c.service, + bktInfo: bktInfo, + uploadID: params.UploadIDMarker, + keyMarker: params.KeyMarker, + prefix: params.Prefix, + headPrefix: strings.TrimRightFunc(params.Prefix, func(r rune) bool { + return r != '/' + }), + }, nil } func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) { diff --git a/pkg/service/tree/tree_test.go b/pkg/service/tree/tree_test.go index 058e46a..c9f4e94 100644 --- a/pkg/service/tree/tree_test.go +++ b/pkg/service/tree/tree_test.go @@ -12,6 +12,7 @@ import ( usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -359,3 +360,127 @@ func TestSplitTreeMultiparts(t *testing.T) { require.NoError(t, err) require.Len(t, parts, 1) } + +func TestCheckTreeNode(t *testing.T) { + treeNodes := []*treeNode{ + // foo/ + { + ID: []uint64{1}, + ParentID: []uint64{0}, + TimeStamp: []uint64{1}, + Meta: map[string]string{ + "FileName": "foo", + }, + }, + // foo/ant + { + ID: []uint64{2}, + ParentID: []uint64{1}, + TimeStamp: []uint64{1}, + Meta: map[string]string{ + "FileName": "ant", + "UploadId": "d", + }, + }, + // foo/bar + { + ID: []uint64{3}, + ParentID: []uint64{1}, + TimeStamp: []uint64{1}, + Meta: map[string]string{ + "FileName": "bar", + "UploadId": "c", + }, + }, + // foo/finished + { + ID: []uint64{4}, + ParentID: []uint64{1}, + TimeStamp: []uint64{1}, + Meta: map[string]string{ + "FileName": "finished", + "UploadId": "e", + "Finished": "True", + }, + }, + // hello/ + { + ID: []uint64{5}, + ParentID: []uint64{0}, + TimeStamp: []uint64{1}, + Meta: map[string]string{ + "FileName": "hello", + }, + }, + // hello/world + { + ID: []uint64{6}, + ParentID: []uint64{5}, + TimeStamp: []uint64{1}, + Meta: map[string]string{ + "FileName": "world", + "UploadId": "a", + }, + }, + // hello/world + { + ID: []uint64{7}, + ParentID: []uint64{5}, + TimeStamp: []uint64{1}, + Meta: map[string]string{ + "FileName": "world", + "UploadId": "b", + }, + }, + } + + info := multipartInfoStream{ + log: zap.NewNop(), + rootID: []uint64{0}, + } + + t.Run("without markers", func(t *testing.T) { + info.nodePaths = make(map[uint64]string) + results := make([]bool, 0, len(treeNodes)) + for _, node := range treeNodes { + _, valid := info.checkTreeNode(node) + results = append(results, valid) + } + require.Equal(t, []bool{false, true, true, false, false, true, true}, results) + }) + + t.Run("with prefix", func(t *testing.T) { + info.nodePaths = make(map[uint64]string) + info.prefix = "hello" + info.headPrefix = "" + results := make([]bool, 0, len(treeNodes)) + for _, node := range treeNodes { + _, valid := info.checkTreeNode(node) + results = append(results, valid) + } + require.Equal(t, []bool{false, false, false, false, false, true, true}, results) + }) + + t.Run("with key marker", func(t *testing.T) { + info.nodePaths = make(map[uint64]string) + info.keyMarker = "foo/bar" + results := make([]bool, 0, len(treeNodes)) + for _, node := range treeNodes { + _, valid := info.checkTreeNode(node) + results = append(results, valid) + } + require.Equal(t, []bool{false, false, false, false, false, true, true}, results) + }) + + t.Run("with key and upload id markers", func(t *testing.T) { + info.nodePaths = make(map[uint64]string) + info.keyMarker = "hello/world" + info.uploadID = "a" + results := make([]bool, 0, len(treeNodes)) + for _, node := range treeNodes { + _, valid := info.checkTreeNode(node) + results = append(results, valid) + } + require.Equal(t, []bool{false, false, false, false, false, false, true}, results) + }) +}