[#469] List multipart uploads streaming #527

Open
nzinkevich wants to merge 1 commit from nzinkevich/frostfs-s3-gw:multiparts_list_streaming into master
21 changed files with 678 additions and 363 deletions

109
api/cache/listmultipart.go vendored Normal file
View file

@ -0,0 +1,109 @@
package cache
import (
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/bluele/gcache"
"go.uber.org/zap"
)
type (
	// ListMultipartSessionCache contains cache for list multiparts session (during pagination).
	// Entries are evicted by LRU policy and per-entry expiration; the eviction callback
	// cancels a session's context unless the session is currently acquired by a request.
	ListMultipartSessionCache struct {
		cache  gcache.Cache // underlying LRU cache holding *data.ListMultipartSession values
		logger *zap.Logger  // reports entries of unexpected type found in the cache
	}

	// ListMultipartSessionKey is a key to find a ListMultipartSessionCache's entry.
	// It identifies a pagination position by container, prefix and the key/upload-id markers.
	ListMultipartSessionKey struct {
		cid      cid.ID
		prefix   string
		marker   string
		uploadID string
	}
)
const (
// DefaultListMultipartSessionCacheLifetime is a default lifetime of entries in cache of ListMultipartUploads.
DefaultListMultipartSessionCacheLifetime = time.Minute
// DefaultListMultipartSessionCacheSize is a default size of cache of ListMultipartUploads.
DefaultListMultipartSessionCacheSize = 100

If we add new cache we also should be able configure this

If we add new cache we also should be able configure this

Added config params

Added config params
Please mention these params in https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/config/config.yaml and https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/config/config.env Also we can write `time.Minute` instead of `time.Second * 60`
)
// DefaultListMultipartSessionConfig returns a cache configuration populated with
// the default ListMultipartUploads session cache size and entry lifetime.
func DefaultListMultipartSessionConfig(logger *zap.Logger) *Config {
	cfg := &Config{
		Logger:   logger,
		Size:     DefaultListMultipartSessionCacheSize,
		Lifetime: DefaultListMultipartSessionCacheLifetime,
	}

	return cfg
}
// String combines the key fields into a single cache-key string.
// Fields are joined with a delimiter so that distinct (prefix, marker, uploadID)
// tuples do not collide by plain concatenation (e.g. prefix "a" + marker "b"
// versus prefix "ab" + empty marker). Collisions remain possible only when a
// field itself contains the delimiter — acceptable for a best-effort cache key.
func (k *ListMultipartSessionKey) String() string {
	return k.cid.EncodeToString() + "_" + k.prefix + "_" + k.marker + "_" + k.uploadID
}
// NewListMultipartSessionCache is a constructor which creates an object of ListMultipartSessionCache with the given lifetime of entries.
// The eviction callback cancels the evicted session's context unless the session
// is currently acquired by an in-flight request.
func NewListMultipartSessionCache(config *Config) *ListMultipartSessionCache {
	gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).EvictedFunc(func(_ interface{}, val interface{}) {
		session, ok := val.(*data.ListMultipartSession)
		if !ok {
			config.Logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", val)),
				zap.String("expected", fmt.Sprintf("%T", session)))
			// session is nil here; falling through would panic on session.Acquired.Load().
			return
		}

		if !session.Acquired.Load() {
			session.Cancel()
		}
	}).Build()

	return &ListMultipartSessionCache{cache: gc, logger: config.Logger}
}
// GetListMultipartSession returns a session of ListMultipartUploads request.
// It yields nil when the key is absent or the cached value has an unexpected type
// (the latter is also logged as a warning).
func (l *ListMultipartSessionCache) GetListMultipartSession(key ListMultipartSessionKey) *data.ListMultipartSession {
	value, err := l.cache.Get(key)
	if err != nil {
		return nil
	}

	if session, ok := value.(*data.ListMultipartSession); ok {
		return session
	}

	l.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", value)),
		zap.String("expected", fmt.Sprintf("%T", (*data.ListMultipartSession)(nil))))
	return nil
}
// PutListMultipartSession puts a ListMultipartUploads session info to cache.
// If a different, non-acquired session is already stored under the same key,
// its context is canceled before being replaced.
func (l *ListMultipartSessionCache) PutListMultipartSession(key ListMultipartSessionKey, session *data.ListMultipartSession) error {
	if old := l.GetListMultipartSession(key); old != nil && old != session && !old.Acquired.Load() {
		old.Cancel()
	}

	return l.cache.Set(key, session)
}
// DeleteListMultipartSession removes key from cache.
// NOTE(review): this relies on gcache's removal semantics for any cleanup of the
// stored session — confirm whether the eviction callback fires on explicit Remove.
func (l *ListMultipartSessionCache) DeleteListMultipartSession(key ListMultipartSessionKey) {
	l.cache.Remove(key)
}
// CreateListMultipartSessionCacheKey returns ListMultipartSessionKey with the given CID, prefix, marker and uploadID.
func CreateListMultipartSessionCacheKey(cnr cid.ID, prefix, marker, uploadID string) ListMultipartSessionKey {
	return ListMultipartSessionKey{
		cid:      cnr,
		prefix:   prefix,
		marker:   marker,
		uploadID: uploadID,
	}
}

View file

@ -28,7 +28,7 @@ type (
const ( const (
// DefaultListSessionCacheLifetime is a default lifetime of entries in cache of ListObjects. // DefaultListSessionCacheLifetime is a default lifetime of entries in cache of ListObjects.
DefaultListSessionCacheLifetime = time.Second * 60 DefaultListSessionCacheLifetime = time.Minute
// DefaultListSessionCacheSize is a default size of cache of ListObjects. // DefaultListSessionCacheSize is a default size of cache of ListObjects.
DefaultListSessionCacheSize = 100 DefaultListSessionCacheSize = 100
) )

View file

@ -9,11 +9,25 @@ type VersionsStream interface {
Next(ctx context.Context) (*NodeVersion, error) Next(ctx context.Context) (*NodeVersion, error)
} }
type ListSession struct { type CommonSession struct {
Next []*ExtendedNodeVersion
Stream VersionsStream
NamesMap map[string]struct{}
Context context.Context Context context.Context
Cancel context.CancelFunc Cancel context.CancelFunc
Acquired atomic.Bool Acquired atomic.Bool
} }
type ListSession struct {
CommonSession
Next []*ExtendedNodeVersion
Stream VersionsStream
NamesMap map[string]struct{}
}
type MultipartInfoStream interface {
Next(ctx context.Context) (*MultipartInfo, error)
dkirillov marked this conversation as resolved Outdated

Why do we pass marker and uploadID?

Why do we pass `marker` and `uploadID`?
}
type ListMultipartSession struct {
CommonSession
Next *MultipartInfo
Stream MultipartInfoStream
}

View file

@ -207,3 +207,9 @@ func (l LockInfo) UntilDate() string {
func (l LockInfo) IsCompliance() bool { func (l LockInfo) IsCompliance() bool {
return l.isCompliance return l.isCompliance
} }
type MultipartStreamParams struct {
Prefix string
KeyMarker string
UploadIDMarker string
}

View file

@ -244,6 +244,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
Buckets: minCacheCfg, Buckets: minCacheCfg,
System: minCacheCfg, System: minCacheCfg,
AccessControl: minCacheCfg, AccessControl: minCacheCfg,
MultipartList: minCacheCfg,
NetworkInfo: &cache.NetworkInfoCacheConfig{Lifetime: minCacheCfg.Lifetime}, NetworkInfo: &cache.NetworkInfoCacheConfig{Lifetime: minCacheCfg.Lifetime},
} }
} }

View file

@ -513,7 +513,7 @@ func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Req
if maxUploadsStr != "" { if maxUploadsStr != "" {
val, err := strconv.Atoi(maxUploadsStr) val, err := strconv.Atoi(maxUploadsStr)
if err != nil || val < 1 || val > 1000 { if err != nil || val < 1 || val > maxObjectList {
h.logAndSendError(ctx, w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads)) h.logAndSendError(ctx, w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads))
return return
} }

View file

