[#469] List multipart uploads streaming
Some checks failed:
DCO (pull_request): Successful in 1m37s
Vulncheck (pull_request): Successful in 2m0s
Builds (pull_request): Successful in 1m58s
Lint (pull_request): Failing after 2m6s
Tests (pull_request): Successful in 2m0s

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
Nikita Zinkevich 2024-10-30 10:04:53 +03:00
parent eff0de43d5
commit 5586b8fb96
Signed by: nzinkevich
GPG key ID: 748EA1D0B2E6420A
21 changed files with 626 additions and 363 deletions

api/cache/listmultipart.go (new file, +109 lines)

@@ -0,0 +1,109 @@
package cache

import (
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"github.com/bluele/gcache"
	"go.uber.org/zap"
)

type (
	// ListMultipartSessionCache contains a cache of ListMultipartUploads sessions (used during pagination).
	ListMultipartSessionCache struct {
		cache  gcache.Cache
		logger *zap.Logger
	}

	// ListMultipartSessionKey is a key to find a ListMultipartSessionCache's entry.
	ListMultipartSessionKey struct {
		cid      cid.ID
		prefix   string
		marker   string
		uploadID string
	}
)
const (
	// DefaultListMultipartSessionCacheLifetime is the default lifetime of entries in the ListMultipartUploads session cache.
	DefaultListMultipartSessionCacheLifetime = time.Minute
	// DefaultListMultipartSessionCacheSize is the default size of the ListMultipartUploads session cache.
	DefaultListMultipartSessionCacheSize = 100
)

// DefaultListMultipartSessionConfig returns new default cache expiration values.
func DefaultListMultipartSessionConfig(logger *zap.Logger) *Config {
	return &Config{
		Size:     DefaultListMultipartSessionCacheSize,
		Lifetime: DefaultListMultipartSessionCacheLifetime,
		Logger:   logger,
	}
}

func (k *ListMultipartSessionKey) String() string {
	return k.cid.EncodeToString() + k.prefix + k.marker + k.uploadID
}
// NewListMultipartSessionCache is a constructor which creates an object of ListMultipartSessionCache with the given lifetime of entries.
func NewListMultipartSessionCache(config *Config) *ListMultipartSessionCache {
	gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).EvictedFunc(func(_ interface{}, val interface{}) {
		session, ok := val.(*data.ListMultipartSession)
		if !ok {
			config.Logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", val)),
				zap.String("expected", fmt.Sprintf("%T", session)))
			return // without this early return, session.Acquired below would dereference a nil pointer
		}

		if !session.Acquired.Load() {
			session.Cancel()
		}
	}).Build()

	return &ListMultipartSessionCache{cache: gc, logger: config.Logger}
}
// GetListMultipartSession returns a session of ListMultipartUploads request.
func (l *ListMultipartSessionCache) GetListMultipartSession(key ListMultipartSessionKey) *data.ListMultipartSession {
	entry, err := l.cache.Get(key)
	if err != nil {
		return nil
	}

	result, ok := entry.(*data.ListMultipartSession)
	if !ok {
		l.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
			zap.String("expected", fmt.Sprintf("%T", result)))
		return nil
	}

	return result
}

// PutListMultipartSession puts a ListMultipartUploads session info to cache.
func (l *ListMultipartSessionCache) PutListMultipartSession(key ListMultipartSessionKey, session *data.ListMultipartSession) error {
	s := l.GetListMultipartSession(key)
	if s != nil && s != session {
		if !s.Acquired.Load() {
			s.Cancel()
		}
	}

	return l.cache.Set(key, session)
}

// DeleteListMultipartSession removes key from cache.
func (l *ListMultipartSessionCache) DeleteListMultipartSession(key ListMultipartSessionKey) {
	l.cache.Remove(key)
}

// CreateListMultipartSessionCacheKey returns ListMultipartSessionKey with the given CID, prefix, marker and uploadID.
func CreateListMultipartSessionCacheKey(cnr cid.ID, prefix, marker, uploadID string) ListMultipartSessionKey {
	p := ListMultipartSessionKey{
		cid:      cnr,
		prefix:   prefix,
		marker:   marker,
		uploadID: uploadID,
	}

	return p
}
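For orientation, the intended use of this cache is a get-or-create flow: a paginated ListMultipartUploads request tries to take over the cached session whose key matches its markers, and otherwise opens a fresh stream. The helper below is a hypothetical sketch, not part of this commit; it mirrors `getListMultipartUploadsSession` in the layer code further down in this diff.

```go
// acquireSession is an illustrative helper (assumption, not in this change):
// it returns a resumable session for the given key, or nil when the caller
// must start a new stream from the markers.
func acquireSession(sc *ListMultipartSessionCache, key ListMultipartSessionKey) *data.ListMultipartSession {
	session := sc.GetListMultipartSession(key)
	if session == nil || session.Acquired.Swap(true) {
		// missing, or another in-flight request already owns the stream
		return nil
	}
	// the key encodes the stream position, which is about to change,
	// so the entry is dropped; it is re-put under the new markers later
	sc.DeleteListMultipartSession(key)
	return session
}
```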


@@ -28,7 +28,7 @@ type (
 const (
 	// DefaultListSessionCacheLifetime is a default lifetime of entries in cache of ListObjects.
-	DefaultListSessionCacheLifetime = time.Second * 60
+	DefaultListSessionCacheLifetime = time.Minute
 	// DefaultListSessionCacheSize is a default size of cache of ListObjects.
 	DefaultListSessionCacheSize = 100
 )


@@ -9,11 +9,25 @@ type VersionsStream interface {
 	Next(ctx context.Context) (*NodeVersion, error)
 }
 
-type ListSession struct {
-	Next     []*ExtendedNodeVersion
-	Stream   VersionsStream
-	NamesMap map[string]struct{}
+type CommonSession struct {
 	Context  context.Context
 	Cancel   context.CancelFunc
 	Acquired atomic.Bool
 }
+
+type ListSession struct {
+	CommonSession
+	Next     []*ExtendedNodeVersion
+	Stream   VersionsStream
+	NamesMap map[string]struct{}
+}
+
+type MultipartInfoStream interface {
+	Next(ctx context.Context) (*MultipartInfo, error)
+}
+
+type ListMultipartSession struct {
+	CommonSession
+	Next   *MultipartInfo
+	Stream MultipartInfoStream
+}
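To make the new `MultipartInfoStream` contract concrete, here is a minimal in-memory implementation (a hypothetical test helper, not part of this commit; it lives in the same `data` package and assumes `context` and `io` are imported):

```go
// sliceMultipartStream yields a fixed set of uploads, then io.EOF,
// which is the termination signal the layer code below relies on.
type sliceMultipartStream struct {
	infos []*MultipartInfo
}

func (s *sliceMultipartStream) Next(_ context.Context) (*MultipartInfo, error) {
	if len(s.infos) == 0 {
		return nil, io.EOF
	}
	info := s.infos[0]
	s.infos = s.infos[1:]
	return info, nil
}
```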


@@ -207,3 +207,9 @@ func (l LockInfo) UntilDate() string {
 func (l LockInfo) IsCompliance() bool {
 	return l.isCompliance
 }
+
+type MultipartStreamParams struct {
+	Prefix         string
+	KeyMarker      string
+	UploadIDMarker string
+}


@@ -244,6 +244,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
 		Buckets:       minCacheCfg,
 		System:        minCacheCfg,
 		AccessControl: minCacheCfg,
+		MultipartList: minCacheCfg,
 		NetworkInfo:   &cache.NetworkInfoCacheConfig{Lifetime: minCacheCfg.Lifetime},
 	}
 }


@@ -513,7 +513,7 @@ func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Req
 	if maxUploadsStr != "" {
 		val, err := strconv.Atoi(maxUploadsStr)
-		if err != nil || val < 1 || val > 1000 {
+		if err != nil || val < 1 || val > maxObjectList {
 			h.logAndSendError(ctx, w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads))
 			return
 		}


@@ -294,6 +294,58 @@ func TestListMultipartUploads(t *testing.T) {
 			require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
 		})
 	})
+
+	t.Run("check next markers", func(t *testing.T) {
+		t.Run("check both next-key-marker and next-upload-id-marker", func(t *testing.T) {
+			listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1)
+			require.True(t, listUploads.IsTruncated)
+			require.Len(t, listUploads.Uploads, 1)
+			require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker)
+			require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker)
+			require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
+			require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key)
+
+			listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
+			require.True(t, listUploads.IsTruncated)
+			require.Len(t, listUploads.Uploads, 1)
+			require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker)
+			require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker)
+			require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID)
+			require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key)
+
+			listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
+			require.False(t, listUploads.IsTruncated)
+			require.Len(t, listUploads.Uploads, 1)
+			require.Empty(t, listUploads.NextUploadIDMarker)
+			require.Empty(t, listUploads.NextKeyMarker)
+			require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
+		})
+
+		t.Run("check only next-key-marker", func(t *testing.T) {
+			listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1)
+			require.True(t, listUploads.IsTruncated)
+			require.Len(t, listUploads.Uploads, 1)
+			require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker)
+			require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker)
+			require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
+			require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key)
+
+			listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1)
+			require.True(t, listUploads.IsTruncated)
+			require.Len(t, listUploads.Uploads, 1)
+			require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker)
+			require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker)
+			require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID)
+			require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key)
+
+			listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1)
+			require.False(t, listUploads.IsTruncated)
+			require.Len(t, listUploads.Uploads, 1)
+			require.Empty(t, listUploads.NextUploadIDMarker)
+			require.Empty(t, listUploads.NextKeyMarker)
+			require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
+		})
+	})
 }
 
 func TestMultipartUploadSize(t *testing.T) {
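These tests exercise the marker semantics: an upload belongs to the next page only if its (key, uploadID) pair sorts strictly after the markers returned by the previous page. The rule can be written as a small standalone predicate (an illustrative sketch, equivalent to the condition in `checkTreeNode` later in this diff):

```go
// afterMarker reports whether the upload (key, id) comes strictly after
// the position encoded by keyMarker and uploadIDMarker.
func afterMarker(key, id, keyMarker, uploadIDMarker string) bool {
	if keyMarker == "" {
		return true // no marker: everything qualifies
	}
	if key != keyMarker {
		return key > keyMarker
	}
	return uploadIDMarker != "" && id > uploadIDMarker
}
```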


@@ -12,15 +12,16 @@ import (
 )
 
 type Cache struct {
 	logger                *zap.Logger
 	listsCache            *cache.ObjectsListCache
 	sessionListCache      *cache.ListSessionCache
 	objCache              *cache.ObjectsCache
 	namesCache            *cache.ObjectsNameCache
 	bucketCache           *cache.BucketCache
 	systemCache           *cache.SystemCache
 	accessCache           *cache.AccessControlCache
 	networkInfoCache      *cache.NetworkInfoCache
+	sessionMultipartCache *cache.ListMultipartSessionCache
 }
 
 // CachesConfig contains params for caches.
@@ -33,6 +34,7 @@ type CachesConfig struct {
 	Buckets       *cache.Config
 	System        *cache.Config
 	AccessControl *cache.Config
+	MultipartList *cache.Config
 	NetworkInfo   *cache.NetworkInfoCacheConfig
 }
 
@@ -48,20 +50,22 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
 		System:        cache.DefaultSystemConfig(logger),
 		AccessControl: cache.DefaultAccessControlConfig(logger),
 		NetworkInfo:   cache.DefaultNetworkInfoConfig(logger),
+		MultipartList: cache.DefaultListMultipartSessionConfig(logger),
 	}
 }
 
 func NewCache(cfg *CachesConfig) *Cache {
 	return &Cache{
 		logger:                cfg.Logger,
 		listsCache:            cache.NewObjectsListCache(cfg.ObjectsList),
 		sessionListCache:      cache.NewListSessionCache(cfg.SessionList),
 		objCache:              cache.New(cfg.Objects),
 		namesCache:            cache.NewObjectsNameCache(cfg.Names),
 		bucketCache:           cache.NewBucketCache(cfg.Buckets),
 		systemCache:           cache.NewSystemCache(cfg.System),
 		accessCache:           cache.NewAccessControlCache(cfg.AccessControl),
 		networkInfoCache:      cache.NewNetworkInfoCache(cfg.NetworkInfo),
+		sessionMultipartCache: cache.NewListMultipartSessionCache(cfg.MultipartList),
 	}
 }
 
@@ -161,6 +165,14 @@ func (c *Cache) GetListSession(owner user.ID, key cache.ListSessionKey) *data.Li
 	return c.sessionListCache.GetListSession(key)
 }
 
+func (c *Cache) GetListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) *data.ListMultipartSession {
+	if !c.accessCache.Get(owner, key.String()) {
+		return nil
+	}
+
+	return c.sessionMultipartCache.GetListMultipartSession(key)
+}
+
 func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) {
 	if err := c.sessionListCache.PutListSession(key, session); err != nil {
 		c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err))
@@ -176,6 +188,21 @@ func (c *Cache) DeleteListSession(owner user.ID, key cache.ListSessionKey) {
 	c.accessCache.Delete(owner, key.String())
 }
 
+func (c *Cache) PutListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey, session *data.ListMultipartSession) {
+	if err := c.sessionMultipartCache.PutListMultipartSession(key, session); err != nil {
+		c.logger.Warn(logs.CouldntCacheListMultipartSession, zap.Error(err))
+	}
+
+	if err := c.accessCache.Put(owner, key.String()); err != nil {
+		c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
+	}
+}
+
+func (c *Cache) DeleteListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) {
+	c.sessionMultipartCache.DeleteListMultipartSession(key)
+	c.accessCache.Delete(owner, key.String())
+}
+
 func (c *Cache) GetTagging(owner user.ID, key string) map[string]string {
 	if !c.accessCache.Get(owner, key) {
 		return nil


@@ -79,7 +79,8 @@ type (
 		Prefix   string
 		MaxKeys  int
 		Marker   string
-		Bookmark string
+		// Bookmark contains Marker or ContinuationToken and is used for pagination and as part of a cache key for list session.
+		Bookmark string
 	}
 
 	commonLatestVersionsListingParams struct {
@@ -193,11 +194,10 @@ func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
 		return nil, nil, nil
 	}
 
-	session, err := n.getListLatestVersionsSession(ctx, p)
+	session, err := n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
 	if err != nil {
 		return nil, nil, err
 	}
 
 	generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session)
 	objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator)
 	if err != nil {
@@ -230,7 +230,7 @@ func (n *Layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListi
 		return nil, false, nil
 	}
 
-	session, err := n.getListAllVersionsSession(ctx, p)
+	session, err := n.getListVersionsSession(ctx, p, false)
 	if err != nil {
 		return nil, false, err
 	}
@@ -301,48 +301,31 @@ func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int,
 	}
 }
 
-func (n *Layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) {
-	return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
-}
-
-func (n *Layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) {
-	return n.getListVersionsSession(ctx, p, false)
-}
-
-func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) {
+func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
 	owner := n.BearerOwner(ctx)
 	cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
 
-	session := n.cache.GetListSession(owner, cacheKey)
-	if session == nil {
-		return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
-	}
-
-	if session.Acquired.Swap(true) {
-		return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
+	session = n.cache.GetListSession(owner, cacheKey)
+	if session == nil || session.Acquired.Swap(true) {
+		session = n.newSession(ctx)
+		session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
+		return session, err
 	}
 
 	// after reading next object from stream in session
 	// the current cache value already doesn't match with next token in cache key
 	n.cache.DeleteListSession(owner, cacheKey)
 
 	return session, nil
 }
 
-func (n *Layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
-	session = &data.ListSession{NamesMap: make(map[string]struct{})}
+func (n *Layer) newSession(ctx context.Context) *data.ListSession {
+	session := &data.ListSession{NamesMap: make(map[string]struct{})}
 	session.Context, session.Cancel = context.WithCancel(context.Background())
 
 	// save access box data for next requests
 	if bd, err := middleware.GetBoxData(ctx); err == nil {
 		session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd})
 	}
 
-	session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
-	if err != nil {
-		return nil, err
-	}
-
-	return session, nil
+	return session
 }
 
 func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {


@@ -2,6 +2,7 @@ package layer
 
 import (
 	"bytes"
+	"cmp"
 	"context"
 	"crypto/md5"
 	"encoding/base64"
@@ -10,17 +11,20 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"slices"
 	"sort"
 	"strconv"
 	"strings"
 	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
+	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
 	apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
+	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@@ -499,47 +503,58 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
 		return &result, nil
 	}
 
-	multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, p.Bkt, p.Prefix)
+	session, err := n.getListMultipartUploadsSession(ctx, p)
 	if err != nil {
 		return nil, err
 	}
 
-	uploads := make([]*UploadInfo, 0, len(multipartInfos))
+	uploads := make([]*UploadInfo, 0, p.MaxUploads)
 	uniqDirs := make(map[string]struct{})
+	uploadsCount := 0
+	if session.Next != nil {
+		uploads = append(uploads, uploadInfoFromMultipartInfo(session.Next, p.Prefix, p.Delimiter))
+		uploadsCount++
+	}
 
-	for _, multipartInfo := range multipartInfos {
-		info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter)
-		if info != nil {
-			if info.IsDir {
-				if _, ok := uniqDirs[info.Key]; ok {
-					continue
-				}
-				uniqDirs[info.Key] = struct{}{}
-			}
-			uploads = append(uploads, info)
-		}
-	}
-
-	sort.Slice(uploads, func(i, j int) bool {
-		if uploads[i].Key == uploads[j].Key {
-			return uploads[i].UploadID < uploads[j].UploadID
-		}
-		return uploads[i].Key < uploads[j].Key
-	})
-
-	if p.KeyMarker != "" {
-		if p.UploadIDMarker != "" {
-			uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads)
-		} else {
-			uploads = trimAfterUploadKey(p.KeyMarker, uploads)
-		}
-	}
-
-	if len(uploads) > p.MaxUploads {
+	info := session.Next
+	for uploadsCount < p.MaxUploads {
+		info, err = session.Stream.Next(ctx)
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			n.reqLogger(ctx).Warn(logs.CouldNotGetMultipartUploadInfo, zap.Error(err))
+			continue
+		}
+
+		upload := uploadInfoFromMultipartInfo(info, p.Prefix, p.Delimiter)
+		if upload.IsDir {
+			if _, ok := uniqDirs[upload.Key]; ok {
+				continue
+			}
+			uniqDirs[upload.Key] = struct{}{}
+		}
+
+		uploads = append(uploads, upload)
+		uploadsCount++
+	}
+
+	isTruncated := true
+	next, err := session.Stream.Next(ctx)
+	if err != nil {
+		if err == io.EOF {
+			isTruncated = false
+		} else {
+			return nil, err
+		}
+	}
+
+	if isTruncated && info != nil {
+		// stash in the session the extra multipart upload that was read to check for EOF
+		session.Next = next
 		result.IsTruncated = true
-		uploads = uploads[:p.MaxUploads]
-		result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID
-		result.NextKeyMarker = uploads[len(uploads)-1].Key
+		result.NextUploadIDMarker = info.UploadID
+		result.NextKeyMarker = info.Key
+
+		cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, info.Key, info.UploadID)
+		n.putListMultipartUploadsSession(ctx, session, cacheKey)
 	}
 
 	for _, ov := range uploads {
@@ -550,9 +565,62 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
 		}
 	}
 
+	slices.SortFunc(result.Uploads, func(a, b *UploadInfo) int {
+		keyCmp := cmp.Compare(a.Key, b.Key)
+		if keyCmp == 0 {
+			return cmp.Compare(a.UploadID, b.UploadID)
+		}
+
+		return keyCmp
+	})
+
 	return &result, nil
 }
 
+func (n *Layer) putListMultipartUploadsSession(ctx context.Context, session *data.ListMultipartSession, cacheKey cache.ListMultipartSessionKey) {
+	session.Acquired.Store(false)
+	n.cache.PutListMultipartSession(n.BearerOwner(ctx), cacheKey, session)
+}
+
+func (n *Layer) getListMultipartUploadsSession(ctx context.Context, p *ListMultipartUploadsParams) (session *data.ListMultipartSession, err error) {
+	owner := n.BearerOwner(ctx)
+	cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, p.KeyMarker, p.UploadIDMarker)
+
+	session = n.cache.GetListMultipartSession(owner, cacheKey)
+	if session == nil || session.Acquired.Swap(true) {
+		session = newListMultipartSession(ctx)
+		params := data.MultipartStreamParams{
+			Prefix:         p.Prefix,
+			KeyMarker:      p.KeyMarker,
+			UploadIDMarker: p.UploadIDMarker,
+		}
+
+		session.Stream, err = n.treeService.GetMultipartUploadsByPrefix(session.Context, p.Bkt, params)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	// after reading the next object from the stream, the cached value no longer matches the marker in the cache key
+	n.cache.DeleteListMultipartSession(owner, cacheKey)
+
+	return session, nil
+}
+
+func newListMultipartSession(ctx context.Context) *data.ListMultipartSession {
+	reqCtx, cancel := context.WithCancel(context.Background())
+	session := &data.ListMultipartSession{
+		CommonSession: data.CommonSession{
+			Context: reqCtx,
+			Cancel:  cancel,
+		},
+	}
+
+	// save access box data for next requests
+	if bd, err := middleware.GetBoxData(ctx); err == nil {
+		session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd})
+	}
+
+	return session
+}
+
 func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
 	multipartInfo, parts, err := n.getUploadParts(ctx, p)
 	if err != nil {
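The truncation handling above is a read-ahead: the loop collects up to MaxUploads entries, then probes the stream once more; a successful extra read marks the page truncated, and the probed entry is stashed in session.Next so the next page emits it first. The pattern in isolation (a simplified sketch assuming an io.EOF-terminated source; unlike the real code, which logs and skips per-item errors, this version fails fast):

```go
// collectPage gathers up to max items and probes one item ahead to learn
// whether the listing is truncated; extra is replayed by the next page.
func collectPage[T any](next func() (T, error), max int) (page []T, extra *T, truncated bool, err error) {
	for len(page) < max {
		item, e := next()
		if errors.Is(e, io.EOF) {
			return page, nil, false, nil
		}
		if e != nil {
			return nil, nil, false, e
		}
		page = append(page, item)
	}
	item, e := next() // read-ahead to detect truncation
	if errors.Is(e, io.EOF) {
		return page, nil, false, nil
	}
	if e != nil {
		return nil, nil, false, e
	}
	return page, &item, true, nil
}
```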
@@ -677,44 +745,10 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
 	return multipartInfo, res, nil
 }
 
-func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo {
-	var res []*UploadInfo
-	if len(uploads) != 0 && uploads[len(uploads)-1].Key < key {
-		return res
-	}
-
-	for _, obj := range uploads {
-		if obj.Key >= key && obj.UploadID > id {
-			res = append(res, obj)
-		}
-	}
-
-	return res
-}
-
-func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo {
-	var result []*UploadInfo
-	if len(objects) != 0 && objects[len(objects)-1].Key <= key {
-		return result
-	}
-
-	for i, obj := range objects {
-		if obj.Key > key {
-			result = objects[i:]
-			break
-		}
-	}
-
-	return result
-}
-
 func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo {
 	var isDir bool
 	key := uploadInfo.Key
 
-	if !strings.HasPrefix(key, prefix) {
-		return nil
-	}
-
 	if len(delimiter) > 0 {
 		tail := strings.TrimPrefix(key, prefix)
 		index := strings.Index(tail, delimiter)


@@ -1,108 +0,0 @@
package layer

import (
	"sort"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestTrimAfterUploadIDAndKey(t *testing.T) {
	uploads := []*UploadInfo{
		{Key: "j", UploadID: "k"}, // key < id <
		{Key: "l", UploadID: "p"}, // key < id >
		{Key: "n", UploadID: "m"}, // key = id <
		{Key: "n", UploadID: "o"}, // pivot
		{Key: "n", UploadID: "q"}, // key = id >
		{Key: "p", UploadID: "h"}, // key > id <
		{Key: "q", UploadID: "r"}, // key > id >
	}
	expectedUploadsListsIndexes := [][]int{
		{1, 2, 3, 4, 6},
		{4, 6},
		{3, 4, 6},
		{4, 6},
		{6},
		{6},
		{},
	}

	sort.Slice(uploads, func(i, j int) bool {
		if uploads[i].Key == uploads[j].Key {
			return uploads[i].UploadID < uploads[j].UploadID
		}
		return uploads[i].Key < uploads[j].Key
	})

	length := len(uploads)

	t.Run("the last element's key is less, upload id is less", func(t *testing.T) {
		keys := trimAfterUploadIDAndKey("z", "a", uploads)
		require.Empty(t, keys)
		require.Len(t, uploads, length)
	})

	t.Run("the last element's key is less, upload id is greater", func(t *testing.T) {
		keys := trimAfterUploadIDAndKey("z", "a", uploads)
		require.Empty(t, keys)
		require.Len(t, uploads, length)
	})

	t.Run("check for uploads", func(t *testing.T) {
		for i, u := range uploads {
			list := trimAfterUploadIDAndKey(u.Key, u.UploadID, uploads)
			require.Equal(t, len(list), len(expectedUploadsListsIndexes[i]))
			for j, idx := range expectedUploadsListsIndexes[i] {
				require.Equal(t, list[j], uploads[idx])
			}
		}
	})
}

func TestTrimAfterUploadKey(t *testing.T) {
	var (
		uploadKeys    = []string{"e", "f", "f", "g", "h", "i"}
		theSameKeyIdx = []int{1, 2}
		diffKeyIdx    = []int{0, 3}
		lastIdx       = len(uploadKeys) - 1
	)
	uploadsInfos := make([]*UploadInfo, 0, len(uploadKeys))
	for _, k := range uploadKeys {
		uploadsInfos = append(uploadsInfos, &UploadInfo{Key: k})
	}

	t.Run("empty list", func(t *testing.T) {
		keys := trimAfterUploadKey("f", []*UploadInfo{})
		require.Len(t, keys, 0)
	})

	t.Run("the last element is less than a key", func(t *testing.T) {
		keys := trimAfterUploadKey("j", uploadsInfos)
		require.Empty(t, keys)
		require.Len(t, uploadsInfos, len(uploadKeys))
	})

	t.Run("different keys in sequence", func(t *testing.T) {
		for _, i := range diffKeyIdx {
			keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos)
			require.Len(t, keys, len(uploadKeys)-i-1)
			require.Equal(t, keys, uploadsInfos[i+1:])
			require.Len(t, uploadsInfos, len(uploadKeys))
		}
	})

	t.Run("the same keys in the sequence first element", func(t *testing.T) {
		for _, i := range theSameKeyIdx {
			keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos)
			require.Len(t, keys, 3)
			require.Equal(t, keys, uploadsInfos[3:])
			require.Len(t, uploadsInfos, len(uploadKeys))
		}
	})

	t.Run("last element", func(t *testing.T) {
		keys := trimAfterUploadKey(uploadKeys[lastIdx], uploadsInfos)
		require.Empty(t, keys)
	})
}


@@ -53,7 +53,7 @@ type Service interface {
 	CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
 	DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
-	GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
+	GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error)
 	GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
 
 	// AddPart puts a node to a system tree as a child of appropriate multipart upload


@@ -328,7 +328,7 @@ func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data
 	return nil
 }
 
-func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, string) ([]*data.MultipartInfo, error) {
+func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, data.MultipartStreamParams) (data.MultipartInfoStream, error) {
 	panic("implement me")
 }


@@ -1000,6 +1000,8 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
 	cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
 
 	cacheCfg.NetworkInfo.Lifetime = fetchCacheLifetime(v, l, cfgNetworkInfoCacheLifetime, cacheCfg.NetworkInfo.Lifetime)
+	cacheCfg.MultipartList.Lifetime = fetchCacheLifetime(v, l, cfgMultipartListCacheLifetime, cacheCfg.MultipartList.Lifetime)
+	cacheCfg.MultipartList.Size = fetchCacheSize(v, l, cfgMultipartListCacheSize, cacheCfg.MultipartList.Size)
 
 	return cacheCfg
 }


@@ -141,6 +141,8 @@ const ( // Settings.
 	cfgFrostfsIDCacheLifetime   = "cache.frostfsid.lifetime"
 	cfgFrostfsIDCacheSize       = "cache.frostfsid.size"
 	cfgNetworkInfoCacheLifetime = "cache.network_info.lifetime"
+	cfgMultipartListCacheLifetime = "cache.multipart_list_session.lifetime"
+	cfgMultipartListCacheSize     = "cache.multipart_list_session.size"
 	cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"


@@ -101,9 +101,12 @@ S3_GW_CACHE_OBJECTS_SIZE=1000000
 # Cache which keeps lists of objects in buckets
 S3_GW_CACHE_LIST_LIFETIME=1m
 S3_GW_CACHE_LIST_SIZE=100000
-# Cache which keeps listing session
+# Cache which keeps listing objects session
 S3_GW_CACHE_LIST_SESSION_LIFETIME=1m
 S3_GW_CACHE_LIST_SESSION_SIZE=100
+# Cache which keeps listing multipart uploads session
+S3_GW_CACHE_MULTIPART_LIST_SESSION_LIFETIME=1m
+S3_GW_CACHE_MULTIPART_LIST_SESSION_SIZE=100
 # Cache which contains mapping of bucket name to bucket info
 S3_GW_CACHE_BUCKETS_LIFETIME=1m
 S3_GW_CACHE_BUCKETS_SIZE=1000


@@ -124,6 +124,9 @@ cache:
   list_session:
     lifetime: 1m
     size: 100
+  multipart_list_session:
+    lifetime: 1m
+    size: 100
   # Cache which contains mapping of nice name to object addresses
   names:
     lifetime: 1m


@@ -426,6 +426,9 @@ cache:
   list_session:
     lifetime: 1m
     size: 100
+  multipart_list_session:
+    lifetime: 1m
+    size: 10000
   names:
     lifetime: 1m
     size: 1000
@@ -452,19 +455,20 @@ cache:
     lifetime: 1m
 ```

 | Parameter                | Type                                            | Default value                     | Description                                                     |
 |--------------------------|-------------------------------------------------|-----------------------------------|-----------------------------------------------------------------|
 | `objects`                | [Cache config](#cache-subsection)               | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers).                            |
 | `list`                   | [Cache config](#cache-subsection)               | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets.                  |
 | `list_session`           | [Cache config](#cache-subsection)               | `lifetime: 60s`<br>`size: 100`    | Cache which keeps listing session.                              |
+| `multipart_list_session` | [Cache config](#cache-subsection)               | `lifetime: 60s`<br>`size: 100`    | Cache which keeps listing of multipart uploads.                 |
 | `names`                  | [Cache config](#cache-subsection)               | `lifetime: 60s`<br>`size: 10000`  | Cache which contains mapping of nice name to object addresses.  |
 | `buckets`                | [Cache config](#cache-subsection)               | `lifetime: 60s`<br>`size: 1000`   | Cache which contains mapping of bucket name to bucket info.     |
 | `system`                 | [Cache config](#cache-subsection)               | `lifetime: 5m`<br>`size: 10000`   | Cache for system objects in a bucket: bucket settings etc.      |
 | `accessbox`              | [Accessbox cache config](#accessbox-subsection) | `lifetime: 10m`<br>`size: 100`    | Cache which stores access box with tokens by its address.       |
 | `accesscontrol`          | [Cache config](#cache-subsection)               | `lifetime: 1m`<br>`size: 100000`  | Cache which stores owner to cache operation mapping.            |
 | `morph_policy`           | [Cache config](#cache-subsection)               | `lifetime: 1m`<br>`size: 10000`   | Cache which stores list of policy chains.                       |
 | `frostfsid`              | [Cache config](#cache-subsection)               | `lifetime: 1m`<br>`size: 10000`   | Cache which stores FrostfsID subject info.                      |
 | `network_info`           | [Cache config](#cache-subsection)               | `lifetime: 1m`                    | Cache which stores network info.                                |

 #### `cache` subsection


@@ -8,6 +8,8 @@ const (
 	UpdateAccessCredObjectIntoFrostFS   = "update access cred object into FrostFS"     // Info in ../../authmate/authmate.go
 	MetricsAreDisabled                  = "metrics are disabled"                       // Warn in ../../metrics/app.go
 	FoundMoreThanOneUnversionedNode     = "found more than one unversioned node"       // Debug in ../../pkg/service/tree/tree.go
+	CouldNotParseTreeNode               = "could not parse tree node"                  // Error in ../../pkg/service/tree/tree.go
+	CouldNotFormFilePath                = "could not form file path"                   // Error in ../../pkg/service/tree/tree.go
 	ServiceIsRunning                    = "service is running"                         // Info in ../../cmd/s3-gw/service.go
 	ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port"  // Warn in ../../cmd/s3-gw/service.go
 	ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../cmd/s3-gw/service.go
@@ -67,6 +69,7 @@ const (
 	CouldNotListUserContainers     = "could not list user containers"                      // Error in ../../api/layer/container.go
 	CouldNotFetchContainerInfo     = "could not fetch container info"                      // Error in ../../api/layer/container.go
 	MismatchedObjEncryptionInfo    = "mismatched obj encryptionInfo"                       // Warn in ../../api/layer/multipart_upload.go
+	CouldNotGetMultipartUploadInfo = "could not get multipart upload info"                 // Warn in ../../api/layer/multipart_upload.go
 	UploadPart                     = "upload part"                                         // Debug in ../../api/layer/multipart_upload.go
 	CouldntDeleteOldPartObject     = "couldn't delete old part object"                     // Error in ../../api/layer/multipart_upload.go
 	CouldNotPutCompletedObject     = "could not put a completed object (multipart upload)" // Error in ../../api/layer/multipart_upload.go
@@ -89,6 +92,7 @@ const (
 	CouldntPutObjAddressToNameCache  = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go
 	CouldntCacheListOfObjects        = "couldn't cache list of objects"         // Warn in ../../api/layer/cache.go
 	CouldntCacheListSession          = "couldn't cache list session"            // Warn in ../../api/layer/cache.go
+	CouldntCacheListMultipartSession = "couldn't cache list multipart session"  // Warn in ../../api/layer/cache.go
 	CouldntCacheTags                 = "couldn't cache tags"                    // Error in ../../api/layer/cache.go
 	CouldntCacheLockInfo             = "couldn't cache lock info"               // Error in ../../api/layer/cache.go
 	CouldntCacheBucketSettings       = "couldn't cache bucket settings"         // Warn in ../../api/layer/cache.go


@@ -84,7 +84,9 @@ var (
 	// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
 	ErrGatewayTimeout = frostfs.ErrGatewayTimeout
 
-	errNodeDoesntContainFileName = fmt.Errorf("node doesn't contain FileName")
+	errNodeDoesntContainFileName = errors.New("node doesn't contain FileName")
+
+	errParentPathNotFound = errors.New("couldn't get parent path")
 )
 
 const (
@@ -1061,7 +1063,7 @@ func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *
 	var filepath string
 
 	if !s.intermediateRootID.Equal(trNode.ID) {
-		if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil {
+		if filepath, err = formFilePath(trNode, fileName, s.namesMap); err != nil {
 			return nil, false, fmt.Errorf("invalid node order: %w", err)
 		}
 	} else {
@@ -1165,58 +1167,18 @@ func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, tr
 	return intermediateNodes, nil
 }
 
-func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
-	rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
+func (c *Tree) getSubTreeByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, []uint64, error) {
+	rootID, _, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
 	if err != nil {
 		if errors.Is(err, tree.ErrNodeNotFound) {
-			return nil, "", nil
+			return nil, nil, nil
 		}
-		return nil, "", err
+		return nil, nil, err
 	}
 
-	subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2, false)
-	if err != nil {
-		if errors.Is(err, tree.ErrNodeNotFound) {
-			return nil, "", nil
-		}
-		return nil, "", err
-	}
-
-	nodesMap := make(map[string][]NodeResponse, len(subTree))
-	for _, node := range subTree {
-		if MultiID(rootID).Equal(node.GetNodeID()) {
-			continue
-		}
-
-		fileName := getFilename(node)
-		if !strings.HasPrefix(fileName, tailPrefix) {
-			continue
-		}
-
-		nodes := nodesMap[fileName]
-
-		// Add all nodes if flag latestOnly is false.
-		// Add all intermediate nodes
-		// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
-		if len(nodes) == 0 {
-			nodes = []NodeResponse{node}
-		} else if !latestOnly || isIntermediate(node) {
-			nodes = append(nodes, node)
-		} else if isIntermediate(nodes[0]) {
-			nodes = append([]NodeResponse{node}, nodes...)
-		} else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
-			nodes[0] = node
-		}
-
-		nodesMap[fileName] = nodes
-	}
-
-	result := make([]NodeResponse, 0, len(subTree))
-	for _, nodes := range nodesMap {
-		result = append(result, nodes...)
-	}
-
-	return result, strings.TrimSuffix(prefix, tailPrefix), nil
+	stream, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
+	return stream, rootID, err
 }
 
 func getFilename(node NodeResponse) string {
@@ -1237,20 +1199,19 @@ func isIntermediate(node NodeResponse) bool {
 	return node.GetMeta()[0].GetKey() == FileNameKey
 }
 
-func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) {
-	var filepath string
+func formFilePath(node *treeNode, fileName string, namesMap map[uint64]string) (string, error) {
+	var filePath string
 
-	for i, id := range node.GetParentID() {
+	for i, id := range node.ParentID {
 		parentPath, ok := namesMap[id]
 		if !ok {
-			return "", fmt.Errorf("couldn't get parent path")
+			return "", errParentPathNotFound
 		}
 
-		filepath = parentPath + separator + fileName
-		namesMap[node.GetNodeID()[i]] = filepath
+		filePath = parentPath + separator + fileName
+		namesMap[node.ID[i]] = filePath
 	}
 
-	return filepath, nil
+	return filePath, nil
 }
 
 func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
@@ -1267,10 +1228,6 @@ func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
 	return tNode, fileName, nil
 }
 
-func formLatestNodeKey(parentID uint64, fileName string) string {
-	return strconv.FormatUint(parentID, 10) + "." + fileName
-}
-
 func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
 	return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
 }
@@ -1313,84 +1270,135 @@ func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketIn
 	return err
 }
 
-func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
-	subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
-	if err != nil {
-		return nil, err
-	}
-
-	var result []*data.MultipartInfo
-	for _, node := range subTreeNodes {
-		multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix)
-		if err != nil {
-			return nil, err
-		}
-		result = append(result, multipartUploads...)
-	}
-
-	return result, nil
-}
-
-func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
-	// sorting in getSubTree leads to skipping nodes that doesn't have FileName attribute
-	// so when we are only interested in multipart nodes, we can set this flag
-	// (despite we sort multiparts in above layer anyway)
-	// to skip its children (parts) that don't have FileName
-	subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth, true)
-	if err != nil {
-		return nil, err
-	}
-
-	var parentPrefix string
-	if parentFilePath != "" { // The root of subTree can also have a parent
-		parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
-	}
-
-	var filepath string
-	namesMap := make(map[uint64]string, len(subTree))
-	multiparts := make(map[string][]*data.MultipartInfo, len(subTree))
-
-	for i, node := range subTree {
-		tNode, fileName, err := parseTreeNode(node)
-		if err != nil {
-			continue
-		}
-
-		if i != 0 {
-			if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
-				return nil, fmt.Errorf("invalid node order: %w", err)
-			}
-		} else {
-			filepath = parentPrefix + fileName
-			for _, id := range tNode.ID {
-				namesMap[id] = filepath
-			}
-		}
-
-		multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, tNode)
-		if err != nil || multipartInfo.Finished {
-			continue
-		}
-
-		for _, id := range node.GetParentID() {
-			key := formLatestNodeKey(id, fileName)
-			multipartInfos, ok := multiparts[key]
-			if !ok {
-				multipartInfos = []*data.MultipartInfo{multipartInfo}
-			} else {
-				multipartInfos = append(multipartInfos, multipartInfo)
-			}
-			multiparts[key] = multipartInfos
-		}
-	}
-
-	result := make([]*data.MultipartInfo, 0, len(multiparts))
-	for _, multipartInfo := range multiparts {
-		result = append(result, multipartInfo...)
-	}
-
-	return result, nil
-}
+type multipartInfoStream struct {
+	log       *zap.Logger
+	nodePaths map[uint64]string
+	rootID    []uint64
+	// childStream is the stream of the prefix node's children.
+	childStream SubTreeStream
+	// currentStream is the stream of a child's subtree, opened with maximum depth.
+	currentStream SubTreeStream
+	treeService   ServiceClient
+	bktInfo       *data.BucketInfo
+	uploadID      string
+	keyMarker     string
+	headPrefix    string
+	prefix        string
+}
+
+func (m *multipartInfoStream) Next(ctx context.Context) (*data.MultipartInfo, error) {
+	var tNode *treeNode
+	var filePath string
+
+	if m.currentStream == nil {
+		var err error
+		if m.currentStream, err = m.openNewStream(ctx); err != nil {
+			return nil, err
+		}
+	}
+
+	for {
+		var err error
+		tNode, err = m.getTreeNodeFromStream(m.currentStream)
+		if err != nil {
+			if err == io.EOF {
+				var err2 error
+				if m.currentStream, err2 = m.openNewStream(ctx); err2 != nil {
+					return nil, err2
+				}
+				continue
+			}
+			return nil, err
+		}
+
+		var ok bool
+		if filePath, ok = m.checkTreeNode(tNode); ok {
+			break
+		}
+	}
+
+	return newMultipartInfoFromTreeNode(m.log, filePath, tNode)
+}
+
+// openNewStream opens a subtree stream for the next node taken from childStream.
+func (m *multipartInfoStream) openNewStream(ctx context.Context) (SubTreeStream, error) {
+	node, err := m.getTreeNodeFromStream(m.childStream)
+	if err != nil {
+		return nil, err
+	}
+
+	if node.ID[0] == m.rootID[0] {
+		// skip root node
+		return m.openNewStream(ctx)
+	}
+
+	stream, err := m.treeService.GetSubTreeStream(ctx, m.bktInfo, systemTree, node.ID, maxGetSubTreeDepth)
+	if err != nil {
+		return nil, err
+	}
+
+	return stream, nil
+}
+
+func (m *multipartInfoStream) getTreeNodeFromStream(stream SubTreeStream) (*treeNode, error) {
+	node, err := stream.Next()
+	if err != nil {
+		return nil, err
+	}
+
+	tNode, err := newTreeNode(node)
+	if err != nil {
+		return nil, err
+	}
+
+	return tNode, nil
+}
+
+func (m *multipartInfoStream) checkTreeNode(tNode *treeNode) (string, bool) {
+	var ok bool
+	var err error
+
+	fileName, ok := tNode.FileName()
+	if !ok {
+		return "", false
+	}
+
+	filePath, err := formFilePath(tNode, fileName, m.nodePaths)
+	if err != nil {
+		filePath = fileName
+		m.nodePaths[tNode.ID[0]] = filePath
+	}
+	filePath = m.headPrefix + filePath
+
+	if !strings.HasPrefix(filePath, m.prefix) {
+		return "", false
+	}
+
+	if _, ok = tNode.Meta[finishedKV]; ok {
+		return "", false
+	}
+
+	if id, ok := tNode.Meta[uploadIDKV]; ok {
+		if m.keyMarker == "" || filePath > m.keyMarker || (filePath == m.keyMarker && m.uploadID != "" && id > m.uploadID) {
+			return filePath, true
+		}
+	}
+
+	return "", false
+}
+
+func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error) {
+	stream, rootID, err := c.getSubTreeByPrefixStream(ctx, bktInfo, systemTree, params.Prefix)
+	if err != nil {
+		if errors.Is(err, tree.ErrNodeNotFound) {
+			return nil, nil
+		}
+		return nil, err
+	}
+
+	return &multipartInfoStream{
+		log:         c.reqLogger(ctx),
+		rootID:      rootID,
+		childStream: stream,
+		nodePaths:   make(map[uint64]string),
+		treeService: c.service,
+		bktInfo:     bktInfo,
+		uploadID:    params.UploadIDMarker,
+		keyMarker:   params.KeyMarker,
+		prefix:      params.Prefix,
+		headPrefix: strings.TrimRightFunc(params.Prefix, func(r rune) bool {
+			return r != '/'
+		}),
+	}, nil
+}
 
 func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
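The headPrefix field above keeps the directory portion of the prefix (everything up to the last '/'), which is re-attached to paths resolved relative to the prefix node before the prefix and marker checks. Consuming the returned stream is then a plain loop until io.EOF; a caller-side sketch (hypothetical helper, assuming the identifiers from this diff; note that the not-found path can yield a nil stream, so callers should check for nil first):

```go
// drainUploads reads every matching multipart upload from the stream.
func drainUploads(ctx context.Context, s data.MultipartInfoStream) ([]*data.MultipartInfo, error) {
	var infos []*data.MultipartInfo
	for {
		info, err := s.Next(ctx)
		if errors.Is(err, io.EOF) {
			return infos, nil
		}
		if err != nil {
			return nil, err
		}
		infos = append(infos, info)
	}
}
```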


@@ -12,6 +12,7 @@ import (
 	usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
 )
 
@@ -359,3 +360,121 @@ func TestSplitTreeMultiparts(t *testing.T) {
 	require.NoError(t, err)
 	require.Len(t, parts, 1)
 }
+
+func TestCheckTreeNode(t *testing.T) {
+	treeNodes := []*treeNode{
+		// foo/
+		{
+			ID:       []uint64{1},
+			ParentID: []uint64{0},
+			Meta: map[string]string{
+				"FileName": "foo",
+			},
+		},
+		// foo/ant
+		{
+			ID:       []uint64{2},
+			ParentID: []uint64{1},
+			Meta: map[string]string{
+				"FileName": "ant",
+				"UploadId": "d",
+			},
+		},
+		// foo/bar
+		{
+			ID:       []uint64{3},
+			ParentID: []uint64{1},
+			Meta: map[string]string{
+				"FileName": "bar",
+				"UploadId": "c",
+			},
+		},
+		// foo/finished
+		{
+			ID:       []uint64{4},
+			ParentID: []uint64{1},
+			Meta: map[string]string{
+				"FileName": "finished",
+				"UploadId": "e",
+				"Finished": "True",
+			},
+		},
+		// hello/
+		{
+			ID:       []uint64{5},
+			ParentID: []uint64{0},
+			Meta: map[string]string{
+				"FileName": "hello",
+			},
+		},
+		// hello/world
+		{
+			ID:       []uint64{6},
+			ParentID: []uint64{5},
+			Meta: map[string]string{
+				"FileName": "world",
+				"UploadId": "a",
+			},
+		},
+		// hello/world
+		{
+			ID:       []uint64{7},
+			ParentID: []uint64{5},
+			Meta: map[string]string{
+				"FileName": "world",
+				"UploadId": "b",
+			},
+		},
+	}
+
+	info := multipartInfoStream{
+		log:    zap.NewNop(),
+		rootID: []uint64{0},
+	}
+
+	t.Run("without markers", func(t *testing.T) {
+		info.nodePaths = make(map[uint64]string)
+		results := make([]bool, 0, len(treeNodes))
+		for _, node := range treeNodes {
+			_, valid := info.checkTreeNode(node)
+			results = append(results, valid)
+		}
+		require.Equal(t, []bool{false, true, true, false, false, true, true}, results)
+	})
+
+	t.Run("with prefix", func(t *testing.T) {
+		info.nodePaths = make(map[uint64]string)
+		info.prefix = "hello"
+		info.headPrefix = ""
+		results := make([]bool, 0, len(treeNodes))
+		for _, node := range treeNodes {
+			_, valid := info.checkTreeNode(node)
+			results = append(results, valid)
+		}
+		require.Equal(t, []bool{false, false, false, false, false, true, true}, results)
+	})
+
+	t.Run("with key marker", func(t *testing.T) {
+		info.nodePaths = make(map[uint64]string)
+		info.keyMarker = "foo/bar"
+		results := make([]bool, 0, len(treeNodes))
+		for _, node := range treeNodes {
+			_, valid := info.checkTreeNode(node)
+			results = append(results, valid)
+		}
+		require.Equal(t, []bool{false, false, false, false, false, true, true}, results)
+	})
+
+	t.Run("with key and upload id markers", func(t *testing.T) {
+		info.nodePaths = make(map[uint64]string)
+		info.keyMarker = "hello/world"
+		info.uploadID = "a"
+		results := make([]bool, 0, len(treeNodes))
+		for _, node := range treeNodes {
+			_, valid := info.checkTreeNode(node)
+			results = append(results, valid)
+		}
+		require.Equal(t, []bool{false, false, false, false, false, false, true}, results)
+	})
+}