[#469] List multipart uploads streaming #527

Open
nzinkevich wants to merge 1 commit from nzinkevich/frostfs-s3-gw:multiparts_list_streaming into master
21 changed files with 678 additions and 363 deletions
Showing only changes of commit 0f5a2e0a15

api/cache/listmultipart.go (new file, 109 lines)

@ -0,0 +1,109 @@
package cache
import (
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/bluele/gcache"
"go.uber.org/zap"
)
type (
// ListMultipartSessionCache contains a cache of ListMultipartUploads sessions (used during pagination).
ListMultipartSessionCache struct {
cache gcache.Cache
logger *zap.Logger
}
// ListMultipartSessionKey is a key to find a ListMultipartSessionCache's entry.
ListMultipartSessionKey struct {
cid cid.ID
prefix string
marker string
uploadID string
}
)
const (
// DefaultListMultipartSessionCacheLifetime is a default lifetime of entries in cache of ListMultipartUploads.
DefaultListMultipartSessionCacheLifetime = time.Minute
// DefaultListMultipartSessionCacheSize is a default size of cache of ListMultipartUploads.
DefaultListMultipartSessionCacheSize = 100

If we add a new cache we should also be able to configure it

Added config params

Please mention these params in https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/config/config.yaml and https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/config/config.env Also we can write `time.Minute` instead of `time.Second * 60`
)
// DefaultListMultipartSessionConfig returns new default cache expiration values.
func DefaultListMultipartSessionConfig(logger *zap.Logger) *Config {
return &Config{
Size: DefaultListMultipartSessionCacheSize,
Lifetime: DefaultListMultipartSessionCacheLifetime,
Logger: logger,
}
}
func (k *ListMultipartSessionKey) String() string {
return k.cid.EncodeToString() + k.prefix + k.marker + k.uploadID
}
// NewListMultipartSessionCache is a constructor which creates an object of ListMultipartSessionCache with the given lifetime of entries.
func NewListMultipartSessionCache(config *Config) *ListMultipartSessionCache {
gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).EvictedFunc(func(_ interface{}, val interface{}) {
session, ok := val.(*data.ListMultipartSession)
if !ok {
config.Logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", val)),
zap.String("expected", fmt.Sprintf("%T", session)))
return
}
if !session.Acquired.Load() {
session.Cancel()
}
}).Build()
return &ListMultipartSessionCache{cache: gc, logger: config.Logger}
}
// GetListMultipartSession returns a session of ListMultipartUploads request.
func (l *ListMultipartSessionCache) GetListMultipartSession(key ListMultipartSessionKey) *data.ListMultipartSession {
entry, err := l.cache.Get(key)
if err != nil {
return nil
}
result, ok := entry.(*data.ListMultipartSession)
if !ok {
l.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
zap.String("expected", fmt.Sprintf("%T", result)))
return nil
}
return result
}
// PutListMultipartSession puts a ListMultipartUploads session info to cache.
func (l *ListMultipartSessionCache) PutListMultipartSession(key ListMultipartSessionKey, session *data.ListMultipartSession) error {
s := l.GetListMultipartSession(key)
if s != nil && s != session {
if !s.Acquired.Load() {
s.Cancel()
}
}
return l.cache.Set(key, session)
}
// DeleteListMultipartSession removes key from cache.
func (l *ListMultipartSessionCache) DeleteListMultipartSession(key ListMultipartSessionKey) {
l.cache.Remove(key)
}
// CreateListMultipartSessionCacheKey returns ListMultipartSessionKey with the given CID, prefix, marker and uploadID.
func CreateListMultipartSessionCacheKey(cnr cid.ID, prefix, marker, uploadID string) ListMultipartSessionKey {
p := ListMultipartSessionKey{
cid: cnr,
prefix: prefix,
marker: marker,
uploadID: uploadID,
}
return p
}
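To make the intended flow of this new cache concrete, here is a minimal usage sketch built only from the exported names added in this file; the surrounding wiring is illustrative, not part of the diff:

```go
package main

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"go.uber.org/zap"
)

func main() {
	// Default config: lifetime time.Minute, size 100.
	sessions := cache.NewListMultipartSessionCache(cache.DefaultListMultipartSessionConfig(zap.NewNop()))

	// A session owns a context detached from the request, so the tree
	// stream can be resumed by the next paginated request.
	session := &data.ListMultipartSession{}
	session.Context, session.Cancel = context.WithCancel(context.Background())

	var cnr cid.ID // zero value just for illustration; normally the bucket CID
	key := cache.CreateListMultipartSessionCacheKey(cnr, "prefix/", "next-key-marker", "next-upload-id-marker")

	_ = sessions.PutListMultipartSession(key, session) // stash for the next page
	if s := sessions.GetListMultipartSession(key); s != nil {
		_ = s // resume streaming from s.Stream
	}
	sessions.DeleteListMultipartSession(key) // the key is stale once the stream advances
}
```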


@ -28,7 +28,7 @@ type (
const (
// DefaultListSessionCacheLifetime is a default lifetime of entries in cache of ListObjects.
DefaultListSessionCacheLifetime = time.Second * 60
DefaultListSessionCacheLifetime = time.Minute
// DefaultListSessionCacheSize is a default size of cache of ListObjects.
DefaultListSessionCacheSize = 100
)


@ -9,11 +9,25 @@ type VersionsStream interface {
Next(ctx context.Context) (*NodeVersion, error)
}
type ListSession struct {
Next []*ExtendedNodeVersion
Stream VersionsStream
NamesMap map[string]struct{}
type CommonSession struct {
Context context.Context
Cancel context.CancelFunc
Acquired atomic.Bool
}
type ListSession struct {
CommonSession
Next []*ExtendedNodeVersion
Stream VersionsStream
NamesMap map[string]struct{}
}
type MultipartInfoStream interface {
Next(ctx context.Context) (*MultipartInfo, error)
Why do we pass `marker` and `uploadID`?
}
type ListMultipartSession struct {
CommonSession
Next *MultipartInfo
Stream MultipartInfoStream
}
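For context on `Acquired`: it acts as a one-winner latch over the cached session. A minimal sketch of the `sync/atomic` semantics the code relies on:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var acquired atomic.Bool

	// Swap returns the previous value, so exactly one caller observes false
	// and may reuse the cached session; everyone else creates a fresh one.
	fmt.Println(!acquired.Swap(true)) // true: this request takes the session
	fmt.Println(!acquired.Swap(true)) // false: a concurrent request must not share it
}
```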


@ -207,3 +207,9 @@ func (l LockInfo) UntilDate() string {
func (l LockInfo) IsCompliance() bool {
return l.isCompliance
}
type MultipartStreamParams struct {
Prefix string
KeyMarker string
UploadIDMarker string
}


@ -244,6 +244,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
Buckets: minCacheCfg,
System: minCacheCfg,
AccessControl: minCacheCfg,
MultipartList: minCacheCfg,
NetworkInfo: &cache.NetworkInfoCacheConfig{Lifetime: minCacheCfg.Lifetime},
}
}


@ -513,7 +513,7 @@ func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Req
if maxUploadsStr != "" {
val, err := strconv.Atoi(maxUploadsStr)
if err != nil || val < 1 || val > 1000 {
if err != nil || val < 1 || val > maxObjectList {
h.logAndSendError(ctx, w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads))
return
}


@ -266,6 +266,36 @@ func TestListMultipartUploads(t *testing.T) {
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID)
})
t.Run("check delimiter", func(t *testing.T) {
t.Run("not truncated", func(t *testing.T) {
listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", -1)
require.Len(t, listUploads.Uploads, 0)
require.Len(t, listUploads.CommonPrefixes, 2)
require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
require.Equal(t, "/zzz/", listUploads.CommonPrefixes[1].Prefix)
})
t.Run("truncated", func(t *testing.T) {
listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", 1)
require.Len(t, listUploads.Uploads, 0)
require.Len(t, listUploads.CommonPrefixes, 1)
require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
require.True(t, listUploads.IsTruncated)
listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
require.Len(t, listUploads.Uploads, 0)
require.Len(t, listUploads.CommonPrefixes, 1)
require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
require.True(t, listUploads.IsTruncated)
listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
require.Len(t, listUploads.Uploads, 0)
require.Len(t, listUploads.CommonPrefixes, 1)
require.Equal(t, "/zzz/", listUploads.CommonPrefixes[0].Prefix)
require.False(t, listUploads.IsTruncated)
})
})
t.Run("check markers", func(t *testing.T) {
t.Run("check only key-marker", func(t *testing.T) {
listUploads := listMultipartUploads(hc, bktName, "", "", "", objName2, -1)
@ -294,6 +324,58 @@ func TestListMultipartUploads(t *testing.T) {
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
})
})
t.Run("check next markers", func(t *testing.T) {
t.Run("check both next-key-marker and next-upload-id-marker", func(t *testing.T) {
listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1)
require.True(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker)
require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key)
listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
require.True(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker)
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID)
require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key)
listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
require.False(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Empty(t, listUploads.NextUploadIDMarker)
require.Empty(t, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
})
t.Run("check only next-key-marker", func(t *testing.T) {
listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1)
require.True(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker)
require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key)
listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1)
require.True(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker)
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID)
require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key)
listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1)
require.False(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Empty(t, listUploads.NextUploadIDMarker)
require.Empty(t, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
})
})
}
func TestMultipartUploadSize(t *testing.T) {


@ -21,6 +21,7 @@ type Cache struct {
systemCache *cache.SystemCache
accessCache *cache.AccessControlCache
networkInfoCache *cache.NetworkInfoCache
sessionMultipartCache *cache.ListMultipartSessionCache
}
// CachesConfig contains params for caches.
@ -33,6 +34,7 @@ type CachesConfig struct {
Buckets *cache.Config
System *cache.Config
AccessControl *cache.Config
MultipartList *cache.Config
NetworkInfo *cache.NetworkInfoCacheConfig
}
@ -48,6 +50,7 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
System: cache.DefaultSystemConfig(logger),
AccessControl: cache.DefaultAccessControlConfig(logger),
NetworkInfo: cache.DefaultNetworkInfoConfig(logger),
MultipartList: cache.DefaultListMultipartSessionConfig(logger),
}
}
@ -62,6 +65,7 @@ func NewCache(cfg *CachesConfig) *Cache {
systemCache: cache.NewSystemCache(cfg.System),
accessCache: cache.NewAccessControlCache(cfg.AccessControl),
networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo),
sessionMultipartCache: cache.NewListMultipartSessionCache(cfg.MultipartList),
}
}
@ -161,6 +165,14 @@ func (c *Cache) GetListSession(owner user.ID, key cache.ListSessionKey) *data.Li
return c.sessionListCache.GetListSession(key)
}
func (c *Cache) GetListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) *data.ListMultipartSession {
if !c.accessCache.Get(owner, key.String()) {
return nil
}
return c.sessionMultipartCache.GetListMultipartSession(key)
}
func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) {
if err := c.sessionListCache.PutListSession(key, session); err != nil {
c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err))
@ -176,6 +188,21 @@ func (c *Cache) DeleteListSession(owner user.ID, key cache.ListSessionKey) {
c.accessCache.Delete(owner, key.String())
}
func (c *Cache) PutListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey, session *data.ListMultipartSession) {
if err := c.sessionMultipartCache.PutListMultipartSession(key, session); err != nil {
c.logger.Warn(logs.CouldntCacheListMultipartSession, zap.Error(err))
}
if err := c.accessCache.Put(owner, key.String()); err != nil {
c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
}
}
func (c *Cache) DeleteListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) {
c.sessionMultipartCache.DeleteListMultipartSession(key)
c.accessCache.Delete(owner, key.String())
}
func (c *Cache) GetTagging(owner user.ID, key string) map[string]string {
if !c.accessCache.Get(owner, key) {
return nil


@ -79,6 +79,7 @@ type (
Prefix string
MaxKeys int
Marker string
// Bookmark contains Marker or ContinuationToken and is used for pagination and as part of a cache key for list session.
Not only. It also contains `Marker` for listing v1 and `ContinuationToken` for listing v2
Bookmark string
}
@ -193,11 +194,10 @@ func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
return nil, nil, nil
}
session, err := n.getListLatestVersionsSession(ctx, p)
session, err := n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
if err != nil {
return nil, nil, err
}
generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session)
objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator)
if err != nil {
@ -230,7 +230,7 @@ func (n *Layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListi
return nil, false, nil
}
session, err := n.getListAllVersionsSession(ctx, p)
session, err := n.getListVersionsSession(ctx, p, false)
if err != nil {
return nil, false, err
}
@ -301,48 +301,31 @@ func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int,
}
}
func (n *Layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) {
return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
}
func (n *Layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) {
return n.getListVersionsSession(ctx, p, false)
}
func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) {
func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
session := n.cache.GetListSession(owner, cacheKey)
if session == nil {
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
session = n.cache.GetListSession(owner, cacheKey)
if session == nil || session.Acquired.Swap(true) {
session = n.newSession(ctx)
session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
return session, err
}
if session.Acquired.Swap(true) {
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
}
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
return session, nil
}
func (n *Layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
func (n *Layer) newSession(ctx context.Context) *data.ListSession {
session := &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
// save access box data for next requests
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd})
}
session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
if err != nil {
return nil, err
}
return session, nil
return session
}
func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
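The refactored `newSession` deliberately derives the session context from `context.Background()`: the request context is canceled once the first page is served, which would kill the stream (a review note later in this diff makes the same point for multipart sessions). A minimal standalone demonstration:

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	// The request context dies as soon as the handler returns...
	reqCtx, finishRequest := context.WithCancel(context.Background())

	// ...so a paginated session gets its own context, detached from the request.
	sessionCtx, cancelSession := context.WithCancel(context.Background())
	defer cancelSession()

	finishRequest() // first page served, request context canceled

	fmt.Println(reqCtx.Err())     // context.Canceled
	fmt.Println(sessionCtx.Err()) // <nil>: the session stream stays usable
}
```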


@ -2,6 +2,7 @@ package layer
import (
"bytes"
"cmp"
"context"
"crypto/md5"
"encoding/base64"
@ -10,17 +11,20 @@ import (
"errors"
"fmt"
"io"
"slices"
"sort"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@ -499,47 +503,65 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
return &result, nil
}
multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, p.Bkt, p.Prefix)
session, err := n.getListMultipartUploadsSession(ctx, p)
if err != nil {
return nil, err
}
uploads := make([]*UploadInfo, 0, len(multipartInfos))
uploads := make([]*UploadInfo, 0, p.MaxUploads)
uniqDirs := make(map[string]struct{})
uploadsCount := 0
if session.Next != nil {
upload := uploadInfoFromMultipartInfo(session.Next, p.Prefix, p.Delimiter)
switch {
case upload.IsDir && isUniqDir(upload.Key, uniqDirs):
uniqDirs[upload.Key] = struct{}{}
fallthrough
Why not just `for uploadsCount < p.MaxUploads {`?

I need to check the next element to determine the Truncated state. Extracted this check outside the loop
case !upload.IsDir:
uploads = append(uploads, upload)
uploadsCount++
}

I would use `errors.Is(err, io.EOF)`. In `service/tree` also
}

Probably we should return an error rather than continue. And consider canceling the context (also for regular listing) @alexvanin
for _, multipartInfo := range multipartInfos {
info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter)
if info != nil {
if info.IsDir {
if _, ok := uniqDirs[info.Key]; ok {
info := session.Next
for uploadsCount < p.MaxUploads {
info, err = session.Stream.Next(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
n.reqLogger(ctx).Warn(logs.CouldNotGetMultipartUploadInfo, zap.Error(err))
continue

It seems now we can skip checking the prefix inside `uploadInfoFromMultipart`, and the result will always be non-nil

This isn't changed

Without the check there is a failure in a [test](https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/api/handler/multipart_upload_test.go#L262-L267) - a call with prefix "/my" also returns an item with the "/zzz" prefix, because GetSubTreeStream trims the prefix to "/"

Then we should fix streaming in tree, because it must return only nodes that have the provided prefix (`/my` in this case, and object `/zzz/object/name3` doesn't have such a prefix)
}
uniqDirs[info.Key] = struct{}{}
upload := uploadInfoFromMultipartInfo(info, p.Prefix, p.Delimiter)
if upload.IsDir {
if !isUniqDir(upload.Key, uniqDirs) {
continue
}
uploads = append(uploads, info)
uniqDirs[upload.Key] = struct{}{}
}
uploads = append(uploads, upload)

I'm not sure if it's ok to separate this invocation. At least as it's done now.
See test:

diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go
index 6836fc5d..dcff5e5b 100644
--- a/api/handler/multipart_upload_test.go
+++ b/api/handler/multipart_upload_test.go
@@ -266,6 +266,30 @@ func TestListMultipartUploads(t *testing.T) {
                require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID)
        })
 
+       t.Run("check delimiter", func(t *testing.T) {
+               t.Run("not truncated", func(t *testing.T) {
+                       listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", -1)
+                       require.Len(t, listUploads.Uploads, 0)
+                       require.Len(t, listUploads.CommonPrefixes, 2)
+                       require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
+                       require.Equal(t, "/zzz/", listUploads.CommonPrefixes[1].Prefix)
+               })
+
+               t.Run("truncated", func(t *testing.T) {
+                       listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", 1)
+                       require.Len(t, listUploads.Uploads, 0)
+                       require.Len(t, listUploads.CommonPrefixes, 1)
+                       require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
+                       require.True(t, listUploads.IsTruncated)
+
+                       listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
+                       require.Len(t, listUploads.Uploads, 0)
+                       require.Len(t, listUploads.CommonPrefixes, 1)
+                       require.Equal(t, "/zzz/", listUploads.CommonPrefixes[0].Prefix)
+                       require.False(t, listUploads.IsTruncated)
+               })
+       })
+

Modified this test a bit (because there are two objects in the `/my/` folder, so this common prefix will appear twice and only on the third call will it be `/zzz/`) and fixed this scenario. But I'm not quite sure whether I understand the problem with `separate invocations` you proposed, could you clarify a bit?
uploadsCount++
}
sort.Slice(uploads, func(i, j int) bool {
if uploads[i].Key == uploads[j].Key {
return uploads[i].UploadID < uploads[j].UploadID
}
return uploads[i].Key < uploads[j].Key
})
if p.KeyMarker != "" {
if p.UploadIDMarker != "" {
uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads)
isTruncated := true
next, err := session.Stream.Next(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
isTruncated = false
} else {
uploads = trimAfterUploadKey(p.KeyMarker, uploads)
return nil, err
It seems here we must use `next` rather than `info`. Please add tests for that case and others

In the previous implementation `NextKeyMarker` and `NextUploadIDMarker` were taken from the last element of the current list, and the new one acts the same. So using `info` in this case is correct. Anyway, I'm going to write tests for this

Well, maybe I'm pointing to a different error. The output sometimes doesn't contain NextUploadIDMarker:

$ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084   list-multipart-uploads --bucket test   | grep UploadId
            "UploadId": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
            "UploadId": "8fd7e720-a5c4-4e56-86c6-dc99146199c7",
            "UploadId": "8f60cdb1-7e62-41bb-98c1-567498be6dc2",
            "UploadId": "75988c0a-d94f-4667-9087-056af63acefc",
            "UploadId": "dda0fa13-cc27-4c52-aba6-2aa561d39b19",


$ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084   list-multipart-uploads --bucket test   --max-uploads 1
{
    "Bucket": "test",
    "KeyMarker": "",
    "NextKeyMarker": "dir/dir/obj",
    "Prefix": "",
    "NextUploadIdMarker": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
    "MaxUploads": 1,
    "IsTruncated": true,
    "Uploads": [
        {
            "UploadId": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
            "Key": "dir/dir/obj",
            "Initiated": "2024-11-12T09:34:43+00:00",
            "StorageClass": "STANDARD",
            "Owner": {
                "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
                "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
            },
            "Initiator": {
                "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
                "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
            }
        }
    ]
}

$ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084   list-multipart-uploads --bucket test   --max-uploads 1 --key-marker dir/dir/obj --upload-id-marker bd50e12d-fc4b-450b-afb1-ed082f5e2ef9
{
    "Bucket": "test",
    "KeyMarker": "dir/dir/obj",
    "UploadIdMarker": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
    "Prefix": "",
    "MaxUploads": 1,
    "IsTruncated": false,
    "Uploads": [
        {
            "UploadId": "8fd7e720-a5c4-4e56-86c6-dc99146199c7",
            "Key": "dir/dir/obj",
            "Initiated": "2024-11-08T14:21:51+00:00",
            "StorageClass": "STANDARD",
            "Owner": {
                "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
                "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
            },
            "Initiator": {
                "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
                "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
            }
        }
    ]
}


Fixed and added a test for this scenario
}
}
if len(uploads) > p.MaxUploads {
if isTruncated && info != nil {
// stash in the session the extra multipart upload that was read only to check for EOF
session.Next = next
result.IsTruncated = true
uploads = uploads[:p.MaxUploads]
result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID
result.NextKeyMarker = uploads[len(uploads)-1].Key
result.NextUploadIDMarker = info.UploadID
result.NextKeyMarker = info.Key
cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, info.Key, info.UploadID)
n.putListMultipartUploadsSession(ctx, session, cacheKey)
}
for _, ov := range uploads {
Result must be sorted by upload-id if some uploads have the same key
@ -550,9 +572,62 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
}
}
slices.SortFunc(result.Uploads, func(a, b *UploadInfo) int {
keyCmp := cmp.Compare(a.Key, b.Key)
if keyCmp == 0 {
Why does this `if` differ from the similar one in object listing? https://git.frostfs.info/nzinkevich/frostfs-s3-gw/src/commit/e673d727e9c9486e0b70127fb224ed58b6842fb3/api/layer/listing.go#L309

Changed the method to be more similar
return cmp.Compare(a.UploadID, b.UploadID)
We cannot use `ctx` from the current request. It will be canceled after the first request finishes
}
We also have to add AccessBox to this context to be able to get access to the tree service. Otherwise we currently get

2024-11-12T11:52:19.895+0300    error   request failed  {"request_id": "ef743d2a-0f81-4f02-ae61-82afac3db746", "method": "ListMultipartUploads", "bucket": "test", "object": "", "description": "could not list multipart uploads", "user": "NbUgTSFvPmsRxmGeWpuuGeJUoRoi6PErcM", "error": "access denied: address s01.frostfs.devenv:8080: access denied: rpc error: code = Unknown desc = status: code = 2048 message = access to object operation denied", "status": 403}

See how this is done for regular listing: https://git.frostfs.info/nzinkevich/frostfs-s3-gw/src/commit/aaca4f84b8ba29031481c54f6bdcae90909287a0/api/layer/listing.go#L324

Fixed
return keyCmp
})
return &result, nil
}
func (n *Layer) putListMultipartUploadsSession(ctx context.Context, session *data.ListMultipartSession, cacheKey cache.ListMultipartSessionKey) {
session.Acquired.Store(false)
n.cache.PutListMultipartSession(n.BearerOwner(ctx), cacheKey, session)
}
func (n *Layer) getListMultipartUploadsSession(ctx context.Context, p *ListMultipartUploadsParams) (session *data.ListMultipartSession, err error) {
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, p.KeyMarker, p.UploadIDMarker)
session = n.cache.GetListMultipartSession(owner, cacheKey)
if session == nil || session.Acquired.Swap(true) {
session = newListMultipartSession(ctx)
params := data.MultipartStreamParams{
Prefix: p.Prefix,
KeyMarker: p.KeyMarker,
UploadIDMarker: p.UploadIDMarker,
}
session.Stream, err = n.treeService.GetMultipartUploadsByPrefix(session.Context, p.Bkt, params)
if err != nil {
return nil, err
}
}
// after reading the next object from the stream in the session, the current
// cache value no longer matches the next token in the cache key
n.cache.DeleteListMultipartSession(owner, cacheKey)
return session, nil
}
func newListMultipartSession(ctx context.Context) *data.ListMultipartSession {
reqCtx, cancel := context.WithCancel(context.Background())
session := &data.ListMultipartSession{
CommonSession: data.CommonSession{
Context: reqCtx,
Cancel: cancel,
},
}
// save access box data for next requests
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd})
}
return session
}
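For reference, the `IsTruncated` handling above reduces to a look-ahead read: fill the page, then try one extra element; if it exists, the listing is truncated and the element is stashed for the next page. A self-contained sketch with generic stand-ins (the `item` and `stream` types are ours, not the PR's):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

type item struct{ Key, UploadID string }

// stream mimics the data.MultipartInfoStream shape from this PR.
type stream interface {
	Next(ctx context.Context) (*item, error)
}

type sliceStream struct{ items []*item }

func (s *sliceStream) Next(context.Context) (*item, error) {
	if len(s.items) == 0 {
		return nil, io.EOF
	}
	it := s.items[0]
	s.items = s.items[1:]
	return it, nil
}

// listPage fills up to max items, then reads one more element: if that read
// succeeds, the listing is truncated and the extra element is kept for the
// next page (the PR stores it in session.Next).
func listPage(ctx context.Context, st stream, max int) ([]*item, *item, bool, error) {
	page := make([]*item, 0, max)
	for len(page) < max {
		it, err := st.Next(ctx)
		if errors.Is(err, io.EOF) {
			return page, nil, false, nil
		}
		if err != nil {
			return nil, nil, false, err
		}
		page = append(page, it)
	}
	next, err := st.Next(ctx) // look-ahead just to determine IsTruncated
	if errors.Is(err, io.EOF) {
		return page, nil, false, nil
	}
	if err != nil {
		return nil, nil, false, err
	}
	return page, next, true, nil
}

func main() {
	st := &sliceStream{items: []*item{{"a", "1"}, {"b", "2"}, {"c", "3"}}}
	page, next, truncated, _ := listPage(context.Background(), st, 2)
	fmt.Println(len(page), truncated, next.Key) // 2 true c
}
```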
func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
multipartInfo, parts, err := n.getUploadParts(ctx, p)
if err != nil {
@ -677,44 +752,10 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
return multipartInfo, res, nil
}
func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo {
var res []*UploadInfo
if len(uploads) != 0 && uploads[len(uploads)-1].Key < key {
return res
}
for _, obj := range uploads {
if obj.Key >= key && obj.UploadID > id {
res = append(res, obj)
}
}
return res
}
func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo {
var result []*UploadInfo
if len(objects) != 0 && objects[len(objects)-1].Key <= key {
return result
}
for i, obj := range objects {
if obj.Key > key {
result = objects[i:]
break
}
}
return result
}
func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo {
var isDir bool
key := uploadInfo.Key
if !strings.HasPrefix(key, prefix) {
return nil
}
if len(delimiter) > 0 {
tail := strings.TrimPrefix(key, prefix)
index := strings.Index(tail, delimiter)
@ -732,3 +773,10 @@ func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimit
Created: uploadInfo.Created,
}
}
func isUniqDir(key string, uniqDirs map[string]struct{}) bool {
if _, ok := uniqDirs[key]; ok {
return false
}
return true
}


@ -1,108 +0,0 @@
package layer
import (
"sort"
"testing"
"github.com/stretchr/testify/require"
)
func TestTrimAfterUploadIDAndKey(t *testing.T) {
uploads := []*UploadInfo{
{Key: "j", UploadID: "k"}, // key < id <
{Key: "l", UploadID: "p"}, // key < id >
{Key: "n", UploadID: "m"}, // key = id <
{Key: "n", UploadID: "o"}, // pivot
{Key: "n", UploadID: "q"}, // key = id >
{Key: "p", UploadID: "h"}, // key > id <
{Key: "q", UploadID: "r"}, // key > id >
}
expectedUploadsListsIndexes := [][]int{
{1, 2, 3, 4, 6},
{4, 6},
{3, 4, 6},
{4, 6},
{6},
{6},
{},
}
sort.Slice(uploads, func(i, j int) bool {
if uploads[i].Key == uploads[j].Key {
return uploads[i].UploadID < uploads[j].UploadID
}
return uploads[i].Key < uploads[j].Key
})
length := len(uploads)
t.Run("the last element's key is less, upload id is less", func(t *testing.T) {
keys := trimAfterUploadIDAndKey("z", "a", uploads)
require.Empty(t, keys)
require.Len(t, uploads, length)
})
t.Run("the last element's key is less, upload id is greater", func(t *testing.T) {
keys := trimAfterUploadIDAndKey("z", "a", uploads)
require.Empty(t, keys)
require.Len(t, uploads, length)
})
t.Run("check for uploads", func(t *testing.T) {
for i, u := range uploads {
list := trimAfterUploadIDAndKey(u.Key, u.UploadID, uploads)
require.Equal(t, len(list), len(expectedUploadsListsIndexes[i]))
for j, idx := range expectedUploadsListsIndexes[i] {
require.Equal(t, list[j], uploads[idx])
}
}
})
}
func TestTrimAfterUploadKey(t *testing.T) {
var (
uploadKeys = []string{"e", "f", "f", "g", "h", "i"}
theSameKeyIdx = []int{1, 2}
diffKeyIdx = []int{0, 3}
lastIdx = len(uploadKeys) - 1
)
uploadsInfos := make([]*UploadInfo, 0, len(uploadKeys))
for _, k := range uploadKeys {
uploadsInfos = append(uploadsInfos, &UploadInfo{Key: k})
}
t.Run("empty list", func(t *testing.T) {
keys := trimAfterUploadKey("f", []*UploadInfo{})
require.Len(t, keys, 0)
})
t.Run("the last element is less than a key", func(t *testing.T) {
keys := trimAfterUploadKey("j", uploadsInfos)
require.Empty(t, keys)
require.Len(t, uploadsInfos, len(uploadKeys))
})
t.Run("different keys in sequence", func(t *testing.T) {
for _, i := range diffKeyIdx {
keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos)
require.Len(t, keys, len(uploadKeys)-i-1)
require.Equal(t, keys, uploadsInfos[i+1:])
require.Len(t, uploadsInfos, len(uploadKeys))
}
})
t.Run("the same keys in the sequence first element", func(t *testing.T) {
for _, i := range theSameKeyIdx {
keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos)
require.Len(t, keys, 3)
require.Equal(t, keys, uploadsInfos[3:])
require.Len(t, uploadsInfos, len(uploadKeys))
}
})
t.Run("last element", func(t *testing.T) {
keys := trimAfterUploadKey(uploadKeys[lastIdx], uploadsInfos)
require.Empty(t, keys)
})
}


@ -53,7 +53,7 @@ type Service interface {
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error)
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
// AddPart puts a node to a system tree as a child of appropriate multipart upload


@ -328,7 +328,7 @@ func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data
return nil
}
func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, string) ([]*data.MultipartInfo, error) {
func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, data.MultipartStreamParams) (data.MultipartInfoStream, error) {
panic("implement me")
}


@ -1007,6 +1007,8 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
cacheCfg.NetworkInfo.Lifetime = fetchCacheLifetime(v, l, cfgNetworkInfoCacheLifetime, cacheCfg.NetworkInfo.Lifetime)
cacheCfg.MultipartList.Lifetime = fetchCacheLifetime(v, l, cfgMultipartListCacheLifetime, cacheCfg.MultipartList.Lifetime)
cacheCfg.MultipartList.Size = fetchCacheSize(v, l, cfgMultipartListCacheSize, cacheCfg.MultipartList.Size)
return cacheCfg
}


@ -141,6 +141,8 @@ const ( // Settings.
cfgFrostfsIDCacheLifetime = "cache.frostfsid.lifetime"
cfgFrostfsIDCacheSize = "cache.frostfsid.size"
cfgNetworkInfoCacheLifetime = "cache.network_info.lifetime"
cfgMultipartListCacheLifetime = "cache.multipart_list_session.lifetime"
cfgMultipartListCacheSize = "cache.multipart_list_session.size"
cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"


@ -101,9 +101,12 @@ S3_GW_CACHE_OBJECTS_SIZE=1000000
# Cache which keeps lists of objects in buckets
S3_GW_CACHE_LIST_LIFETIME=1m
S3_GW_CACHE_LIST_SIZE=100000
# Cache which keeps listing session
# Cache which keeps listing objects session
S3_GW_CACHE_LIST_SESSION_LIFETIME=1m
S3_GW_CACHE_LIST_SESSION_SIZE=100
# Cache which keeps listing multipart uploads session
S3_GW_CACHE_MULTIPART_LIST_SESSION_LIFETIME=1m
S3_GW_CACHE_MULTIPART_LIST_SESSION_SIZE=100
# Cache which contains mapping of bucket name to bucket info
S3_GW_CACHE_BUCKETS_LIFETIME=1m
S3_GW_CACHE_BUCKETS_SIZE=1000


@ -129,6 +129,9 @@ cache:
list_session:
lifetime: 1m
size: 100
multipart_list_session:
lifetime: 1m
size: 100
# Cache which contains mapping of nice name to object addresses
names:
lifetime: 1m


@ -426,6 +426,9 @@ cache:
list_session:
lifetime: 1m
size: 100
multipart_list_session:
lifetime: 1m
size: 10000
names:
lifetime: 1m
size: 1000
@ -453,10 +456,11 @@ cache:
```
| Parameter | Type | Default value | Description |
|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------|
|--------------------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------|
| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). |
| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. |
| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. |
| `multipart_list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing of multipart uploads. |
| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. |
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings etc. |


@ -8,6 +8,8 @@ const (
UpdateAccessCredObjectIntoFrostFS = "update access cred object into FrostFS" // Info in ../../authmate/authmate.go
MetricsAreDisabled = "metrics are disabled" // Warn in ../../metrics/app.go
FoundMoreThanOneUnversionedNode = "found more than one unversioned node" // Debug in ../../pkg/service/tree/tree.go
CouldNotParseTreeNode = "could not parse tree node" // Error in ../../pkg/service/tree/tree.go
CouldNotFormFilePath = "could not form file path" // Error in ../../pkg/service/tree/tree.go
ServiceIsRunning = "service is running" // Info in ../../cmd/s3-gw/service.go
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port" // Warn in ../../cmd/s3-gw/service.go
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../cmd/s3-gw/service.go
@ -67,6 +69,7 @@ const (
CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go
CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go
MismatchedObjEncryptionInfo = "mismatched obj encryptionInfo" // Warn in ../../api/layer/multipart_upload.go
CouldNotGetMultipartUploadInfo = "could not get multipart upload info" // Warn in ../../api/layer/multipart_upload.go
UploadPart = "upload part" // Debug in ../../api/layer/multipart_upload.go
CouldntDeleteOldPartObject = "couldn't delete old part object" // Error in ../../api/layer/multipart_upload.go
CouldNotPutCompletedObject = "could not put a completed object (multipart upload)" // Error in ../../api/layer/multipart_upload.go
@ -89,6 +92,7 @@ const (
CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go
CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go
CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go
CouldntCacheListMultipartSession = "couldn't cache list multipart session" // Warn in ../../api/layer/cache.go
CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go


@ -84,7 +84,9 @@ var (
// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
ErrGatewayTimeout = frostfs.ErrGatewayTimeout
errNodeDoesntContainFileName = fmt.Errorf("node doesn't contain FileName")
errNodeDoesntContainFileName = errors.New("node doesn't contain FileName")
errParentPathNotFound = errors.New("couldn't get parent path")
)
const (
@ -1061,7 +1063,7 @@ func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *
var filepath string
if !s.intermediateRootID.Equal(trNode.ID) {
if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil {
if filepath, err = formFilePath(trNode, fileName, s.namesMap); err != nil {
return nil, false, fmt.Errorf("invalid node order: %w", err)
}
} else {
@ -1165,58 +1167,18 @@ func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, tr
return intermediateNodes, nil
}
func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
func (c *Tree) getSubTreeByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, []uint64, error) {
rootID, _, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) {
return nil, "", nil
return nil, nil, nil
}
return nil, "", err
return nil, nil, err
}
subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2, false)
if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) {
return nil, "", nil
}
return nil, "", err
}
stream, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
nodesMap := make(map[string][]NodeResponse, len(subTree))
for _, node := range subTree {
if MultiID(rootID).Equal(node.GetNodeID()) {
continue
}
fileName := getFilename(node)
if !strings.HasPrefix(fileName, tailPrefix) {
continue
}
nodes := nodesMap[fileName]
// Add all nodes if flag latestOnly is false.
// Add all intermediate nodes
// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
if len(nodes) == 0 {
nodes = []NodeResponse{node}
} else if !latestOnly || isIntermediate(node) {
nodes = append(nodes, node)
} else if isIntermediate(nodes[0]) {
nodes = append([]NodeResponse{node}, nodes...)
} else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
nodes[0] = node
}
nodesMap[fileName] = nodes
}
result := make([]NodeResponse, 0, len(subTree))
for _, nodes := range nodesMap {
result = append(result, nodes...)
}
return result, strings.TrimSuffix(prefix, tailPrefix), nil
return stream, rootID, err
}
func getFilename(node NodeResponse) string {
@ -1237,20 +1199,19 @@ func isIntermediate(node NodeResponse) bool {
return node.GetMeta()[0].GetKey() == FileNameKey
}
func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) {
var filepath string
func formFilePath(node *treeNode, fileName string, namesMap map[uint64]string) (string, error) {
var filePath string
for i, id := range node.GetParentID() {
for i, id := range node.ParentID {
parentPath, ok := namesMap[id]
if !ok {
return "", fmt.Errorf("couldn't get parent path")
return "", errParentPathNotFound
}
filePath = parentPath + separator + fileName
namesMap[node.ID[i]] = filePath
}
filepath = parentPath + separator + fileName
namesMap[node.GetNodeID()[i]] = filepath
}
return filepath, nil
return filePath, nil
It's better to use a more meaningful name. By the way, why do we need a separate function? And why do we use only the 0th parentID below (`namesMap[curNode.ParentID[0]]`)?

It seems to me that multipart info can't be split, so I use only the first element https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/pkg/service/tree/tree.go#L332-L334
}
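The `namesMap` bookkeeping in `formFilePath` is an incremental path index: each node registers its full path under its own ID so that children can resolve their parent in one lookup. A hypothetical standalone illustration (the `buildPath` helper is ours, not the PR's):

```go
package main

import "fmt"

// buildPath mirrors the formFilePath idea: resolve the parent's path from
// the map, append the file name, and register the result under the node ID.
func buildPath(namesMap map[uint64]string, nodeID, parentID uint64, fileName string) (string, error) {
	parentPath, ok := namesMap[parentID]
	if !ok {
		return "", fmt.Errorf("couldn't get parent path")
	}
	path := parentPath + "/" + fileName
	namesMap[nodeID] = path
	return path, nil
}

func main() {
	namesMap := map[uint64]string{0: ""} // root registered up front
	p1, _ := buildPath(namesMap, 1, 0, "foo")
	p2, _ := buildPath(namesMap, 2, 1, "bar")
	fmt.Println(p1, p2) // /foo /foo/bar
}
```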
func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
@ -1267,10 +1228,6 @@ func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
return tNode, fileName, nil
}
func formLatestNodeKey(parentID uint64, fileName string) string {
return strconv.FormatUint(parentID, 10) + "." + fileName
}
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
}
@ -1313,84 +1270,137 @@ func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketIn
return err
}
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
if err != nil {
type multipartInfoStream struct {
log *zap.Logger
nodePaths map[uint64]string
rootID MultiID
// childStream is a stream of the prefix node's children.
childStream SubTreeStream
// currentStream is a stream over the current child's subtree, opened with max depth.
currentStream SubTreeStream
treeService ServiceClient
bktInfo *data.BucketInfo
uploadID string
keyMarker string
headPrefix string
prefix string
}
func (m *multipartInfoStream) Next(ctx context.Context) (*data.MultipartInfo, error) {
var tNode *treeNode
var filePath string
if m.currentStream == nil {
var err error
if m.currentStream, err = m.openNewStream(ctx); err != nil {
return nil, err
}
var result []*data.MultipartInfo
for _, node := range subTreeNodes {
multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix)
}
for {
var err error
tNode, err = getTreeNodeFromStream(m.currentStream)
if err != nil {
if errors.Is(err, io.EOF) {

Do we really need the new variable `err2`? It seems we can write

if m.currentStream, err = m.openNewStream(ctx); err != nil {
	return nil, err
}

because of the `continue` below
if m.currentStream, err = m.openNewStream(ctx); err != nil {
return nil, err
}
result = append(result, multipartUploads...)
}
return result, nil
}
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
// sorting in getSubTree leads to skipping nodes that doesn't have FileName attribute
// so when we are only interested in multipart nodes, we can set this flag
// (despite we sort multiparts in above layer anyway)
// to skip its children (parts) that don't have FileName
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth, true)
if err != nil {
return nil, err
}
var parentPrefix string
if parentFilePath != "" { // The root of subTree can also have a parent
parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
}
var filepath string
namesMap := make(map[uint64]string, len(subTree))
multiparts := make(map[string][]*data.MultipartInfo, len(subTree))
for i, node := range subTree {
tNode, fileName, err := parseTreeNode(node)
if err != nil {
continue
}
if i != 0 {
if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
return nil, fmt.Errorf("invalid node order: %w", err)
return nil, err
}
} else {
filepath = parentPrefix + fileName
for _, id := range tNode.ID {
namesMap[id] = filepath
var ok bool
if filePath, ok = m.checkTreeNode(tNode); ok {

Why not just `fileName, ok := tNode.FileName()`?
break
}
}
Why do we use only `tNode.ID[0]`?
multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, tNode)
if err != nil || multipartInfo.Finished {
continue
return newMultipartInfoFromTreeNode(m.log, filePath, tNode)

Why do we treat this error specially? It seems normally we traverse nodes in the right order, and if we don't see a parent it's a bug in the storage node. The root node filepath we can set initially in this map
}

I suppose we should check `tNode.IsSplit()` and skip the node if necessary, and only after that use `tNode.ID[0]`
for _, id := range node.GetParentID() {
key := formLatestNodeKey(id, fileName)
multipartInfos, ok := multiparts[key]
// openNewStream creates a subtree stream for the next node taken from childStream.
func (m *multipartInfoStream) openNewStream(ctx context.Context) (SubTreeStream, error) {
node, err := getTreeNodeFromStream(m.childStream)
if err != nil {
return nil, err
}
if m.rootID.Equal(node.ID) {
// skip root node
return m.openNewStream(ctx)
}

It seems we can simplify this to

if m.keyMarker < filePath || (m.keyMarker == filePath && m.uploadID < id) {
stream, err := m.treeService.GetSubTreeStream(ctx, m.bktInfo, systemTree, node.ID, maxGetSubTreeDepth)
if err != nil {
return nil, err
}
return stream, nil
}
func getTreeNodeFromStream(stream SubTreeStream) (*treeNode, error) {
node, err := stream.Next()
if err != nil {
return nil, err
}
tNode, err := newTreeNode(node)
if err != nil {
return nil, err

Please, use the similar check https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/commit/a12fea8a5b1e72cdd8cc10eb7278033305a24a3b/pkg/service/tree/tree.go#L987

if !s.rootID.Equal(node.GetNodeID()) && strings.HasPrefix(getFilename(node), s.tailPrefix) {
}
return tNode, nil
}
Consider using the approach like here https://git.frostfs.info/nzinkevich/frostfs-s3-gw/src/commit/aaca4f84b8ba29031481c54f6bdcae90909287a0/pkg/service/tree/tree.go#L1114

Otherwise, if we have objects:

  • dir/objdir1/obj
  • dir/objdir1/obj2
  • dir/obj1/obj1
  • ...
  • dir/obj1/obj1000000

and the request contains `prefix: dir/objdir`, we will list more than 1000000 objects and just filter them (we will still fetch them all from storage) despite being interested in only 2 of them

Changed the implementation. Now, at first, `c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)` is called, and then `GetSubTreeStream` with max depth is applied to the children as needed. This should prevent fetching redundant nodes
func (m *multipartInfoStream) checkTreeNode(tNode *treeNode) (string, bool) {
var ok bool
var err error
if tNode.IsSplit() {
return "", false
}

This function should be `func getTreeNodeFromStream(stream SubTreeStream) (*treeNode, error)` or `func (m *multipartInfoStream) getTreeNodeFromStream() (*treeNode, error)`
fileName, ok := tNode.FileName()
if !ok {
multipartInfos = []*data.MultipartInfo{multipartInfo}
} else {
multipartInfos = append(multipartInfos, multipartInfo)
return "", false
}
multiparts[key] = multipartInfos
filePath, err := formFilePath(tNode, fileName, m.nodePaths)
if err != nil {
filePath = fileName
m.nodePaths[tNode.ID[0]] = filePath
}
filePath = m.headPrefix + filePath
if !strings.HasPrefix(filePath, m.prefix) {
return "", false
}
if _, ok = tNode.Meta[finishedKV]; ok {
return "", false
}
if id, ok := tNode.Meta[uploadIDKV]; ok {
if filePath > m.keyMarker || (filePath == m.keyMarker && m.uploadID != "" && id > m.uploadID) {
return filePath, true
}
}
result := make([]*data.MultipartInfo, 0, len(multiparts))
for _, multipartInfo := range multiparts {
result = append(result, multipartInfo...)
return "", false
}
return result, nil
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error) {
stream, rootID, err := c.getSubTreeByPrefixStream(ctx, bktInfo, systemTree, params.Prefix)
if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) {
return nil, nil
}
return nil, err
}
return &multipartInfoStream{
log: c.reqLogger(ctx),
rootID: rootID,
childStream: stream,
nodePaths: make(map[uint64]string),
treeService: c.service,
bktInfo: bktInfo,
uploadID: params.UploadIDMarker,
keyMarker: params.KeyMarker,
prefix: params.Prefix,
headPrefix: strings.TrimRightFunc(params.Prefix, func(r rune) bool {
return r != '/'
}),
}, nil
}
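A note on the `headPrefix` expression above: `strings.TrimRightFunc` with the `r != '/'` predicate keeps everything up to and including the last `/` of the request prefix. A quick standalone check:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Trim trailing runes until the last '/' is reached (the '/' itself stays).
	for _, prefix := range []string{"dir/obj", "dir/", "obj"} {
		head := strings.TrimRightFunc(prefix, func(r rune) bool { return r != '/' })
		fmt.Printf("%q -> %q\n", prefix, head) // "dir/obj" -> "dir/", "dir/" -> "dir/", "obj" -> ""
	}
}
```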
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {


@ -12,6 +12,7 @@ import (
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
@ -359,3 +360,127 @@ func TestSplitTreeMultiparts(t *testing.T) {
require.NoError(t, err)
require.Len(t, parts, 1)
}
func TestCheckTreeNode(t *testing.T) {
treeNodes := []*treeNode{
// foo/
{
ID: []uint64{1},
ParentID: []uint64{0},
TimeStamp: []uint64{1},
Meta: map[string]string{
"FileName": "foo",
},
},
// foo/ant
{
ID: []uint64{2},
ParentID: []uint64{1},
TimeStamp: []uint64{1},
Meta: map[string]string{
"FileName": "ant",
"UploadId": "d",
},
},
// foo/bar
{
ID: []uint64{3},
ParentID: []uint64{1},
TimeStamp: []uint64{1},
Meta: map[string]string{
"FileName": "bar",
"UploadId": "c",
},
},
// foo/finished
{
ID: []uint64{4},
ParentID: []uint64{1},
TimeStamp: []uint64{1},
Meta: map[string]string{
"FileName": "finished",
"UploadId": "e",
"Finished": "True",
},
},
// hello/
{
ID: []uint64{5},
ParentID: []uint64{0},
TimeStamp: []uint64{1},
Meta: map[string]string{
"FileName": "hello",
},
},
// hello/world
{
ID: []uint64{6},
ParentID: []uint64{5},
TimeStamp: []uint64{1},
Meta: map[string]string{
"FileName": "world",
"UploadId": "a",
},
},
// hello/world
{
ID: []uint64{7},
ParentID: []uint64{5},
TimeStamp: []uint64{1},
Meta: map[string]string{
"FileName": "world",
"UploadId": "b",
},
},
}
info := multipartInfoStream{
log: zap.NewNop(),
rootID: []uint64{0},
}
t.Run("without markers", func(t *testing.T) {
info.nodePaths = make(map[uint64]string)
results := make([]bool, 0, len(treeNodes))
for _, node := range treeNodes {
_, valid := info.checkTreeNode(node)
results = append(results, valid)
}
require.Equal(t, []bool{false, true, true, false, false, true, true}, results)
})
t.Run("with prefix", func(t *testing.T) {
info.nodePaths = make(map[uint64]string)
info.prefix = "hello"
info.headPrefix = ""
results := make([]bool, 0, len(treeNodes))
for _, node := range treeNodes {
_, valid := info.checkTreeNode(node)
results = append(results, valid)
}
require.Equal(t, []bool{false, false, false, false, false, true, true}, results)
})
t.Run("with key marker", func(t *testing.T) {
info.nodePaths = make(map[uint64]string)
info.keyMarker = "foo/bar"
results := make([]bool, 0, len(treeNodes))
for _, node := range treeNodes {
_, valid := info.checkTreeNode(node)
results = append(results, valid)
}
require.Equal(t, []bool{false, false, false, false, false, true, true}, results)
})
t.Run("with key and upload id markers", func(t *testing.T) {
info.nodePaths = make(map[uint64]string)
info.keyMarker = "hello/world"
info.uploadID = "a"
results := make([]bool, 0, len(treeNodes))
for _, node := range treeNodes {
_, valid := info.checkTreeNode(node)
results = append(results, valid)
}
require.Equal(t, []bool{false, false, false, false, false, false, true}, results)
})
}
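The marker filtering these cases exercise boils down to one lexicographic comparison; a hedged restatement of the condition `checkTreeNode` applies (the `afterMarkers` helper is ours, for illustration only):

```go
package main

import "fmt"

// afterMarkers reports whether an upload (key, uploadID) sorts strictly after
// the (keyMarker, uploadIDMarker) pair, matching the checkTreeNode condition.
func afterMarkers(key, uploadID, keyMarker, uploadIDMarker string) bool {
	return key > keyMarker || (key == keyMarker && uploadIDMarker != "" && uploadID > uploadIDMarker)
}

func main() {
	fmt.Println(afterMarkers("hello/world", "b", "hello/world", "a")) // true: same key, greater upload id
	fmt.Println(afterMarkers("hello/world", "a", "hello/world", "a")) // false: the marker itself is excluded
	fmt.Println(afterMarkers("foo/bar", "z", "hello/world", "a"))     // false: key sorts before the marker
}
```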