@ -266,6 +266,36 @@ func TestListMultipartUploads(t *testing.T) {
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID) require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID)
}) })
t.Run("check delimiter", func(t *testing.T) {
t.Run("not truncated", func(t *testing.T) {
listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", -1)
require.Len(t, listUploads.Uploads, 0)
require.Len(t, listUploads.CommonPrefixes, 2)
require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
require.Equal(t, "/zzz/", listUploads.CommonPrefixes[1].Prefix)
})
t.Run("truncated", func(t *testing.T) {
listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", 1)
require.Len(t, listUploads.Uploads, 0)
require.Len(t, listUploads.CommonPrefixes, 1)
require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
require.True(t, listUploads.IsTruncated)
listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
require.Len(t, listUploads.Uploads, 0)
require.Len(t, listUploads.CommonPrefixes, 1)
require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
require.True(t, listUploads.IsTruncated)
listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
require.Len(t, listUploads.Uploads, 0)
require.Len(t, listUploads.CommonPrefixes, 1)
require.Equal(t, "/zzz/", listUploads.CommonPrefixes[0].Prefix)
require.False(t, listUploads.IsTruncated)
})
})
t.Run("check markers", func(t *testing.T) { t.Run("check markers", func(t *testing.T) {
t.Run("check only key-marker", func(t *testing.T) { t.Run("check only key-marker", func(t *testing.T) {
listUploads := listMultipartUploads(hc, bktName, "", "", "", objName2, -1) listUploads := listMultipartUploads(hc, bktName, "", "", "", objName2, -1)
@ -294,6 +324,58 @@ func TestListMultipartUploads(t *testing.T) {
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID) require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
}) })
}) })
t.Run("check next markers", func(t *testing.T) {
t.Run("check both next-key-marker and next-upload-id-marker", func(t *testing.T) {
listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1)
require.True(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker)
require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key)
listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
require.True(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker)
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID)
require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key)
listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
require.False(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Empty(t, listUploads.NextUploadIDMarker)
require.Empty(t, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
})
t.Run("check only next-key-marker", func(t *testing.T) {
listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1)
require.True(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker)
require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key)
listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1)
require.True(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker)
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID)
require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key)
listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1)
require.False(t, listUploads.IsTruncated)
require.Len(t, listUploads.Uploads, 1)
require.Empty(t, listUploads.NextUploadIDMarker)
require.Empty(t, listUploads.NextKeyMarker)
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
})
})
} }
func TestMultipartUploadSize(t *testing.T) { func TestMultipartUploadSize(t *testing.T) {

View file

@ -12,15 +12,16 @@ import (
) )
type Cache struct { type Cache struct {
logger *zap.Logger logger *zap.Logger
listsCache *cache.ObjectsListCache listsCache *cache.ObjectsListCache
sessionListCache *cache.ListSessionCache sessionListCache *cache.ListSessionCache
objCache *cache.ObjectsCache objCache *cache.ObjectsCache
namesCache *cache.ObjectsNameCache namesCache *cache.ObjectsNameCache
bucketCache *cache.BucketCache bucketCache *cache.BucketCache
systemCache *cache.SystemCache systemCache *cache.SystemCache
accessCache *cache.AccessControlCache accessCache *cache.AccessControlCache
networkInfoCache *cache.NetworkInfoCache networkInfoCache *cache.NetworkInfoCache
sessionMultipartCache *cache.ListMultipartSessionCache
} }
// CachesConfig contains params for caches. // CachesConfig contains params for caches.
@ -33,6 +34,7 @@ type CachesConfig struct {
Buckets *cache.Config Buckets *cache.Config
System *cache.Config System *cache.Config
AccessControl *cache.Config AccessControl *cache.Config
MultipartList *cache.Config
NetworkInfo *cache.NetworkInfoCacheConfig NetworkInfo *cache.NetworkInfoCacheConfig
} }
@ -48,20 +50,22 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
System: cache.DefaultSystemConfig(logger), System: cache.DefaultSystemConfig(logger),
AccessControl: cache.DefaultAccessControlConfig(logger), AccessControl: cache.DefaultAccessControlConfig(logger),
NetworkInfo: cache.DefaultNetworkInfoConfig(logger), NetworkInfo: cache.DefaultNetworkInfoConfig(logger),
MultipartList: cache.DefaultListMultipartSessionConfig(logger),
} }
} }
func NewCache(cfg *CachesConfig) *Cache { func NewCache(cfg *CachesConfig) *Cache {
return &Cache{ return &Cache{
logger: cfg.Logger, logger: cfg.Logger,
listsCache: cache.NewObjectsListCache(cfg.ObjectsList), listsCache: cache.NewObjectsListCache(cfg.ObjectsList),
sessionListCache: cache.NewListSessionCache(cfg.SessionList), sessionListCache: cache.NewListSessionCache(cfg.SessionList),
objCache: cache.New(cfg.Objects), objCache: cache.New(cfg.Objects),
namesCache: cache.NewObjectsNameCache(cfg.Names), namesCache: cache.NewObjectsNameCache(cfg.Names),
bucketCache: cache.NewBucketCache(cfg.Buckets), bucketCache: cache.NewBucketCache(cfg.Buckets),
systemCache: cache.NewSystemCache(cfg.System), systemCache: cache.NewSystemCache(cfg.System),
accessCache: cache.NewAccessControlCache(cfg.AccessControl), accessCache: cache.NewAccessControlCache(cfg.AccessControl),
networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo), networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo),
sessionMultipartCache: cache.NewListMultipartSessionCache(cfg.MultipartList),
} }
} }
@ -161,6 +165,14 @@ func (c *Cache) GetListSession(owner user.ID, key cache.ListSessionKey) *data.Li
return c.sessionListCache.GetListSession(key) return c.sessionListCache.GetListSession(key)
} }
func (c *Cache) GetListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) *data.ListMultipartSession {
if !c.accessCache.Get(owner, key.String()) {
return nil
}
return c.sessionMultipartCache.GetListMultipartSession(key)
}
func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) { func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) {
if err := c.sessionListCache.PutListSession(key, session); err != nil { if err := c.sessionListCache.PutListSession(key, session); err != nil {
c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err)) c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err))
@ -176,6 +188,21 @@ func (c *Cache) DeleteListSession(owner user.ID, key cache.ListSessionKey) {
c.accessCache.Delete(owner, key.String()) c.accessCache.Delete(owner, key.String())
} }
func (c *Cache) PutListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey, session *data.ListMultipartSession) {
if err := c.sessionMultipartCache.PutListMultipartSession(key, session); err != nil {
c.logger.Warn(logs.CouldntCacheListMultipartSession, zap.Error(err))
}
if err := c.accessCache.Put(owner, key.String()); err != nil {
c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
}
}
func (c *Cache) DeleteListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) {
c.sessionMultipartCache.DeleteListMultipartSession(key)
c.accessCache.Delete(owner, key.String())
}
func (c *Cache) GetTagging(owner user.ID, key string) map[string]string { func (c *Cache) GetTagging(owner user.ID, key string) map[string]string {
if !c.accessCache.Get(owner, key) { if !c.accessCache.Get(owner, key) {
return nil return nil

View file

@ -79,7 +79,8 @@ type (
Prefix string Prefix string
MaxKeys int MaxKeys int
Marker string Marker string
Bookmark string // Bookmark contains Marker or ContinuationToken and is used for pagination and as part of a cache key for list session.
dkirillov marked this conversation as resolved Outdated

Not only. It also contains Marker for listing v1 and ContinuationToken for listing v2

Not only. It also contains `Marker` for listing v1 and `ContinuationToken` for listing v2
Bookmark string
} }
commonLatestVersionsListingParams struct { commonLatestVersionsListingParams struct {
@ -193,11 +194,10 @@ func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
return nil, nil, nil return nil, nil, nil
} }
session, err := n.getListLatestVersionsSession(ctx, p) session, err := n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session) generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session)
objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator) objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator)
if err != nil { if err != nil {
@ -230,7 +230,7 @@ func (n *Layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListi
return nil, false, nil return nil, false, nil
} }
session, err := n.getListAllVersionsSession(ctx, p) session, err := n.getListVersionsSession(ctx, p, false)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -301,48 +301,31 @@ func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int,
} }
} }
func (n *Layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) { func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
}
func (n *Layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) {
return n.getListVersionsSession(ctx, p, false)
}
func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) {
owner := n.BearerOwner(ctx) owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark) cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
session := n.cache.GetListSession(owner, cacheKey) session = n.cache.GetListSession(owner, cacheKey)
if session == nil { if session == nil || session.Acquired.Swap(true) {
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly) session = n.newSession(ctx)
session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
return session, err
} }
if session.Acquired.Swap(true) {
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
}
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey) n.cache.DeleteListSession(owner, cacheKey)
return session, nil return session, nil
} }
func (n *Layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) { func (n *Layer) newSession(ctx context.Context) *data.ListSession {
session = &data.ListSession{NamesMap: make(map[string]struct{})} session := &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background()) session.Context, session.Cancel = context.WithCancel(context.Background())
// save access box data for next requests
if bd, err := middleware.GetBoxData(ctx); err == nil { if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd}) session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd})
} }
session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly) return session
if err != nil {
return nil, err
}
return session, nil
} }
func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) { func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {

View file

@ -2,6 +2,7 @@ package layer
import ( import (
"bytes" "bytes"
"cmp"
"context" "context"
"crypto/md5" "crypto/md5"
"encoding/base64" "encoding/base64"
@ -10,17 +11,20 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"slices"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@ -499,47 +503,65 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
return &result, nil return &result, nil
} }
multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, p.Bkt, p.Prefix) session, err := n.getListMultipartUploadsSession(ctx, p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
uploads := make([]*UploadInfo, 0, len(multipartInfos)) uploads := make([]*UploadInfo, 0, p.MaxUploads)
uniqDirs := make(map[string]struct{}) uniqDirs := make(map[string]struct{})
uploadsCount := 0
if session.Next != nil {
upload := uploadInfoFromMultipartInfo(session.Next, p.Prefix, p.Delimiter)
switch {
case upload.IsDir && isUniqDir(upload.Key, uniqDirs):
uniqDirs[upload.Key] = struct{}{}
fallthrough
dkirillov marked this conversation as resolved Outdated

Why don't just for uploadsCount < p.MaxUploads { ?

Why don't just `for uploadsCount < p.MaxUploads {` ?

I need to check next element to determine Truncated state. Extracted this check outside the loop

I need to check next element to determine Truncated state. Extracted this check outside the loop
case !upload.IsDir:
uploads = append(uploads, upload)
uploadsCount++
}

I would use errors.Is(err, io.EOF). In service/tree also

I would use `errors.Is(err, io.EOF)`. In `service/tree` also
}

Probably we should return error rather than continue. And consider cancel context (also for regular listing) @alexvanin

Probably we should return error rather than continue. And consider cancel context (also for regular listing) @alexvanin
for _, multipartInfo := range multipartInfos { info := session.Next
info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter) for uploadsCount < p.MaxUploads {
if info != nil { info, err = session.Stream.Next(ctx)
if info.IsDir { if err != nil {
if _, ok := uniqDirs[info.Key]; ok { if errors.Is(err, io.EOF) {
continue break
}
uniqDirs[info.Key] = struct{}{}
} }
uploads = append(uploads, info) n.reqLogger(ctx).Warn(logs.CouldNotGetMultipartUploadInfo, zap.Error(err))
continue

It seems now we can skip checking prefix inside uploadInfoFromMultipart and result always be non nil

It seems now we can skip checking prefix inside `uploadInfoFromMultipart` and result always be non nil

This isn't changed

This isn't changed

Without checking there is a fail in a test - a call with prefix "/my" returns also item with "/zzz" prefix. Because GetSubTreeStream trims prefix to "/"

Without checking there is a fail in a [test](https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/api/handler/multipart_upload_test.go#L262-L267) - a call with prefix "/my" returns also item with "/zzz" prefix. Because GetSubTreeStream trims prefix to "/"

Then we should fix streaming in tree, because it must return only nodes that have the provided prefix. (/my in this case, and object /zzz/object/name3 doesn't have such a prefix)

Then we should fix streaming in tree, because it must return only nodes that have the provided prefix. (`/my` in this case, and object `/zzz/object/name3` doesn't have such a prefix)
} }
upload := uploadInfoFromMultipartInfo(info, p.Prefix, p.Delimiter)
if upload.IsDir {
if !isUniqDir(upload.Key, uniqDirs) {
continue
}
uniqDirs[upload.Key] = struct{}{}
}
uploads = append(uploads, upload)

I'm not sure if it's ok to separate this invocation. At least as it's done now.
See test:

diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go
index 6836fc5d..dcff5e5b 100644
--- a/api/handler/multipart_upload_test.go
+++ b/api/handler/multipart_upload_test.go
@@ -266,6 +266,30 @@ func TestListMultipartUploads(t *testing.T) {
                require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID)
        })
 
+       t.Run("check delimiter", func(t *testing.T) {
+               t.Run("not truncated", func(t *testing.T) {
+                       listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", -1)
+                       require.Len(t, listUploads.Uploads, 0)
+                       require.Len(t, listUploads.CommonPrefixes, 2)
+                       require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
+                       require.Equal(t, "/zzz/", listUploads.CommonPrefixes[1].Prefix)
+               })
+
+               t.Run("truncated", func(t *testing.T) {
+                       listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", 1)
+                       require.Len(t, listUploads.Uploads, 0)
+                       require.Len(t, listUploads.CommonPrefixes, 1)
+                       require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
+                       require.True(t, listUploads.IsTruncated)
+
+                       listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
+                       require.Len(t, listUploads.Uploads, 0)
+                       require.Len(t, listUploads.CommonPrefixes, 1)
+                       require.Equal(t, "/zzz/", listUploads.CommonPrefixes[0].Prefix)
+                       require.False(t, listUploads.IsTruncated)
+               })
+       })
+

I'm not sure if it's ok to separate this invocation. At least as it's done now. See test: ```diff diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go index 6836fc5d..dcff5e5b 100644 --- a/api/handler/multipart_upload_test.go +++ b/api/handler/multipart_upload_test.go @@ -266,6 +266,30 @@ func TestListMultipartUploads(t *testing.T) { require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID) }) + t.Run("check delimiter", func(t *testing.T) { + t.Run("not truncated", func(t *testing.T) { + listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", -1) + require.Len(t, listUploads.Uploads, 0) + require.Len(t, listUploads.CommonPrefixes, 2) + require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix) + require.Equal(t, "/zzz/", listUploads.CommonPrefixes[1].Prefix) + }) + + t.Run("truncated", func(t *testing.T) { + listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", 1) + require.Len(t, listUploads.Uploads, 0) + require.Len(t, listUploads.CommonPrefixes, 1) + require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix) + require.True(t, listUploads.IsTruncated) + + listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1) + require.Len(t, listUploads.Uploads, 0) + require.Len(t, listUploads.CommonPrefixes, 1) + require.Equal(t, "/zzz/", listUploads.CommonPrefixes[0].Prefix) + require.False(t, listUploads.IsTruncated) + }) + }) + ```

Modified this test a bit (because there are two objects in the /my/ folder, this common prefix will appear twice, and only on the third call will it be /zzz/) and fixed this scenario. But I'm not quite sure I understand the problem with the separate invocations you proposed — could you clarify a bit?

Modified this test a bit (because there are two objects in the `/my/` folder, this common prefix will appear twice, and only on the third call will it be `/zzz/`) and fixed this scenario. But I'm not quite sure I understand the problem with the `separate invocations` you proposed — could you clarify a bit?
uploadsCount++
} }
sort.Slice(uploads, func(i, j int) bool { isTruncated := true
if uploads[i].Key == uploads[j].Key { next, err := session.Stream.Next(ctx)
return uploads[i].UploadID < uploads[j].UploadID if err != nil {
} if errors.Is(err, io.EOF) {
return uploads[i].Key < uploads[j].Key isTruncated = false
})
if p.KeyMarker != "" {
if p.UploadIDMarker != "" {
uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads)
} else { } else {
uploads = trimAfterUploadKey(p.KeyMarker, uploads) return nil, err
dkirillov marked this conversation as resolved Outdated

It seems here we must use next rather than info.
Please add tests for that case and others

It seems here we must use `next` rather than `info`. Please add tests for that case and others

In the previous implementation, NextKeyMarker and NextUploadIDMarker were taken from the last element of the current list, and the new one acts the same. So using info in this case is correct. Anyway, I'm going to write tests for this.

In the previous implementation, `NextKeyMarker` and `NextUploadIDMarker` were taken from the last element of the current list, and the new one acts the same. So using `info` in this case is correct. Anyway, I'm going to write tests for this.

Well, maybe I'm pointing to a different error. The output sometimes doesn't contain NextUploadIDMarker.

$ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084   list-multipart-uploads --bucket test   | grep UploadId
            "UploadId": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
            "UploadId": "8fd7e720-a5c4-4e56-86c6-dc99146199c7",
            "UploadId": "8f60cdb1-7e62-41bb-98c1-567498be6dc2",
            "UploadId": "75988c0a-d94f-4667-9087-056af63acefc",
            "UploadId": "dda0fa13-cc27-4c52-aba6-2aa561d39b19",


$ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084   list-multipart-uploads --bucket test   --max-uploads 1
{
    "Bucket": "test",
    "KeyMarker": "",
    "NextKeyMarker": "dir/dir/obj",
    "Prefix": "",
    "NextUploadIdMarker": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
    "MaxUploads": 1,
    "IsTruncated": true,
    "Uploads": [
        {
            "UploadId": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
            "Key": "dir/dir/obj",
            "Initiated": "2024-11-12T09:34:43+00:00",
            "StorageClass": "STANDARD",
            "Owner": {
                "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
                "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
            },
            "Initiator": {
                "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
                "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
            }
        }
    ]
}

$ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084   list-multipart-uploads --bucket test   --max-uploads 1 --key-marker dir/dir/obj --upload-id-marker bd50e12d-fc4b-450b-afb1-ed082f5e2ef9
{
    "Bucket": "test",
    "KeyMarker": "dir/dir/obj",
    "UploadIdMarker": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
    "Prefix": "",
    "MaxUploads": 1,
    "IsTruncated": false,
    "Uploads": [
        {
            "UploadId": "8fd7e720-a5c4-4e56-86c6-dc99146199c7",
            "Key": "dir/dir/obj",
            "Initiated": "2024-11-08T14:21:51+00:00",
            "StorageClass": "STANDARD",
            "Owner": {
                "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
                "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
            },
            "Initiator": {
                "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
                "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
            }
        }
    ]
}

Well, maybe I point to different error. Output sometime doesn't contain NextUploadIDMarker ``` $ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084 list-multipart-uploads --bucket test | grep UploadId "UploadId": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9", "UploadId": "8fd7e720-a5c4-4e56-86c6-dc99146199c7", "UploadId": "8f60cdb1-7e62-41bb-98c1-567498be6dc2", "UploadId": "75988c0a-d94f-4667-9087-056af63acefc", "UploadId": "dda0fa13-cc27-4c52-aba6-2aa561d39b19", $ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084 list-multipart-uploads --bucket test --max-uploads 1 { "Bucket": "test", "KeyMarker": "", "NextKeyMarker": "dir/dir/obj", "Prefix": "", "NextUploadIdMarker": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9", "MaxUploads": 1, "IsTruncated": true, "Uploads": [ { "UploadId": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9", "Key": "dir/dir/obj", "Initiated": "2024-11-12T09:34:43+00:00", "StorageClass": "STANDARD", "Owner": { "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt", "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt" }, "Initiator": { "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt", "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt" } } ] } $ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084 list-multipart-uploads --bucket test --max-uploads 1 --key-marker dir/dir/obj --upload-id-marker bd50e12d-fc4b-450b-afb1-ed082f5e2ef9 { "Bucket": "test", "KeyMarker": "dir/dir/obj", "UploadIdMarker": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9", "Prefix": "", "MaxUploads": 1, "IsTruncated": false, "Uploads": [ { "UploadId": "8fd7e720-a5c4-4e56-86c6-dc99146199c7", "Key": "dir/dir/obj", "Initiated": "2024-11-08T14:21:51+00:00", "StorageClass": "STANDARD", "Owner": { "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt", "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt" }, "Initiator": { "ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt", "DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt" } } ] } ```

Fixed and added test for this scenario

Fixed and added test for this scenario
} }
} }
if len(uploads) > p.MaxUploads { if isTruncated && info != nil {
// put to session redundant multipart upload which we read to check for EOF
session.Next = next
result.IsTruncated = true result.IsTruncated = true
uploads = uploads[:p.MaxUploads] result.NextUploadIDMarker = info.UploadID
result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID result.NextKeyMarker = info.Key
result.NextKeyMarker = uploads[len(uploads)-1].Key cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, info.Key, info.UploadID)
n.putListMultipartUploadsSession(ctx, session, cacheKey)
} }
for _, ov := range uploads { for _, ov := range uploads {
dkirillov marked this conversation as resolved
Review

Result must be sorted by upload-id if some uploads have the same key

Result must be sorted by upload-id if some uploads have the same key
@ -550,9 +572,62 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
} }
} }
slices.SortFunc(result.Uploads, func(a, b *UploadInfo) int {
keyCmp := cmp.Compare(a.Key, b.Key)
if keyCmp == 0 {
dkirillov marked this conversation as resolved Outdated

Why do this if differ from the similar from object listing?

if session == nil || session.Acquired.Swap(true) {

Why do this if differ from the similar from object listing? https://git.frostfs.info/nzinkevich/frostfs-s3-gw/src/commit/e673d727e9c9486e0b70127fb224ed58b6842fb3/api/layer/listing.go#L309

Changed method to be more similar

Changed method to be more similar
return cmp.Compare(a.UploadID, b.UploadID)
dkirillov marked this conversation as resolved Outdated

We cannot use ctx from the current request. It will be canceled after the first request is finished

We cannot use `ctx` from the current request. It will be canceled after the first request is finished
}
dkirillov marked this conversation as resolved Outdated

We also have to add AccessBox to this context to be able to get access to tree service. Otherwise currently we get

2024-11-12T11:52:19.895+0300    error   request failed  {"request_id": "ef743d2a-0f81-4f02-ae61-82afac3db746", "method": "ListMultipartUploads", "bucket": "test", "object": "", "description": "could not list multipart uploads", "user": "NbUgTSFvPmsRxmGeWpuuGeJUoRoi6PErcM", "error": "access denied: address s01.frostfs.devenv:8080: access denied: rpc error: code = Unknown desc = status: code = 2048 message = access to object operation denied", "status": 403}

See how this done for regular listing

if bd, err := middleware.GetBoxData(ctx); err == nil {

We also have to add AccessBox to this context to be able to get access to tree service. Otherwise currently we get ``` 2024-11-12T11:52:19.895+0300 error request failed {"request_id": "ef743d2a-0f81-4f02-ae61-82afac3db746", "method": "ListMultipartUploads", "bucket": "test", "object": "", "description": "could not list multipart uploads", "user": "NbUgTSFvPmsRxmGeWpuuGeJUoRoi6PErcM", "error": "access denied: address s01.frostfs.devenv:8080: access denied: rpc error: code = Unknown desc = status: code = 2048 message = access to object operation denied", "status": 403} ``` See how this done for regular listing https://git.frostfs.info/nzinkevich/frostfs-s3-gw/src/commit/aaca4f84b8ba29031481c54f6bdcae90909287a0/api/layer/listing.go#L324

Fixed

Fixed
return keyCmp
})
return &result, nil return &result, nil
} }
// putListMultipartUploadsSession releases the session and publishes it in the
// cache under cacheKey so a subsequent paginated request can continue reading
// from the already opened multipart-uploads stream.
func (n *Layer) putListMultipartUploadsSession(ctx context.Context, session *data.ListMultipartSession, cacheKey cache.ListMultipartSessionKey) {
	// Mark the session as free before publishing: a concurrent request
	// takes ownership by swapping Acquired back to true.
	session.Acquired.Store(false)
	owner := n.BearerOwner(ctx)
	n.cache.PutListMultipartSession(owner, cacheKey, session)
}
// getListMultipartUploadsSession returns a listing session for the given
// pagination parameters: either a cached one (continuing an already opened
// tree stream from the previous page) or a freshly created one.
func (n *Layer) getListMultipartUploadsSession(ctx context.Context, p *ListMultipartUploadsParams) (session *data.ListMultipartSession, err error) {
	owner := n.BearerOwner(ctx)
	cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, p.KeyMarker, p.UploadIDMarker)
	session = n.cache.GetListMultipartSession(owner, cacheKey)
	// Open a new stream on cache miss, or when the cached session is already
	// in use by a concurrent request (Swap reports the previous value).
	if session == nil || session.Acquired.Swap(true) {
		session = newListMultipartSession(ctx)
		params := data.MultipartStreamParams{
			Prefix:         p.Prefix,
			KeyMarker:      p.KeyMarker,
			UploadIDMarker: p.UploadIDMarker,
		}
		session.Stream, err = n.treeService.GetMultipartUploadsByPrefix(session.Context, p.Bkt, params)
		if err != nil {
			return nil, err
		}
	}
	// Drop the entry under the current key: once this request advances the
	// stream, the session no longer matches these markers. It is re-cached
	// under the next-page markers when the listing is truncated.
	n.cache.DeleteListMultipartSession(owner, cacheKey)
	return session, nil
}
// newListMultipartSession builds a listing session whose context outlives the
// current request, so the underlying stream can serve later pagination pages.
func newListMultipartSession(ctx context.Context) *data.ListMultipartSession {
	// Detach from the request context: the request's ctx is canceled as soon
	// as the first response is written, which would kill the stream.
	sessionCtx, cancel := context.WithCancel(context.Background())
	// Carry the access box over from the request so that follow-up requests
	// reusing this session can authorize tree service calls.
	if bd, err := middleware.GetBoxData(ctx); err == nil {
		sessionCtx = middleware.SetBox(sessionCtx, &middleware.Box{AccessBox: bd})
	}
	return &data.ListMultipartSession{
		CommonSession: data.CommonSession{
			Context: sessionCtx,
			Cancel:  cancel,
		},
	}
}
func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error { func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
multipartInfo, parts, err := n.getUploadParts(ctx, p) multipartInfo, parts, err := n.getUploadParts(ctx, p)
if err != nil { if err != nil {
@ -677,44 +752,10 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
return multipartInfo, res, nil return multipartInfo, res, nil
} }
// trimAfterUploadIDAndKey returns the uploads that follow the (key, id)
// pagination marker. The input is expected to be sorted by key and then by
// upload ID.
//
// NOTE(review): the filter requires obj.UploadID > id even when obj.Key is
// strictly greater than key, so an upload with a greater key but a smaller
// upload ID is dropped. This differs from S3 ListMultipartUploads marker
// semantics (key > marker, OR key == marker AND id > marker); the unit tests
// in this file codify the current behavior — confirm before changing.
func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo {
	var res []*UploadInfo
	// Fast path: the greatest key is still below the marker, nothing matches.
	if len(uploads) != 0 && uploads[len(uploads)-1].Key < key {
		return res
	}
	for _, obj := range uploads {
		if obj.Key >= key && obj.UploadID > id {
			res = append(res, obj)
		}
	}
	return res
}
// trimAfterUploadKey returns the suffix of objects whose keys are strictly
// greater than key. The input is expected to be sorted by key in ascending
// order; the returned slice aliases the input (no copy is made).
func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo {
	// Nothing qualifies when the list is empty or its greatest key does not
	// exceed the marker.
	if len(objects) == 0 || objects[len(objects)-1].Key <= key {
		return nil
	}
	for idx, obj := range objects {
		if obj.Key > key {
			return objects[idx:]
		}
	}
	return nil
}
func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo { func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo {
var isDir bool var isDir bool
key := uploadInfo.Key key := uploadInfo.Key
if !strings.HasPrefix(key, prefix) {
return nil
}
if len(delimiter) > 0 { if len(delimiter) > 0 {
tail := strings.TrimPrefix(key, prefix) tail := strings.TrimPrefix(key, prefix)
index := strings.Index(tail, delimiter) index := strings.Index(tail, delimiter)
@ -732,3 +773,10 @@ func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimit
Created: uploadInfo.Created, Created: uploadInfo.Created,
} }
} }
// isUniqDir reports whether key has not been recorded in uniqDirs yet.
func isUniqDir(key string, uniqDirs map[string]struct{}) bool {
	_, seen := uniqDirs[key]
	return !seen
}

View file

@ -1,108 +0,0 @@
package layer
import (
"sort"
"testing"
"github.com/stretchr/testify/require"
)
// TestTrimAfterUploadIDAndKey checks pagination trimming by (key, upload-id)
// marker against a sorted fixture.
func TestTrimAfterUploadIDAndKey(t *testing.T) {
	uploads := []*UploadInfo{
		{Key: "j", UploadID: "k"}, // key < id <
		{Key: "l", UploadID: "p"}, // key < id >
		{Key: "n", UploadID: "m"}, // key = id <
		{Key: "n", UploadID: "o"}, // pivot
		{Key: "n", UploadID: "q"}, // key = id >
		{Key: "p", UploadID: "h"}, // key > id <
		{Key: "q", UploadID: "r"}, // key > id >
	}
	// Expected surviving indexes (into the sorted slice) when each element in
	// turn is used as the (key, id) marker.
	expectedUploadsListsIndexes := [][]int{
		{1, 2, 3, 4, 6},
		{4, 6},
		{3, 4, 6},
		{4, 6},
		{6},
		{6},
		{},
	}
	sort.Slice(uploads, func(i, j int) bool {
		if uploads[i].Key == uploads[j].Key {
			return uploads[i].UploadID < uploads[j].UploadID
		}
		return uploads[i].Key < uploads[j].Key
	})
	length := len(uploads)
	t.Run("the last element's key is less, upload id is less", func(t *testing.T) {
		// Marker id "z" is greater than every element's upload id,
		// so every element's id is less than the marker's.
		keys := trimAfterUploadIDAndKey("z", "z", uploads)
		require.Empty(t, keys)
		require.Len(t, uploads, length)
	})
	t.Run("the last element's key is less, upload id is greater", func(t *testing.T) {
		// Marker id "a" is less than every element's upload id.
		keys := trimAfterUploadIDAndKey("z", "a", uploads)
		require.Empty(t, keys)
		require.Len(t, uploads, length)
	})
	t.Run("check for uploads", func(t *testing.T) {
		for i, u := range uploads {
			list := trimAfterUploadIDAndKey(u.Key, u.UploadID, uploads)
			require.Equal(t, len(list), len(expectedUploadsListsIndexes[i]))
			for j, idx := range expectedUploadsListsIndexes[i] {
				require.Equal(t, list[j], uploads[idx])
			}
		}
	})
}
// TestTrimAfterUploadKey checks that trimAfterUploadKey returns only the
// uploads whose keys strictly follow the given key marker.
func TestTrimAfterUploadKey(t *testing.T) {
	var (
		uploadKeys    = []string{"e", "f", "f", "g", "h", "i"}
		theSameKeyIdx = []int{1, 2} // indexes sharing the duplicated key "f"
		diffKeyIdx    = []int{0, 3} // indexes whose keys are unique in the fixture
		lastIdx       = len(uploadKeys) - 1
	)
	uploadsInfos := make([]*UploadInfo, 0, len(uploadKeys))
	for _, k := range uploadKeys {
		uploadsInfos = append(uploadsInfos, &UploadInfo{Key: k})
	}
	t.Run("empty list", func(t *testing.T) {
		keys := trimAfterUploadKey("f", []*UploadInfo{})
		require.Len(t, keys, 0)
	})
	t.Run("the last element is less than a key", func(t *testing.T) {
		keys := trimAfterUploadKey("j", uploadsInfos)
		require.Empty(t, keys)
		// The input slice must remain intact.
		require.Len(t, uploadsInfos, len(uploadKeys))
	})
	t.Run("different keys in sequence", func(t *testing.T) {
		for _, i := range diffKeyIdx {
			keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos)
			require.Len(t, keys, len(uploadKeys)-i-1)
			require.Equal(t, keys, uploadsInfos[i+1:])
			require.Len(t, uploadsInfos, len(uploadKeys))
		}
	})
	t.Run("the same keys in the sequence first element", func(t *testing.T) {
		for _, i := range theSameKeyIdx {
			// Both "f" entries are skipped: only keys strictly greater
			// than the marker are returned.
			keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos)
			require.Len(t, keys, 3)
			require.Equal(t, keys, uploadsInfos[3:])
			require.Len(t, uploadsInfos, len(uploadKeys))
		}
	})
	t.Run("last element", func(t *testing.T) {
		keys := trimAfterUploadKey(uploadKeys[lastIdx], uploadsInfos)
		require.Empty(t, keys)
	})
}
}

View file

@ -53,7 +53,7 @@ type Service interface {
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error)
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
// AddPart puts a node to a system tree as a child of appropriate multipart upload // AddPart puts a node to a system tree as a child of appropriate multipart upload

View file

@ -328,7 +328,7 @@ func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data
return nil return nil
} }
func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, string) ([]*data.MultipartInfo, error) { func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, data.MultipartStreamParams) (data.MultipartInfoStream, error) {
panic("implement me") panic("implement me")
} }

View file

@ -1007,6 +1007,8 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size) cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
cacheCfg.NetworkInfo.Lifetime = fetchCacheLifetime(v, l, cfgNetworkInfoCacheLifetime, cacheCfg.NetworkInfo.Lifetime) cacheCfg.NetworkInfo.Lifetime = fetchCacheLifetime(v, l, cfgNetworkInfoCacheLifetime, cacheCfg.NetworkInfo.Lifetime)
cacheCfg.MultipartList.Lifetime = fetchCacheLifetime(v, l, cfgMultipartListCacheLifetime, cacheCfg.MultipartList.Lifetime)
cacheCfg.MultipartList.Size = fetchCacheSize(v, l, cfgMultipartListCacheSize, cacheCfg.MultipartList.Size)
return cacheCfg return cacheCfg
} }

View file

@ -141,6 +141,8 @@ const ( // Settings.
cfgFrostfsIDCacheLifetime = "cache.frostfsid.lifetime" cfgFrostfsIDCacheLifetime = "cache.frostfsid.lifetime"
cfgFrostfsIDCacheSize = "cache.frostfsid.size" cfgFrostfsIDCacheSize = "cache.frostfsid.size"
cfgNetworkInfoCacheLifetime = "cache.network_info.lifetime" cfgNetworkInfoCacheLifetime = "cache.network_info.lifetime"
cfgMultipartListCacheLifetime = "cache.multipart_list_session.lifetime"
cfgMultipartListCacheSize = "cache.multipart_list_session.size"
cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval" cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"

View file

@ -101,9 +101,12 @@ S3_GW_CACHE_OBJECTS_SIZE=1000000
# Cache which keeps lists of objects in buckets # Cache which keeps lists of objects in buckets
S3_GW_CACHE_LIST_LIFETIME=1m S3_GW_CACHE_LIST_LIFETIME=1m
S3_GW_CACHE_LIST_SIZE=100000 S3_GW_CACHE_LIST_SIZE=100000
# Cache which keeps listing session # Cache which keeps listing objects session
S3_GW_CACHE_LIST_SESSION_LIFETIME=1m S3_GW_CACHE_LIST_SESSION_LIFETIME=1m
S3_GW_CACHE_LIST_SESSION_SIZE=100 S3_GW_CACHE_LIST_SESSION_SIZE=100
# Cache which keeps listing multipart uploads session
S3_GW_CACHE_MULTIPART_LIST_SESSION_LIFETIME=1m
S3_GW_CACHE_MULTIPART_LIST_SESSION_SIZE=100
# Cache which contains mapping of bucket name to bucket info # Cache which contains mapping of bucket name to bucket info
S3_GW_CACHE_BUCKETS_LIFETIME=1m S3_GW_CACHE_BUCKETS_LIFETIME=1m
S3_GW_CACHE_BUCKETS_SIZE=1000 S3_GW_CACHE_BUCKETS_SIZE=1000

View file

@ -129,6 +129,9 @@ cache:
list_session: list_session:
lifetime: 1m lifetime: 1m
size: 100 size: 100
multipart_list_session:
lifetime: 1m
size: 100
# Cache which contains mapping of nice name to object addresses # Cache which contains mapping of nice name to object addresses
names: names:
lifetime: 1m lifetime: 1m

View file

@ -426,6 +426,9 @@ cache:
list_session: list_session:
lifetime: 1m lifetime: 1m
size: 100 size: 100
multipart_list_session:
lifetime: 1m
size: 10000
names: names:
lifetime: 1m lifetime: 1m
size: 1000 size: 1000
@ -452,19 +455,20 @@ cache:
lifetime: 1m lifetime: 1m
``` ```
| Parameter | Type | Default value | Description | | Parameter | Type | Default value | Description |
|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------| |--------------------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------|
| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). | | `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). |
| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. | | `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. |
| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. | | `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. |
| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. | | `multipart_list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing of multipart uploads. |
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. | | `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. |
| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings etc. | | `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
| `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`<br>`size: 100` | Cache which stores access box with tokens by its address. | | `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings etc. |
| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 100000` | Cache which stores owner to cache operation mapping. | | `accessbox` | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`<br>`size: 100` | Cache which stores access box with tokens by its address. |
| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores list of policy chains. | | `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 100000` | Cache which stores owner to cache operation mapping. |
| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores FrostfsID subject info. | | `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores list of policy chains. |
| `network_info` | [Cache config](#cache-subsection) | `lifetime: 1m` | Cache which stores network info. | | `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores FrostfsID subject info. |
| `network_info` | [Cache config](#cache-subsection) | `lifetime: 1m` | Cache which stores network info. |
#### `cache` subsection #### `cache` subsection

View file

@ -8,6 +8,8 @@ const (
UpdateAccessCredObjectIntoFrostFS = "update access cred object into FrostFS" // Info in ../../authmate/authmate.go UpdateAccessCredObjectIntoFrostFS = "update access cred object into FrostFS" // Info in ../../authmate/authmate.go
MetricsAreDisabled = "metrics are disabled" // Warn in ../../metrics/app.go MetricsAreDisabled = "metrics are disabled" // Warn in ../../metrics/app.go
FoundMoreThanOneUnversionedNode = "found more than one unversioned node" // Debug in ../../pkg/service/tree/tree.go FoundMoreThanOneUnversionedNode = "found more than one unversioned node" // Debug in ../../pkg/service/tree/tree.go
CouldNotParseTreeNode = "could not parse tree node" // Error in ../../pkg/service/tree/tree.go
CouldNotFormFilePath = "could not form file path" // Error in ../../pkg/service/tree/tree.go
ServiceIsRunning = "service is running" // Info in ../../cmd/s3-gw/service.go ServiceIsRunning = "service is running" // Info in ../../cmd/s3-gw/service.go
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port" // Warn in ../../cmd/s3-gw/service.go ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port" // Warn in ../../cmd/s3-gw/service.go
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../cmd/s3-gw/service.go ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../cmd/s3-gw/service.go
@ -67,6 +69,7 @@ const (
CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go
CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go
MismatchedObjEncryptionInfo = "mismatched obj encryptionInfo" // Warn in ../../api/layer/multipart_upload.go MismatchedObjEncryptionInfo = "mismatched obj encryptionInfo" // Warn in ../../api/layer/multipart_upload.go
CouldNotGetMultipartUploadInfo = "could not get multipart upload info" // Warn in ../../api/layer/multipart_upload.go
UploadPart = "upload part" // Debug in ../../api/layer/multipart_upload.go UploadPart = "upload part" // Debug in ../../api/layer/multipart_upload.go
CouldntDeleteOldPartObject = "couldn't delete old part object" // Error in ../../api/layer/multipart_upload.go CouldntDeleteOldPartObject = "couldn't delete old part object" // Error in ../../api/layer/multipart_upload.go
CouldNotPutCompletedObject = "could not put a completed object (multipart upload)" // Error in ../../api/layer/multipart_upload.go CouldNotPutCompletedObject = "could not put a completed object (multipart upload)" // Error in ../../api/layer/multipart_upload.go
@ -89,6 +92,7 @@ const (
CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go
CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go
CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go
CouldntCacheListMultipartSession = "couldn't cache list multipart session" // Warn in ../../api/layer/cache.go
CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go

View file

@ -84,7 +84,9 @@ var (
// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error. // ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
ErrGatewayTimeout = frostfs.ErrGatewayTimeout ErrGatewayTimeout = frostfs.ErrGatewayTimeout
errNodeDoesntContainFileName = fmt.Errorf("node doesn't contain FileName") errNodeDoesntContainFileName = errors.New("node doesn't contain FileName")
errParentPathNotFound = errors.New("couldn't get parent path")
) )
const ( const (
@ -1061,7 +1063,7 @@ func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *
var filepath string var filepath string
if !s.intermediateRootID.Equal(trNode.ID) { if !s.intermediateRootID.Equal(trNode.ID) {
if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil { if filepath, err = formFilePath(trNode, fileName, s.namesMap); err != nil {
return nil, false, fmt.Errorf("invalid node order: %w", err) return nil, false, fmt.Errorf("invalid node order: %w", err)
} }
} else { } else {
@ -1165,58 +1167,18 @@ func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, tr
return intermediateNodes, nil return intermediateNodes, nil
} }
func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) { func (c *Tree) getSubTreeByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, []uint64, error) {
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix) rootID, _, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
if err != nil { if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) { if errors.Is(err, tree.ErrNodeNotFound) {
return nil, "", nil return nil, nil, nil
} }
return nil, "", err return nil, nil, err
} }
subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2, false) stream, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) {
return nil, "", nil
}
return nil, "", err
}
nodesMap := make(map[string][]NodeResponse, len(subTree)) return stream, rootID, err
for _, node := range subTree {
if MultiID(rootID).Equal(node.GetNodeID()) {
continue
}
fileName := getFilename(node)
if !strings.HasPrefix(fileName, tailPrefix) {
continue
}
nodes := nodesMap[fileName]
// Add all nodes if flag latestOnly is false.
// Add all intermediate nodes
// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
if len(nodes) == 0 {
nodes = []NodeResponse{node}
} else if !latestOnly || isIntermediate(node) {
nodes = append(nodes, node)
} else if isIntermediate(nodes[0]) {
nodes = append([]NodeResponse{node}, nodes...)
} else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
nodes[0] = node
}
nodesMap[fileName] = nodes
}
result := make([]NodeResponse, 0, len(subTree))
for _, nodes := range nodesMap {
result = append(result, nodes...)
}
return result, strings.TrimSuffix(prefix, tailPrefix), nil
} }
func getFilename(node NodeResponse) string { func getFilename(node NodeResponse) string {
@ -1237,20 +1199,19 @@ func isIntermediate(node NodeResponse) bool {
return node.GetMeta()[0].GetKey() == FileNameKey return node.GetMeta()[0].GetKey() == FileNameKey
} }
func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) { func formFilePath(node *treeNode, fileName string, namesMap map[uint64]string) (string, error) {
var filepath string var filePath string
for i, id := range node.GetParentID() { for i, id := range node.ParentID {
parentPath, ok := namesMap[id] parentPath, ok := namesMap[id]
if !ok { if !ok {
return "", fmt.Errorf("couldn't get parent path") return "", errParentPathNotFound
} }
filePath = parentPath + separator + fileName
filepath = parentPath + separator + fileName namesMap[node.ID[i]] = filePath
namesMap[node.GetNodeID()[i]] = filepath
} }
return filepath, nil return filePath, nil
dkirillov marked this conversation as resolved Outdated

It's better to use more meaningful name.
By the way why do we need separate function? And why do we use only 0th parentID below (namesMap[curNode.ParentID[0]])?

It's better to use more meaningful name. By the way why do we need separate function? And why do we use only 0th parentID below (`namesMap[curNode.ParentID[0]]`)?

It seems to me that multipart info can't be split so I use only first element
https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/pkg/service/tree/tree.go#L332-L334

It seems to me that multipart info can't be split so I use only first element https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/pkg/service/tree/tree.go#L332-L334
} }
func parseTreeNode(node NodeResponse) (*treeNode, string, error) { func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
@ -1267,10 +1228,6 @@ func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
return tNode, fileName, nil return tNode, fileName, nil
} }
func formLatestNodeKey(parentID uint64, fileName string) string {
return strconv.FormatUint(parentID, 10) + "." + fileName
}
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) { func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
return c.getUnversioned(ctx, bktInfo, versionTree, filepath) return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
} }
@ -1313,84 +1270,137 @@ func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketIn
return err return err
} }
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) { type multipartInfoStream struct {
subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false) log *zap.Logger
if err != nil { nodePaths map[uint64]string
return nil, err rootID MultiID
} // childStream stream of children nodes of prefix node.
childStream SubTreeStream
var result []*data.MultipartInfo // currentStream stream of children's nodes with max depth.
for _, node := range subTreeNodes { currentStream SubTreeStream
multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix) treeService ServiceClient
if err != nil { bktInfo *data.BucketInfo
return nil, err uploadID string
} keyMarker string
result = append(result, multipartUploads...) headPrefix string
} prefix string
return result, nil
} }
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) { func (m *multipartInfoStream) Next(ctx context.Context) (*data.MultipartInfo, error) {
// sorting in getSubTree leads to skipping nodes that doesn't have FileName attribute var tNode *treeNode
// so when we are only interested in multipart nodes, we can set this flag var filePath string
// (despite we sort multiparts in above layer anyway)
// to skip its children (parts) that don't have FileName if m.currentStream == nil {
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth, true) var err error
if m.currentStream, err = m.openNewStream(ctx); err != nil {
return nil, err
}
}
for {
var err error
tNode, err = getTreeNodeFromStream(m.currentStream)
if err != nil {
if errors.Is(err, io.EOF) {

Do we really need new variable err2?
It seems we can write

if m.currentStream, err = m.openNewStream(ctx); err != nil {
	return nil, err
}

because of continue below

Do we really need new variable `err2`? It seems we can write ```golang if m.currentStream, err = m.openNewStream(ctx); err != nil { return nil, err } ``` because of `continue` below
if m.currentStream, err = m.openNewStream(ctx); err != nil {
return nil, err
}
continue
}
return nil, err
}
var ok bool
if filePath, ok = m.checkTreeNode(tNode); ok {

Why not just fileName, ok := tNode.FileName() ?

Why not just `fileName, ok := tNode.FileName()` ?
break
}
}
dkirillov marked this conversation as resolved Outdated

Why do we use only tNode.ID[0]?

Why do we use only `tNode.ID[0]`?
return newMultipartInfoFromTreeNode(m.log, filePath, tNode)

Why do we treat this error as special? It seems that normally we traverse nodes in the right order, and if we don't see a parent it's a bug in the storage node. The root node's filepath can be set initially in this map

Why do we treat this error as special? It seems that normally we traverse nodes in the right order, and if we don't see a parent it's a bug in the storage node. The root node's filepath can be set initially in this map
}

I suppose we should check if tNode.IsSplit() and skip if necessary. And only after that use tNode.ID[0]

I suppose we should check if `tNode.IsSplit()` and skip if necessary. And only after that use `tNode.ID[0]`
// openNewStream creates subtree stream from childStream`s node.
func (m *multipartInfoStream) openNewStream(ctx context.Context) (SubTreeStream, error) {
node, err := getTreeNodeFromStream(m.childStream)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if m.rootID.Equal(node.ID) {
var parentPrefix string // skip root node
if parentFilePath != "" { // The root of subTree can also have a parent return m.openNewStream(ctx)
parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
} }

It seems we can simplify this to

if m.keyMarker < filePath || (m.keyMarker == filePath && m.uploadID < id) {
It seems we can simplify this to ```golang if m.keyMarker < filePath || (m.keyMarker == filePath && m.uploadID < id) { ```
stream, err := m.treeService.GetSubTreeStream(ctx, m.bktInfo, systemTree, node.ID, maxGetSubTreeDepth)
if err != nil {
return nil, err
}
return stream, nil
}
var filepath string func getTreeNodeFromStream(stream SubTreeStream) (*treeNode, error) {
namesMap := make(map[uint64]string, len(subTree)) node, err := stream.Next()
multiparts := make(map[string][]*data.MultipartInfo, len(subTree)) if err != nil {
return nil, err
}
tNode, err := newTreeNode(node)
if err != nil {
return nil, err

Please, use the similar check

if !s.rootID.Equal(node.GetNodeID()) && strings.HasPrefix(getFilename(node), s.tailPrefix) {

Please, use the similar check https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/commit/a12fea8a5b1e72cdd8cc10eb7278033305a24a3b/pkg/service/tree/tree.go#L987
}
return tNode, nil
}
dkirillov marked this conversation as resolved Outdated

Consider using approach like here

subTree, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)


Otherwise if we will have objects:

  • dir/objdir1/obj
  • dir/objdir1/obj2
  • dir/obj1/obj1
  • ...
  • dir/obj1/obj1000000

and the request contains prefix: dir/objdir, we will list more than 1,000,000 objects and just filter them (but we will still fetch them from storage) even though we are interested in only 2 of them

Consider using approach like here https://git.frostfs.info/nzinkevich/frostfs-s3-gw/src/commit/aaca4f84b8ba29031481c54f6bdcae90909287a0/pkg/service/tree/tree.go#L1114 Otherwise if we will have objects: * `dir/objdir1/obj` * `dir/objdir1/obj2` * `dir/obj1/obj1` * ... * `dir/obj1/obj1000000` and request will contain `prefix: dir/objdir` we will list that greater than 1000000 objects and just filter them (but we will get them anyway from storage) despite we are interested in only 2 of them

Changed the implementation. Now, at first, c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2) is called, and then GetSubTreeStream with max depth is applied to the children as needed, which should prevent fetching redundant nodes

Changed the implementation. Now, at first, `c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)` is called, and then `GetSubTreeStream` with max depth is applied to the children as needed, which should prevent fetching redundant nodes
for i, node := range subTree { func (m *multipartInfoStream) checkTreeNode(tNode *treeNode) (string, bool) {
tNode, fileName, err := parseTreeNode(node) var ok bool
if err != nil { var err error
continue
}
if i != 0 { if tNode.IsSplit() {
if filepath, err = formFilePath(node, fileName, namesMap); err != nil { return "", false
return nil, fmt.Errorf("invalid node order: %w", err) }

This function should be
func getTreeNodeFromStream(stream SubTreeStream) (*treeNode, error)
or
func (m *multipartInfoStream) getTreeNodeFromStream() (*treeNode, error)

This function should be `func getTreeNodeFromStream(stream SubTreeStream) (*treeNode, error)` or `func (m *multipartInfoStream) getTreeNodeFromStream() (*treeNode, error)`
} fileName, ok := tNode.FileName()
} else { if !ok {
filepath = parentPrefix + fileName return "", false
for _, id := range tNode.ID { }
namesMap[id] = filepath filePath, err := formFilePath(tNode, fileName, m.nodePaths)
} if err != nil {
} filePath = fileName
m.nodePaths[tNode.ID[0]] = filePath
multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, tNode) }
if err != nil || multipartInfo.Finished { filePath = m.headPrefix + filePath
continue if !strings.HasPrefix(filePath, m.prefix) {
} return "", false
}
for _, id := range node.GetParentID() { if _, ok = tNode.Meta[finishedKV]; ok {
key := formLatestNodeKey(id, fileName) return "", false
multipartInfos, ok := multiparts[key] }
if !ok { if id, ok := tNode.Meta[uploadIDKV]; ok {
multipartInfos = []*data.MultipartInfo{multipartInfo} if filePath > m.keyMarker || (filePath == m.keyMarker && m.uploadID != "" && id > m.uploadID) {
} else { return filePath, true
multipartInfos = append(multipartInfos, multipartInfo)
}
multiparts[key] = multipartInfos
} }
} }
result := make([]*data.MultipartInfo, 0, len(multiparts)) return "", false
for _, multipartInfo := range multiparts { }
result = append(result, multipartInfo...)
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error) {
stream, rootID, err := c.getSubTreeByPrefixStream(ctx, bktInfo, systemTree, params.Prefix)
if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) {
return nil, nil
}
return nil, err
} }
return result, nil return &multipartInfoStream{
log: c.reqLogger(ctx),
rootID: rootID,
childStream: stream,
nodePaths: make(map[uint64]string),
treeService: c.service,
bktInfo: bktInfo,
uploadID: params.UploadIDMarker,
keyMarker: params.KeyMarker,
prefix: params.Prefix,
headPrefix: strings.TrimRightFunc(params.Prefix, func(r rune) bool {
return r != '/'
}),
}, nil
} }
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) { func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {

View file

@ -12,6 +12,7 @@ import (
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
) )
@ -359,3 +360,127 @@ func TestSplitTreeMultiparts(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Len(t, parts, 1) require.Len(t, parts, 1)
} }
// TestCheckTreeNode verifies multipartInfoStream.checkTreeNode filtering over a
// fixed set of tree nodes: directory nodes (no UploadId meta) and finished
// uploads must be rejected, and prefix / key-marker / upload-id-marker
// settings must narrow the accepted set accordingly.
//
// Fix over the previous version: each subtest builds its own fresh stream via
// newStream instead of mutating one shared multipartInfoStream value, so
// subtests no longer inherit prefix/marker state from earlier subtests and are
// safe to run in any order (or in isolation). The expected result vectors are
// unchanged.
func TestCheckTreeNode(t *testing.T) {
	treeNodes := []*treeNode{
		// foo/ — directory node, no UploadId: always rejected.
		{
			ID:        []uint64{1},
			ParentID:  []uint64{0},
			TimeStamp: []uint64{1},
			Meta: map[string]string{
				"FileName": "foo",
			},
		},
		// foo/ant
		{
			ID:        []uint64{2},
			ParentID:  []uint64{1},
			TimeStamp: []uint64{1},
			Meta: map[string]string{
				"FileName": "ant",
				"UploadId": "d",
			},
		},
		// foo/bar
		{
			ID:        []uint64{3},
			ParentID:  []uint64{1},
			TimeStamp: []uint64{1},
			Meta: map[string]string{
				"FileName": "bar",
				"UploadId": "c",
			},
		},
		// foo/finished — completed upload: always rejected.
		{
			ID:        []uint64{4},
			ParentID:  []uint64{1},
			TimeStamp: []uint64{1},
			Meta: map[string]string{
				"FileName": "finished",
				"UploadId": "e",
				"Finished": "True",
			},
		},
		// hello/ — directory node.
		{
			ID:        []uint64{5},
			ParentID:  []uint64{0},
			TimeStamp: []uint64{1},
			Meta: map[string]string{
				"FileName": "hello",
			},
		},
		// hello/world (first upload)
		{
			ID:        []uint64{6},
			ParentID:  []uint64{5},
			TimeStamp: []uint64{1},
			Meta: map[string]string{
				"FileName": "world",
				"UploadId": "a",
			},
		},
		// hello/world (second upload for the same key)
		{
			ID:        []uint64{7},
			ParentID:  []uint64{5},
			TimeStamp: []uint64{1},
			Meta: map[string]string{
				"FileName": "world",
				"UploadId": "b",
			},
		},
	}

	// newStream returns a fresh stream with empty filters so that no
	// prefix/marker state leaks between subtests.
	newStream := func() multipartInfoStream {
		return multipartInfoStream{
			log:       zap.NewNop(),
			rootID:    []uint64{0},
			nodePaths: make(map[uint64]string),
		}
	}

	// check feeds every fixture node through checkTreeNode in order
	// (order matters: directory nodes populate nodePaths for their
	// children) and collects the acceptance flags.
	check := func(info multipartInfoStream) []bool {
		results := make([]bool, 0, len(treeNodes))
		for _, node := range treeNodes {
			_, valid := info.checkTreeNode(node)
			results = append(results, valid)
		}
		return results
	}

	t.Run("without markers", func(t *testing.T) {
		info := newStream()
		require.Equal(t, []bool{false, true, true, false, false, true, true}, check(info))
	})

	t.Run("with prefix", func(t *testing.T) {
		info := newStream()
		info.prefix = "hello"
		info.headPrefix = ""
		require.Equal(t, []bool{false, false, false, false, false, true, true}, check(info))
	})

	t.Run("with key marker", func(t *testing.T) {
		info := newStream()
		info.keyMarker = "foo/bar"
		require.Equal(t, []bool{false, false, false, false, false, true, true}, check(info))
	})

	t.Run("with key and upload id markers", func(t *testing.T) {
		info := newStream()
		info.keyMarker = "hello/world"
		info.uploadID = "a"
		require.Equal(t, []bool{false, false, false, false, false, false, true}, check(info))
	})
}