[#469] List multipart uploads streaming #527
109
api/cache/listmultipart.go
vendored
Normal file
|
@ -0,0 +1,109 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"github.com/bluele/gcache"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
// ListMultipartSessionCache contains cache for list multiparts session (during pagination).
|
||||
ListMultipartSessionCache struct {
|
||||
cache gcache.Cache
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// ListMultipartSessionKey is a key to find a ListMultipartSessionCache's entry.
|
||||
ListMultipartSessionKey struct {
|
||||
cid cid.ID
|
||||
prefix string
|
||||
marker string
|
||||
uploadID string
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultListMultipartSessionCacheLifetime is a default lifetime of entries in cache of ListMultipartUploads.
|
||||
DefaultListMultipartSessionCacheLifetime = time.Minute
|
||||
// DefaultListMultipartSessionCacheSize is a default size of cache of ListMultipartUploads.
|
||||
DefaultListMultipartSessionCacheSize = 100
|
||||
|
||||
)
|
||||
|
||||
// DefaultListMultipartSessionConfig returns new default cache expiration values.
|
||||
func DefaultListMultipartSessionConfig(logger *zap.Logger) *Config {
|
||||
return &Config{
|
||||
Size: DefaultListMultipartSessionCacheSize,
|
||||
Lifetime: DefaultListMultipartSessionCacheLifetime,
|
||||
Logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (k *ListMultipartSessionKey) String() string {
|
||||
return k.cid.EncodeToString() + k.prefix + k.marker + k.uploadID
|
||||
}
|
||||
|
||||
// NewListMultipartSessionCache is a constructor which creates an object of ListObjectsCache with the given lifetime of entries.
|
||||
func NewListMultipartSessionCache(config *Config) *ListMultipartSessionCache {
|
||||
gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).EvictedFunc(func(_ interface{}, val interface{}) {
|
||||
session, ok := val.(*data.ListMultipartSession)
|
||||
if !ok {
|
||||
config.Logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", val)),
|
||||
zap.String("expected", fmt.Sprintf("%T", session)))
|
||||
}
|
||||
|
||||
if !session.Acquired.Load() {
|
||||
session.Cancel()
|
||||
}
|
||||
}).Build()
|
||||
return &ListMultipartSessionCache{cache: gc, logger: config.Logger}
|
||||
}
|
||||
|
||||
// GetListMultipartSession returns a session of ListMultipartUploads request.
|
||||
func (l *ListMultipartSessionCache) GetListMultipartSession(key ListMultipartSessionKey) *data.ListMultipartSession {
|
||||
entry, err := l.cache.Get(key)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
result, ok := entry.(*data.ListMultipartSession)
|
||||
if !ok {
|
||||
l.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||
zap.String("expected", fmt.Sprintf("%T", result)))
|
||||
return nil
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// PutListMultipartSession puts a ListMultipartUploads session info to cache.
|
||||
func (l *ListMultipartSessionCache) PutListMultipartSession(key ListMultipartSessionKey, session *data.ListMultipartSession) error {
|
||||
s := l.GetListMultipartSession(key)
|
||||
if s != nil && s != session {
|
||||
if !s.Acquired.Load() {
|
||||
s.Cancel()
|
||||
}
|
||||
}
|
||||
return l.cache.Set(key, session)
|
||||
}
|
||||
|
||||
// DeleteListMultipartSession removes key from cache.
|
||||
func (l *ListMultipartSessionCache) DeleteListMultipartSession(key ListMultipartSessionKey) {
|
||||
l.cache.Remove(key)
|
||||
}
|
||||
|
||||
// CreateListMultipartSessionCacheKey returns ListMultipartSessionKey with the given CID, prefix, marker and uploadID.
|
||||
func CreateListMultipartSessionCacheKey(cnr cid.ID, prefix, marker, uploadID string) ListMultipartSessionKey {
|
||||
p := ListMultipartSessionKey{
|
||||
cid: cnr,
|
||||
prefix: prefix,
|
||||
marker: marker,
|
||||
uploadID: uploadID,
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
2
api/cache/listsession.go
vendored
|
@ -28,7 +28,7 @@ type (
|
|||
|
||||
const (
|
||||
// DefaultListSessionCacheLifetime is a default lifetime of entries in cache of ListObjects.
|
||||
DefaultListSessionCacheLifetime = time.Second * 60
|
||||
DefaultListSessionCacheLifetime = time.Minute
|
||||
// DefaultListSessionCacheSize is a default size of cache of ListObjects.
|
||||
DefaultListSessionCacheSize = 100
|
||||
)
|
||||
|
|
|
@ -9,11 +9,25 @@ type VersionsStream interface {
|
|||
Next(ctx context.Context) (*NodeVersion, error)
|
||||
}
|
||||
|
||||
type ListSession struct {
|
||||
Next []*ExtendedNodeVersion
|
||||
Stream VersionsStream
|
||||
NamesMap map[string]struct{}
|
||||
type CommonSession struct {
|
||||
Context context.Context
|
||||
Cancel context.CancelFunc
|
||||
Acquired atomic.Bool
|
||||
}
|
||||
|
||||
type ListSession struct {
|
||||
CommonSession
|
||||
Next []*ExtendedNodeVersion
|
||||
Stream VersionsStream
|
||||
NamesMap map[string]struct{}
|
||||
}
|
||||
|
||||
type MultipartInfoStream interface {
|
||||
Next(ctx context.Context) (*MultipartInfo, error)
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why do we pass Why do we pass `marker` and `uploadID`?
|
||||
}
|
||||
|
||||
type ListMultipartSession struct {
|
||||
CommonSession
|
||||
Next *MultipartInfo
|
||||
Stream MultipartInfoStream
|
||||
}
|
||||
|
|
|
@ -207,3 +207,9 @@ func (l LockInfo) UntilDate() string {
|
|||
func (l LockInfo) IsCompliance() bool {
|
||||
return l.isCompliance
|
||||
}
|
||||
|
||||
type MultipartStreamParams struct {
|
||||
Prefix string
|
||||
KeyMarker string
|
||||
UploadIDMarker string
|
||||
}
|
||||
|
|
|
@ -244,6 +244,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
|
|||
Buckets: minCacheCfg,
|
||||
System: minCacheCfg,
|
||||
AccessControl: minCacheCfg,
|
||||
MultipartList: minCacheCfg,
|
||||
NetworkInfo: &cache.NetworkInfoCacheConfig{Lifetime: minCacheCfg.Lifetime},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -513,7 +513,7 @@ func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Req
|
|||
|
||||
if maxUploadsStr != "" {
|
||||
val, err := strconv.Atoi(maxUploadsStr)
|
||||
if err != nil || val < 1 || val > 1000 {
|
||||
if err != nil || val < 1 || val > maxObjectList {
|
||||
h.logAndSendError(ctx, w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads))
|
||||
return
|
||||
}
|
||||
|
|
|
@ -266,6 +266,36 @@ func TestListMultipartUploads(t *testing.T) {
|
|||
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID)
|
||||
})
|
||||
|
||||
t.Run("check delimiter", func(t *testing.T) {
|
||||
t.Run("not truncated", func(t *testing.T) {
|
||||
listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", -1)
|
||||
require.Len(t, listUploads.Uploads, 0)
|
||||
require.Len(t, listUploads.CommonPrefixes, 2)
|
||||
require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
|
||||
require.Equal(t, "/zzz/", listUploads.CommonPrefixes[1].Prefix)
|
||||
})
|
||||
|
||||
t.Run("truncated", func(t *testing.T) {
|
||||
listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", 1)
|
||||
require.Len(t, listUploads.Uploads, 0)
|
||||
require.Len(t, listUploads.CommonPrefixes, 1)
|
||||
require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
|
||||
require.True(t, listUploads.IsTruncated)
|
||||
|
||||
listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
|
||||
require.Len(t, listUploads.Uploads, 0)
|
||||
require.Len(t, listUploads.CommonPrefixes, 1)
|
||||
require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
|
||||
require.True(t, listUploads.IsTruncated)
|
||||
|
||||
listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
|
||||
require.Len(t, listUploads.Uploads, 0)
|
||||
require.Len(t, listUploads.CommonPrefixes, 1)
|
||||
require.Equal(t, "/zzz/", listUploads.CommonPrefixes[0].Prefix)
|
||||
require.False(t, listUploads.IsTruncated)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("check markers", func(t *testing.T) {
|
||||
t.Run("check only key-marker", func(t *testing.T) {
|
||||
listUploads := listMultipartUploads(hc, bktName, "", "", "", objName2, -1)
|
||||
|
@ -294,6 +324,58 @@ func TestListMultipartUploads(t *testing.T) {
|
|||
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("check next markers", func(t *testing.T) {
|
||||
t.Run("check both next-key-marker and next-upload-id-marker", func(t *testing.T) {
|
||||
listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1)
|
||||
require.True(t, listUploads.IsTruncated)
|
||||
require.Len(t, listUploads.Uploads, 1)
|
||||
require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker)
|
||||
require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker)
|
||||
require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
|
||||
require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key)
|
||||
|
||||
listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
|
||||
require.True(t, listUploads.IsTruncated)
|
||||
require.Len(t, listUploads.Uploads, 1)
|
||||
require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker)
|
||||
require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker)
|
||||
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID)
|
||||
require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key)
|
||||
|
||||
listUploads = listMultipartUploads(hc, bktName, "", "", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
|
||||
require.False(t, listUploads.IsTruncated)
|
||||
require.Len(t, listUploads.Uploads, 1)
|
||||
require.Empty(t, listUploads.NextUploadIDMarker)
|
||||
require.Empty(t, listUploads.NextKeyMarker)
|
||||
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
|
||||
})
|
||||
|
||||
t.Run("check only next-key-marker", func(t *testing.T) {
|
||||
listUploads := listMultipartUploads(hc, bktName, "", "", "", "", 1)
|
||||
require.True(t, listUploads.IsTruncated)
|
||||
require.Len(t, listUploads.Uploads, 1)
|
||||
require.Equal(t, uploadInfo1.Key, listUploads.NextKeyMarker)
|
||||
require.Equal(t, uploadInfo1.UploadID, listUploads.NextUploadIDMarker)
|
||||
require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
|
||||
require.Equal(t, uploadInfo1.Key, listUploads.Uploads[0].Key)
|
||||
|
||||
listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1)
|
||||
require.True(t, listUploads.IsTruncated)
|
||||
require.Len(t, listUploads.Uploads, 1)
|
||||
require.Equal(t, uploadInfo2.Key, listUploads.NextKeyMarker)
|
||||
require.Equal(t, uploadInfo2.UploadID, listUploads.NextUploadIDMarker)
|
||||
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[0].UploadID)
|
||||
require.Equal(t, uploadInfo2.Key, listUploads.Uploads[0].Key)
|
||||
|
||||
listUploads = listMultipartUploads(hc, bktName, "", "", "", listUploads.NextKeyMarker, 1)
|
||||
require.False(t, listUploads.IsTruncated)
|
||||
require.Len(t, listUploads.Uploads, 1)
|
||||
require.Empty(t, listUploads.NextUploadIDMarker)
|
||||
require.Empty(t, listUploads.NextKeyMarker)
|
||||
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestMultipartUploadSize(t *testing.T) {
|
||||
|
|
|
@ -21,6 +21,7 @@ type Cache struct {
|
|||
systemCache *cache.SystemCache
|
||||
accessCache *cache.AccessControlCache
|
||||
networkInfoCache *cache.NetworkInfoCache
|
||||
sessionMultipartCache *cache.ListMultipartSessionCache
|
||||
}
|
||||
|
||||
// CachesConfig contains params for caches.
|
||||
|
@ -33,6 +34,7 @@ type CachesConfig struct {
|
|||
Buckets *cache.Config
|
||||
System *cache.Config
|
||||
AccessControl *cache.Config
|
||||
MultipartList *cache.Config
|
||||
NetworkInfo *cache.NetworkInfoCacheConfig
|
||||
}
|
||||
|
||||
|
@ -48,6 +50,7 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
|
|||
System: cache.DefaultSystemConfig(logger),
|
||||
AccessControl: cache.DefaultAccessControlConfig(logger),
|
||||
NetworkInfo: cache.DefaultNetworkInfoConfig(logger),
|
||||
MultipartList: cache.DefaultListMultipartSessionConfig(logger),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -62,6 +65,7 @@ func NewCache(cfg *CachesConfig) *Cache {
|
|||
systemCache: cache.NewSystemCache(cfg.System),
|
||||
accessCache: cache.NewAccessControlCache(cfg.AccessControl),
|
||||
networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo),
|
||||
sessionMultipartCache: cache.NewListMultipartSessionCache(cfg.MultipartList),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -161,6 +165,14 @@ func (c *Cache) GetListSession(owner user.ID, key cache.ListSessionKey) *data.Li
|
|||
return c.sessionListCache.GetListSession(key)
|
||||
}
|
||||
|
||||
func (c *Cache) GetListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) *data.ListMultipartSession {
|
||||
if !c.accessCache.Get(owner, key.String()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.sessionMultipartCache.GetListMultipartSession(key)
|
||||
}
|
||||
|
||||
func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) {
|
||||
if err := c.sessionListCache.PutListSession(key, session); err != nil {
|
||||
c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err))
|
||||
|
@ -176,6 +188,21 @@ func (c *Cache) DeleteListSession(owner user.ID, key cache.ListSessionKey) {
|
|||
c.accessCache.Delete(owner, key.String())
|
||||
}
|
||||
|
||||
func (c *Cache) PutListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey, session *data.ListMultipartSession) {
|
||||
if err := c.sessionMultipartCache.PutListMultipartSession(key, session); err != nil {
|
||||
c.logger.Warn(logs.CouldntCacheListMultipartSession, zap.Error(err))
|
||||
}
|
||||
|
||||
if err := c.accessCache.Put(owner, key.String()); err != nil {
|
||||
c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) DeleteListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) {
|
||||
c.sessionMultipartCache.DeleteListMultipartSession(key)
|
||||
c.accessCache.Delete(owner, key.String())
|
||||
}
|
||||
|
||||
func (c *Cache) GetTagging(owner user.ID, key string) map[string]string {
|
||||
if !c.accessCache.Get(owner, key) {
|
||||
return nil
|
||||
|
|
|
@ -79,6 +79,7 @@ type (
|
|||
Prefix string
|
||||
MaxKeys int
|
||||
Marker string
|
||||
// Bookmark contains Marker or ContinuationToken and is used for pagination and as part of a cache key for list session.
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Not only. It also contains Not only. It also contains `Marker` for listing v1 and `ContinuationToken` for listing v2
|
||||
Bookmark string
|
||||
}
|
||||
|
||||
|
@ -193,11 +194,10 @@ func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
|
|||
return nil, nil, nil
|
||||
}
|
||||
|
||||
session, err := n.getListLatestVersionsSession(ctx, p)
|
||||
session, err := n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session)
|
||||
objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator)
|
||||
if err != nil {
|
||||
|
@ -230,7 +230,7 @@ func (n *Layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListi
|
|||
return nil, false, nil
|
||||
}
|
||||
|
||||
session, err := n.getListAllVersionsSession(ctx, p)
|
||||
session, err := n.getListVersionsSession(ctx, p, false)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
@ -301,48 +301,31 @@ func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int,
|
|||
}
|
||||
}
|
||||
|
||||
func (n *Layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) {
|
||||
return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
|
||||
}
|
||||
|
||||
func (n *Layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) {
|
||||
return n.getListVersionsSession(ctx, p, false)
|
||||
}
|
||||
|
||||
func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) {
|
||||
func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
|
||||
owner := n.BearerOwner(ctx)
|
||||
|
||||
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
|
||||
session := n.cache.GetListSession(owner, cacheKey)
|
||||
if session == nil {
|
||||
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
|
||||
session = n.cache.GetListSession(owner, cacheKey)
|
||||
if session == nil || session.Acquired.Swap(true) {
|
||||
session = n.newSession(ctx)
|
||||
session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
|
||||
return session, err
|
||||
}
|
||||
|
||||
if session.Acquired.Swap(true) {
|
||||
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
|
||||
}
|
||||
|
||||
// after reading next object from stream in session
|
||||
// the current cache value already doesn't match with next token in cache key
|
||||
n.cache.DeleteListSession(owner, cacheKey)
|
||||
|
||||
return session, nil
|
||||
}
|
||||
|
||||
func (n *Layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
|
||||
session = &data.ListSession{NamesMap: make(map[string]struct{})}
|
||||
func (n *Layer) newSession(ctx context.Context) *data.ListSession {
|
||||
session := &data.ListSession{NamesMap: make(map[string]struct{})}
|
||||
session.Context, session.Cancel = context.WithCancel(context.Background())
|
||||
|
||||
// save access box data for next requests
|
||||
if bd, err := middleware.GetBoxData(ctx); err == nil {
|
||||
session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd})
|
||||
}
|
||||
|
||||
session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return session, nil
|
||||
return session
|
||||
}
|
||||
|
||||
func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
|
||||
|
|
|
@ -2,6 +2,7 @@ package layer
|
|||||
|
||||||
import (
|
||||||
"bytes"
|
||||||
"cmp"
|
||||||
"context"
|
||||||
"crypto/md5"
|
||||||
"encoding/base64"
|
||||||
|
@ -10,17 +11,20 @@ import (
|
|||||
"errors"
|
||||||
"fmt"
|
||||||
"io"
|
||||||
"slices"
|
||||||
"sort"
|
||||||
"strconv"
|
||||||
"strings"
|
||||||
"time"
|
||||||
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
@ -499,47 +503,65 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
|
|||||
return &result, nil
|
||||||
}
|
||||||
|
||||||
multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, p.Bkt, p.Prefix)
|
||||||
session, err := n.getListMultipartUploadsSession(ctx, p)
|
||||||
if err != nil {
|
||||||
return nil, err
|
||||||
}
|
||||||
|
||||||
uploads := make([]*UploadInfo, 0, len(multipartInfos))
|
||||||
uploads := make([]*UploadInfo, 0, p.MaxUploads)
|
||||||
uniqDirs := make(map[string]struct{})
|
||||||
uploadsCount := 0
|
||||||
if session.Next != nil {
|
||||||
upload := uploadInfoFromMultipartInfo(session.Next, p.Prefix, p.Delimiter)
|
||||||
switch {
|
||||||
case upload.IsDir && isUniqDir(upload.Key, uniqDirs):
|
||||||
uniqDirs[upload.Key] = struct{}{}
|
||||||
fallthrough
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why don't just Why don't just `for uploadsCount < p.MaxUploads {` ?
nzinkevich
commented
I need to check next element to determine Truncated state. Extracted this check outside the loop I need to check next element to determine Truncated state. Extracted this check outside the loop
|
||||||
case !upload.IsDir:
|
||||||
uploads = append(uploads, upload)
|
||||||
uploadsCount++
|
||||||
}
|
||||||
dkirillov
commented
I would use I would use `errors.Is(err, io.EOF)`. In `service/tree` also
|
||||||
}
|
||||||
dkirillov
commented
Probably we should return error rather than continue. And consider cancel context (also for regular listing) @alexvanin Probably we should return error rather than continue. And consider cancel context (also for regular listing) @alexvanin
|
||||||
|
||||||
for _, multipartInfo := range multipartInfos {
|
||||||
info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter)
|
||||||
if info != nil {
|
||||||
if info.IsDir {
|
||||||
if _, ok := uniqDirs[info.Key]; ok {
|
||||||
info := session.Next
|
||||||
for uploadsCount < p.MaxUploads {
|
||||||
info, err = session.Stream.Next(ctx)
|
||||||
if err != nil {
|
||||||
if errors.Is(err, io.EOF) {
|
||||||
break
|
||||||
}
|
||||||
n.reqLogger(ctx).Warn(logs.CouldNotGetMultipartUploadInfo, zap.Error(err))
|
||||||
continue
|
||||||
dkirillov
commented
It seems now we can skip checking prefix inside It seems now we can skip checking prefix inside `uploadInfoFromMultipart` and result always be non nil
dkirillov
commented
This isn't changed This isn't changed
nzinkevich
commented
Without checking there is a fail in a test - a call with prefix "/my" returns also item with "/zzz" prefix. Because GetSubTreeStream trims prefix to "/" Without checking there is a fail in a [test](https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/api/handler/multipart_upload_test.go#L262-L267) - a call with prefix "/my" returns also item with "/zzz" prefix. Because GetSubTreeStream trims prefix to "/"
dkirillov
commented
Then we should fix streaming in tree, because we it must return only nodes that have provided prefix. ( Then we should fix streaming in tree, because we it must return only nodes that have provided prefix. (`/my` in this case, and object `/zzz/object/name3` doesn't have such prefix )
|
||||||
}
|
||||||
uniqDirs[info.Key] = struct{}{}
|
||||||
upload := uploadInfoFromMultipartInfo(info, p.Prefix, p.Delimiter)
|
||||||
if upload.IsDir {
|
||||||
if !isUniqDir(upload.Key, uniqDirs) {
|
||||||
continue
|
||||||
}
|
||||||
uploads = append(uploads, info)
|
||||||
uniqDirs[upload.Key] = struct{}{}
|
||||||
}
|
||||||
uploads = append(uploads, upload)
|
||||||
dkirillov
commented
I'm not sure if it's ok to separate this invocation. At least as it's done now.
I'm not sure if it's ok to separate this invocation. At least as it's done now.
See test:
```diff
diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go
index 6836fc5d..dcff5e5b 100644
--- a/api/handler/multipart_upload_test.go
+++ b/api/handler/multipart_upload_test.go
@@ -266,6 +266,30 @@ func TestListMultipartUploads(t *testing.T) {
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID)
})
+ t.Run("check delimiter", func(t *testing.T) {
+ t.Run("not truncated", func(t *testing.T) {
+ listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", -1)
+ require.Len(t, listUploads.Uploads, 0)
+ require.Len(t, listUploads.CommonPrefixes, 2)
+ require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
+ require.Equal(t, "/zzz/", listUploads.CommonPrefixes[1].Prefix)
+ })
+
+ t.Run("truncated", func(t *testing.T) {
+ listUploads := listMultipartUploads(hc, bktName, "/", "/", "", "", 1)
+ require.Len(t, listUploads.Uploads, 0)
+ require.Len(t, listUploads.CommonPrefixes, 1)
+ require.Equal(t, "/my/", listUploads.CommonPrefixes[0].Prefix)
+ require.True(t, listUploads.IsTruncated)
+
+ listUploads = listMultipartUploads(hc, bktName, "/", "/", listUploads.NextUploadIDMarker, listUploads.NextKeyMarker, 1)
+ require.Len(t, listUploads.Uploads, 0)
+ require.Len(t, listUploads.CommonPrefixes, 1)
+ require.Equal(t, "/zzz/", listUploads.CommonPrefixes[0].Prefix)
+ require.False(t, listUploads.IsTruncated)
+ })
+ })
+
```
nzinkevich
commented
Modified a bit this test (because there are two objects in Modified a bit this test (because there are two objects in `/my/` folder so this common prefix will appear twice and only on the third call it will be `/zzz/`. And fixed this scenario. But I'm not quite sure whether I understand the problem with `separate invocations` you proposed, could you clarify a bit?
|
||||||
uploadsCount++
|
||||||
}
|
||||||
|
||||||
sort.Slice(uploads, func(i, j int) bool {
|
||||||
if uploads[i].Key == uploads[j].Key {
|
||||||
return uploads[i].UploadID < uploads[j].UploadID
|
||||||
}
|
||||||
return uploads[i].Key < uploads[j].Key
|
||||||
})
|
||||||
|
||||||
if p.KeyMarker != "" {
|
||||||
if p.UploadIDMarker != "" {
|
||||||
uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads)
|
||||||
isTruncated := true
|
||||||
next, err := session.Stream.Next(ctx)
|
||||||
if err != nil {
|
||||||
if errors.Is(err, io.EOF) {
|
||||||
isTruncated = false
|
||||||
} else {
|
||||||
uploads = trimAfterUploadKey(p.KeyMarker, uploads)
|
||||||
return nil, err
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
It seems here we must use It seems here we must use `next` rather than `info`.
Please add tests for that case and others
nzinkevich
commented
In previous implementation In previous implementation `NextKeyMarker` and `NextUploadIDMarker` was brought from the last element of current list. And the new one acts the same. So using `info` in this case is correct. Anyway, I'm going to write tests for this
dkirillov
commented
Well, maybe I point to different error. Output sometime doesn't contain NextUploadIDMarker
Well, maybe I point to different error. Output sometime doesn't contain NextUploadIDMarker
```
$ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084 list-multipart-uploads --bucket test | grep UploadId
"UploadId": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
"UploadId": "8fd7e720-a5c4-4e56-86c6-dc99146199c7",
"UploadId": "8f60cdb1-7e62-41bb-98c1-567498be6dc2",
"UploadId": "75988c0a-d94f-4667-9087-056af63acefc",
"UploadId": "dda0fa13-cc27-4c52-aba6-2aa561d39b19",
$ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084 list-multipart-uploads --bucket test --max-uploads 1
{
"Bucket": "test",
"KeyMarker": "",
"NextKeyMarker": "dir/dir/obj",
"Prefix": "",
"NextUploadIdMarker": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
"MaxUploads": 1,
"IsTruncated": true,
"Uploads": [
{
"UploadId": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
"Key": "dir/dir/obj",
"Initiated": "2024-11-12T09:34:43+00:00",
"StorageClass": "STANDARD",
"Owner": {
"DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
"ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
},
"Initiator": {
"ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
"DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
}
}
]
}
$ AWS_MAX_ATTEMPTS=1 aws s3api --no-verify-ssl --endpoint http://localhost:8084 list-multipart-uploads --bucket test --max-uploads 1 --key-marker dir/dir/obj --upload-id-marker bd50e12d-fc4b-450b-afb1-ed082f5e2ef9
{
"Bucket": "test",
"KeyMarker": "dir/dir/obj",
"UploadIdMarker": "bd50e12d-fc4b-450b-afb1-ed082f5e2ef9",
"Prefix": "",
"MaxUploads": 1,
"IsTruncated": false,
"Uploads": [
{
"UploadId": "8fd7e720-a5c4-4e56-86c6-dc99146199c7",
"Key": "dir/dir/obj",
"Initiated": "2024-11-08T14:21:51+00:00",
"StorageClass": "STANDARD",
"Owner": {
"DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
"ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
},
"Initiator": {
"ID": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt",
"DisplayName": "NUUb82KR2JrVByHs2YSKgtK29gKnF5q6Vt"
}
}
]
}
```
nzinkevich
commented
Fixed and added test for this scenario Fixed and added test for this scenario
|
||||||
}
|
||||||
}
|
||||||
|
||||||
if len(uploads) > p.MaxUploads {
|
||||||
if isTruncated && info != nil {
|
||||||
// put to session redundant multipart upload which we read to check for EOF
|
||||||
session.Next = next
|
||||||
result.IsTruncated = true
|
||||||
uploads = uploads[:p.MaxUploads]
|
||||||
result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID
|
||||||
result.NextKeyMarker = uploads[len(uploads)-1].Key
|
||||||
result.NextUploadIDMarker = info.UploadID
|
||||||
result.NextKeyMarker = info.Key
|
||||||
cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, info.Key, info.UploadID)
|
||||||
n.putListMultipartUploadsSession(ctx, session, cacheKey)
|
||||||
}
|
||||||
|
||||||
for _, ov := range uploads {
|
||||||
dkirillov marked this conversation as resolved
dkirillov
commented
Result must be sorted by upload-id if some uploads have the same key Result must be sorted by upload-id if some uploads have the same key
|
||||||
|
@ -550,9 +572,62 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
|
|||||
}
|
||||||
}
|
||||||
|
||||||
slices.SortFunc(result.Uploads, func(a, b *UploadInfo) int {
|
||||||
keyCmp := cmp.Compare(a.Key, b.Key)
|
||||||
if keyCmp == 0 {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why do this if differ from the similar from object listing?
Why do this if differ from the similar from object listing?
https://git.frostfs.info/nzinkevich/frostfs-s3-gw/src/commit/e673d727e9c9486e0b70127fb224ed58b6842fb3/api/layer/listing.go#L309
nzinkevich
commented
Changed method to be more similar Changed method to be more similar
|
||||||
return cmp.Compare(a.UploadID, b.UploadID)
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
We cannot use We cannot use `ctx` from current request. It will be canceled after first request be finished
|
||||||
}
|
||||||
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
We also have to add AccessBox to this context to be able to get access to tree service. Otherwise currently we get
See how this done for regular listing
We also have to add AccessBox to this context to be able to get access to tree service. Otherwise currently we get
```
2024-11-12T11:52:19.895+0300 error request failed {"request_id": "ef743d2a-0f81-4f02-ae61-82afac3db746", "method": "ListMultipartUploads", "bucket": "test", "object": "", "description": "could not list multipart uploads", "user": "NbUgTSFvPmsRxmGeWpuuGeJUoRoi6PErcM", "error": "access denied: address s01.frostfs.devenv:8080: access denied: rpc error: code = Unknown desc = status: code = 2048 message = access to object operation denied", "status": 403}
```
See how this done for regular listing https://git.frostfs.info/nzinkevich/frostfs-s3-gw/src/commit/aaca4f84b8ba29031481c54f6bdcae90909287a0/api/layer/listing.go#L324
nzinkevich
commented
Fixed Fixed
|
||||||
return keyCmp
|
||||||
})
|
||||||
|
||||||
return &result, nil
|
||||||
}
|
||||||
|
||||||
func (n *Layer) putListMultipartUploadsSession(ctx context.Context, session *data.ListMultipartSession, cacheKey cache.ListMultipartSessionKey) {
|
||||||
session.Acquired.Store(false)
|
||||||
n.cache.PutListMultipartSession(n.BearerOwner(ctx), cacheKey, session)
|
||||||
}
|
||||||
|
||||||
func (n *Layer) getListMultipartUploadsSession(ctx context.Context, p *ListMultipartUploadsParams) (session *data.ListMultipartSession, err error) {
|
||||||
owner := n.BearerOwner(ctx)
|
||||||
cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, p.KeyMarker, p.UploadIDMarker)
|
||||||
session = n.cache.GetListMultipartSession(owner, cacheKey)
|
||||||
if session == nil || session.Acquired.Swap(true) {
|
||||||
session = newListMultipartSession(ctx)
|
||||||
params := data.MultipartStreamParams{
|
||||||
Prefix: p.Prefix,
|
||||||
KeyMarker: p.KeyMarker,
|
||||||
UploadIDMarker: p.UploadIDMarker,
|
||||||
}
|
||||||
session.Stream, err = n.treeService.GetMultipartUploadsByPrefix(session.Context, p.Bkt, params)
|
||||||
if err != nil {
|
||||||
return nil, err
|
||||||
}
|
||||||
}
|
||||||
// if after reading next object from stream in session the current cache value already
|
||||||
// doesn't match with next token in cache key
|
||||||
n.cache.DeleteListMultipartSession(owner, cacheKey)
|
||||||
|
||||||
return session, nil
|
||||||
}
|
||||||
|
||||||
func newListMultipartSession(ctx context.Context) *data.ListMultipartSession {
|
||||||
reqCtx, cancel := context.WithCancel(context.Background())
|
||||||
session := &data.ListMultipartSession{
|
||||||
CommonSession: data.CommonSession{
|
||||||
Context: reqCtx,
|
||||||
Cancel: cancel,
|
||||||
},
|
||||||
}
|
||||||
|
||||||
// save access box data for next requests
|
||||||
if bd, err := middleware.GetBoxData(ctx); err == nil {
|
||||||
session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd})
|
||||||
}
|
||||||
return session
|
||||||
}
|
||||||
|
||||||
func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
|
||||||
multipartInfo, parts, err := n.getUploadParts(ctx, p)
|
||||||
if err != nil {
|
||||||
|
@ -677,44 +752,10 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
|
|||||
return multipartInfo, res, nil
|
||||||
}
|
||||||
|
||||||
func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo {
|
||||||
var res []*UploadInfo
|
||||||
if len(uploads) != 0 && uploads[len(uploads)-1].Key < key {
|
||||||
return res
|
||||||
}
|
||||||
|
||||||
for _, obj := range uploads {
|
||||||
if obj.Key >= key && obj.UploadID > id {
|
||||||
res = append(res, obj)
|
||||||
}
|
||||||
}
|
||||||
|
||||||
return res
|
||||||
}
|
||||||
|
||||||
func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo {
|
||||||
var result []*UploadInfo
|
||||||
if len(objects) != 0 && objects[len(objects)-1].Key <= key {
|
||||||
return result
|
||||||
}
|
||||||
for i, obj := range objects {
|
||||||
if obj.Key > key {
|
||||||
result = objects[i:]
|
||||||
break
|
||||||
}
|
||||||
}
|
||||||
|
||||||
return result
|
||||||
}
|
||||||
|
||||||
func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo {
|
||||||
var isDir bool
|
||||||
key := uploadInfo.Key
|
||||||
|
||||||
if !strings.HasPrefix(key, prefix) {
|
||||||
return nil
|
||||||
}
|
||||||
|
||||||
if len(delimiter) > 0 {
|
||||||
tail := strings.TrimPrefix(key, prefix)
|
||||||
index := strings.Index(tail, delimiter)
|
||||||
|
@ -732,3 +773,10 @@ func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimit
|
|||||
Created: uploadInfo.Created,
|
||||||
}
|
||||||
}
|
||||||
|
||||||
func isUniqDir(key string, uniqDirs map[string]struct{}) bool {
|
||||||
if _, ok := uniqDirs[key]; ok {
|
||||||
return false
|
||||||
}
|
||||||
return true
|
||||||
}
|
||||||
|
|
|
@ -1,108 +0,0 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTrimAfterUploadIDAndKey(t *testing.T) {
|
||||
uploads := []*UploadInfo{
|
||||
{Key: "j", UploadID: "k"}, // key < id <
|
||||
{Key: "l", UploadID: "p"}, // key < id >
|
||||
{Key: "n", UploadID: "m"}, // key = id <
|
||||
{Key: "n", UploadID: "o"}, // pivot
|
||||
{Key: "n", UploadID: "q"}, // key = id >
|
||||
{Key: "p", UploadID: "h"}, // key > id <
|
||||
{Key: "q", UploadID: "r"}, // key > id >
|
||||
}
|
||||
expectedUploadsListsIndexes := [][]int{
|
||||
{1, 2, 3, 4, 6},
|
||||
{4, 6},
|
||||
{3, 4, 6},
|
||||
{4, 6},
|
||||
{6},
|
||||
{6},
|
||||
{},
|
||||
}
|
||||
|
||||
sort.Slice(uploads, func(i, j int) bool {
|
||||
if uploads[i].Key == uploads[j].Key {
|
||||
return uploads[i].UploadID < uploads[j].UploadID
|
||||
}
|
||||
return uploads[i].Key < uploads[j].Key
|
||||
})
|
||||
|
||||
length := len(uploads)
|
||||
|
||||
t.Run("the last element's key is less, upload id is less", func(t *testing.T) {
|
||||
keys := trimAfterUploadIDAndKey("z", "a", uploads)
|
||||
require.Empty(t, keys)
|
||||
require.Len(t, uploads, length)
|
||||
})
|
||||
|
||||
t.Run("the last element's key is less, upload id is greater", func(t *testing.T) {
|
||||
keys := trimAfterUploadIDAndKey("z", "a", uploads)
|
||||
require.Empty(t, keys)
|
||||
require.Len(t, uploads, length)
|
||||
})
|
||||
|
||||
t.Run("check for uploads", func(t *testing.T) {
|
||||
for i, u := range uploads {
|
||||
list := trimAfterUploadIDAndKey(u.Key, u.UploadID, uploads)
|
||||
require.Equal(t, len(list), len(expectedUploadsListsIndexes[i]))
|
||||
for j, idx := range expectedUploadsListsIndexes[i] {
|
||||
require.Equal(t, list[j], uploads[idx])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestTrimAfterUploadKey(t *testing.T) {
|
||||
var (
|
||||
uploadKeys = []string{"e", "f", "f", "g", "h", "i"}
|
||||
theSameKeyIdx = []int{1, 2}
|
||||
diffKeyIdx = []int{0, 3}
|
||||
lastIdx = len(uploadKeys) - 1
|
||||
)
|
||||
|
||||
uploadsInfos := make([]*UploadInfo, 0, len(uploadKeys))
|
||||
for _, k := range uploadKeys {
|
||||
uploadsInfos = append(uploadsInfos, &UploadInfo{Key: k})
|
||||
}
|
||||
|
||||
t.Run("empty list", func(t *testing.T) {
|
||||
keys := trimAfterUploadKey("f", []*UploadInfo{})
|
||||
require.Len(t, keys, 0)
|
||||
})
|
||||
|
||||
t.Run("the last element is less than a key", func(t *testing.T) {
|
||||
keys := trimAfterUploadKey("j", uploadsInfos)
|
||||
require.Empty(t, keys)
|
||||
require.Len(t, uploadsInfos, len(uploadKeys))
|
||||
})
|
||||
|
||||
t.Run("different keys in sequence", func(t *testing.T) {
|
||||
for _, i := range diffKeyIdx {
|
||||
keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos)
|
||||
require.Len(t, keys, len(uploadKeys)-i-1)
|
||||
require.Equal(t, keys, uploadsInfos[i+1:])
|
||||
require.Len(t, uploadsInfos, len(uploadKeys))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("the same keys in the sequence first element", func(t *testing.T) {
|
||||
for _, i := range theSameKeyIdx {
|
||||
keys := trimAfterUploadKey(uploadKeys[i], uploadsInfos)
|
||||
require.Len(t, keys, 3)
|
||||
require.Equal(t, keys, uploadsInfos[3:])
|
||||
require.Len(t, uploadsInfos, len(uploadKeys))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("last element", func(t *testing.T) {
|
||||
keys := trimAfterUploadKey(uploadKeys[lastIdx], uploadsInfos)
|
||||
require.Empty(t, keys)
|
||||
})
|
||||
}
|
|
@ -53,7 +53,7 @@ type Service interface {
|
|||
|
||||
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
|
||||
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
|
||||
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
|
||||
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error)
|
||||
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
|
||||
|
||||
// AddPart puts a node to a system tree as a child of appropriate multipart upload
|
||||
|
|
|
@ -328,7 +328,7 @@ func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, string) ([]*data.MultipartInfo, error) {
|
||||
func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, data.MultipartStreamParams) (data.MultipartInfoStream, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
|
|
|
@ -1007,6 +1007,8 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
|
|||
cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
|
||||
|
||||
cacheCfg.NetworkInfo.Lifetime = fetchCacheLifetime(v, l, cfgNetworkInfoCacheLifetime, cacheCfg.NetworkInfo.Lifetime)
|
||||
cacheCfg.MultipartList.Lifetime = fetchCacheLifetime(v, l, cfgMultipartListCacheLifetime, cacheCfg.MultipartList.Lifetime)
|
||||
cacheCfg.MultipartList.Size = fetchCacheSize(v, l, cfgMultipartListCacheSize, cacheCfg.MultipartList.Size)
|
||||
|
||||
return cacheCfg
|
||||
}
|
||||
|
|
|
@ -141,6 +141,8 @@ const ( // Settings.
|
|||
cfgFrostfsIDCacheLifetime = "cache.frostfsid.lifetime"
|
||||
cfgFrostfsIDCacheSize = "cache.frostfsid.size"
|
||||
cfgNetworkInfoCacheLifetime = "cache.network_info.lifetime"
|
||||
cfgMultipartListCacheLifetime = "cache.multipart_list_session.lifetime"
|
||||
cfgMultipartListCacheSize = "cache.multipart_list_session.size"
|
||||
|
||||
cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"
|
||||
|
||||
|
|
|
@ -101,9 +101,12 @@ S3_GW_CACHE_OBJECTS_SIZE=1000000
|
|||
# Cache which keeps lists of objects in buckets
|
||||
S3_GW_CACHE_LIST_LIFETIME=1m
|
||||
S3_GW_CACHE_LIST_SIZE=100000
|
||||
# Cache which keeps listing session
|
||||
# Cache which keeps listing objects session
|
||||
S3_GW_CACHE_LIST_SESSION_LIFETIME=1m
|
||||
S3_GW_CACHE_LIST_SESSION_SIZE=100
|
||||
# Cache which keeps listing multipart uploads session
|
||||
S3_GW_CACHE_MULTIPART_LIST_SESSION_LIFETIME=1m
|
||||
S3_GW_CACHE_MULTIPART_LIST_SESSION_SIZE=100
|
||||
# Cache which contains mapping of bucket name to bucket info
|
||||
S3_GW_CACHE_BUCKETS_LIFETIME=1m
|
||||
S3_GW_CACHE_BUCKETS_SIZE=1000
|
||||
|
|
|
@ -129,6 +129,9 @@ cache:
|
|||
list_session:
|
||||
lifetime: 1m
|
||||
size: 100
|
||||
multipart_list_session:
|
||||
lifetime: 1m
|
||||
size: 100
|
||||
# Cache which contains mapping of nice name to object addresses
|
||||
names:
|
||||
lifetime: 1m
|
||||
|
|
|
@ -426,6 +426,9 @@ cache:
|
|||
list_session:
|
||||
lifetime: 1m
|
||||
size: 100
|
||||
multipart_list_session:
|
||||
lifetime: 1m
|
||||
size: 10000
|
||||
names:
|
||||
lifetime: 1m
|
||||
size: 1000
|
||||
|
@ -453,10 +456,11 @@ cache:
|
|||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------|
|
||||
|--------------------------|-------------------------------------------------|-----------------------------------|----------------------------------------------------------------|
|
||||
| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). |
|
||||
| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. |
|
||||
| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. |
|
||||
| `multipart_list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing of multipart uploads. |
|
||||
| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. |
|
||||
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
|
||||
| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings etc. |
|
||||
|
|
|
@ -8,6 +8,8 @@ const (
|
|||
UpdateAccessCredObjectIntoFrostFS = "update access cred object into FrostFS" // Info in ../../authmate/authmate.go
|
||||
MetricsAreDisabled = "metrics are disabled" // Warn in ../../metrics/app.go
|
||||
FoundMoreThanOneUnversionedNode = "found more than one unversioned node" // Debug in ../../pkg/service/tree/tree.go
|
||||
CouldNotParseTreeNode = "could not parse tree node" // Error in ../../pkg/service/tree/tree.go
|
||||
CouldNotFormFilePath = "could not form file path" // Error in ../../pkg/service/tree/tree.go
|
||||
ServiceIsRunning = "service is running" // Info in ../../cmd/s3-gw/service.go
|
||||
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port" // Warn in ../../cmd/s3-gw/service.go
|
||||
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../cmd/s3-gw/service.go
|
||||
|
@ -67,6 +69,7 @@ const (
|
|||
CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go
|
||||
CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go
|
||||
MismatchedObjEncryptionInfo = "mismatched obj encryptionInfo" // Warn in ../../api/layer/multipart_upload.go
|
||||
CouldNotGetMultipartUploadInfo = "could not get multipart upload info" // Warn in ../../api/layer/multipart_upload.go
|
||||
UploadPart = "upload part" // Debug in ../../api/layer/multipart_upload.go
|
||||
CouldntDeleteOldPartObject = "couldn't delete old part object" // Error in ../../api/layer/multipart_upload.go
|
||||
CouldNotPutCompletedObject = "could not put a completed object (multipart upload)" // Error in ../../api/layer/multipart_upload.go
|
||||
|
@ -89,6 +92,7 @@ const (
|
|||
CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go
|
||||
CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go
|
||||
CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go
|
||||
CouldntCacheListMultipartSession = "couldn't cache list multipart session" // Warn in ../../api/layer/cache.go
|
||||
CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go
|
||||
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
|
||||
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go
|
||||
|
|
|
@ -84,7 +84,9 @@ var (
|
|||||
// ErrGatewayTimeout is returned from ServiceClient service in case of timeout error.
|
||||||
ErrGatewayTimeout = frostfs.ErrGatewayTimeout
|
||||||
|
||||||
errNodeDoesntContainFileName = fmt.Errorf("node doesn't contain FileName")
|
||||||
errNodeDoesntContainFileName = errors.New("node doesn't contain FileName")
|
||||||
|
||||||
errParentPathNotFound = errors.New("couldn't get parent path")
|
||||||
)
|
||||||
|
||||||
const (
|
||||||
|
@ -1061,7 +1063,7 @@ func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *
|
|||||
|
||||||
var filepath string
|
||||||
if !s.intermediateRootID.Equal(trNode.ID) {
|
||||||
if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil {
|
||||||
if filepath, err = formFilePath(trNode, fileName, s.namesMap); err != nil {
|
||||||
return nil, false, fmt.Errorf("invalid node order: %w", err)
|
||||||
}
|
||||||
} else {
|
||||||
|
@ -1165,58 +1167,18 @@ func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, tr
|
|||||
return intermediateNodes, nil
|
||||||
}
|
||||||
|
||||||
func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
|
||||||
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
|
||||||
func (c *Tree) getSubTreeByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, []uint64, error) {
|
||||||
rootID, _, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
|
||||||
if err != nil {
|
||||||
if errors.Is(err, tree.ErrNodeNotFound) {
|
||||||
return nil, "", nil
|
||||||
return nil, nil, nil
|
||||||
}
|
||||||
return nil, "", err
|
||||||
return nil, nil, err
|
||||||
}
|
||||||
|
||||||
subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2, false)
|
||||||
if err != nil {
|
||||||
if errors.Is(err, tree.ErrNodeNotFound) {
|
||||||
return nil, "", nil
|
||||||
}
|
||||||
return nil, "", err
|
||||||
}
|
||||||
stream, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
|
||||||
|
||||||
nodesMap := make(map[string][]NodeResponse, len(subTree))
|
||||||
for _, node := range subTree {
|
||||||
if MultiID(rootID).Equal(node.GetNodeID()) {
|
||||||
continue
|
||||||
}
|
||||||
|
||||||
fileName := getFilename(node)
|
||||||
if !strings.HasPrefix(fileName, tailPrefix) {
|
||||||
continue
|
||||||
}
|
||||||
|
||||||
nodes := nodesMap[fileName]
|
||||||
|
||||||
// Add all nodes if flag latestOnly is false.
|
||||||
// Add all intermediate nodes
|
||||||
// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
|
||||||
if len(nodes) == 0 {
|
||||||
nodes = []NodeResponse{node}
|
||||||
} else if !latestOnly || isIntermediate(node) {
|
||||||
nodes = append(nodes, node)
|
||||||
} else if isIntermediate(nodes[0]) {
|
||||||
nodes = append([]NodeResponse{node}, nodes...)
|
||||||
} else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
|
||||||
nodes[0] = node
|
||||||
}
|
||||||
|
||||||
nodesMap[fileName] = nodes
|
||||||
}
|
||||||
|
||||||
result := make([]NodeResponse, 0, len(subTree))
|
||||||
for _, nodes := range nodesMap {
|
||||||
result = append(result, nodes...)
|
||||||
}
|
||||||
|
||||||
return result, strings.TrimSuffix(prefix, tailPrefix), nil
|
||||||
return stream, rootID, err
|
||||||
}
|
||||||
|
||||||
func getFilename(node NodeResponse) string {
|
||||||
|
@ -1237,20 +1199,19 @@ func isIntermediate(node NodeResponse) bool {
|
|||||
return node.GetMeta()[0].GetKey() == FileNameKey
|
||||||
}
|
||||||
|
||||||
func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) {
|
||||||
var filepath string
|
||||||
func formFilePath(node *treeNode, fileName string, namesMap map[uint64]string) (string, error) {
|
||||||
var filePath string
|
||||||
|
||||||
for i, id := range node.GetParentID() {
|
||||||
for i, id := range node.ParentID {
|
||||||
parentPath, ok := namesMap[id]
|
||||||
if !ok {
|
||||||
return "", fmt.Errorf("couldn't get parent path")
|
||||||
return "", errParentPathNotFound
|
||||||
}
|
||||||
filePath = parentPath + separator + fileName
|
||||||
namesMap[node.ID[i]] = filePath
|
||||||
}
|
||||||
|
||||||
filepath = parentPath + separator + fileName
|
||||||
namesMap[node.GetNodeID()[i]] = filepath
|
||||||
}
|
||||||
|
||||||
return filepath, nil
|
||||||
return filePath, nil
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
It's better to use more meaningful name. It's better to use more meaningful name.
By the way why do we need separate function? And why do we use only 0th parentID below (`namesMap[curNode.ParentID[0]]`)?
nzinkevich
commented
It seems to me that multipart info can't be split so I use only first element It seems to me that multipart info can't be split so I use only first element
https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/pkg/service/tree/tree.go#L332-L334
|
||||||
}
|
||||||
|
||||||
func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
|
||||||
|
@ -1267,10 +1228,6 @@ func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
|
|||||
return tNode, fileName, nil
|
||||||
}
|
||||||
|
||||||
func formLatestNodeKey(parentID uint64, fileName string) string {
|
||||||
return strconv.FormatUint(parentID, 10) + "." + fileName
|
||||||
}
|
||||||
|
||||||
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
|
||||||
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
|
||||||
}
|
||||||
|
@ -1313,84 +1270,137 @@ func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketIn
|
|||||
return err
|
||||||
}
|
||||||
|
||||||
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
|
||||||
subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
|
||||||
if err != nil {
|
||||||
return nil, err
|
||||||
}
|
||||||
|
||||||
var result []*data.MultipartInfo
|
||||||
for _, node := range subTreeNodes {
|
||||||
multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix)
|
||||||
if err != nil {
|
||||||
return nil, err
|
||||||
}
|
||||||
result = append(result, multipartUploads...)
|
||||||
}
|
||||||
|
||||||
return result, nil
|
||||||
type multipartInfoStream struct {
|
||||||
log *zap.Logger
|
||||||
nodePaths map[uint64]string
|
||||||
rootID MultiID
|
||||||
// childStream stream of children nodes of prefix node.
|
||||||
childStream SubTreeStream
|
||||||
// currentStream stream of children's nodes with max depth.
|
||||||
currentStream SubTreeStream
|
||||||
treeService ServiceClient
|
||||||
bktInfo *data.BucketInfo
|
||||||
uploadID string
|
||||||
keyMarker string
|
||||||
headPrefix string
|
||||||
prefix string
|
||||||
}
|
||||||
|
||||||
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
|
||||||
// sorting in getSubTree leads to skipping nodes that doesn't have FileName attribute
|
||||||
// so when we are only interested in multipart nodes, we can set this flag
|
||||||
// (despite we sort multiparts in above layer anyway)
|
||||||
// to skip its children (parts) that don't have FileName
|
||||||
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth, true)
|
||||||
func (m *multipartInfoStream) Next(ctx context.Context) (*data.MultipartInfo, error) {
|
||||||
var tNode *treeNode
|
||||||
var filePath string
|
||||||
|
||||||
if m.currentStream == nil {
|
||||||
var err error
|
||||||
if m.currentStream, err = m.openNewStream(ctx); err != nil {
|
||||||
return nil, err
|
||||||
}
|
||||||
}
|
||||||
for {
|
||||||
var err error
|
||||||
tNode, err = getTreeNodeFromStream(m.currentStream)
|
||||||
if err != nil {
|
||||||
if errors.Is(err, io.EOF) {
|
||||||
dkirillov
commented
Do we really need new variable
because of Do we really need new variable `err2`?
It seems we can write
```golang
if m.currentStream, err = m.openNewStream(ctx); err != nil {
return nil, err
}
```
because of `continue` below
|
||||||
if m.currentStream, err = m.openNewStream(ctx); err != nil {
|
||||||
return nil, err
|
||||||
}
|
||||||
continue
|
||||||
}
|
||||||
return nil, err
|
||||||
}
|
||||||
var ok bool
|
||||||
if filePath, ok = m.checkTreeNode(tNode); ok {
|
||||||
dkirillov
commented
Why not just Why not just `fileName, ok := tNode.FileName()` ?
|
||||||
break
|
||||||
}
|
||||||
}
|
||||||
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why do we use only Why do we use only `tNode.ID[0]`?
|
||||||
return newMultipartInfoFromTreeNode(m.log, filePath, tNode)
|
||||||
dkirillov
commented
Why do we treat this error special? It seems normally we traverse node in right order and if we don't see parent it's a bug in storage node. Root node filepath we can set initially to this map Why do we treat this error special? It seems normally we traverse node in right order and if we don't see parent it's a bug in storage node. Root node filepath we can set initially to this map
|
||||||
}
|
||||||
|
||||||
dkirillov
commented
I suppose we should check if I suppose we should check if `tNode.IsSplit()` and skip if necessary. And only after that use `tNode.ID[0]`
|
||||||
// openNewStream creates subtree stream from childStream`s node.
|
||||||
func (m *multipartInfoStream) openNewStream(ctx context.Context) (SubTreeStream, error) {
|
||||||
node, err := getTreeNodeFromStream(m.childStream)
|
||||||
if err != nil {
|
||||||
return nil, err
|
||||||
}
|
||||||
|
||||||
var parentPrefix string
|
||||||
if parentFilePath != "" { // The root of subTree can also have a parent
|
||||||
parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
|
||||||
if m.rootID.Equal(node.ID) {
|
||||||
// skip root node
|
||||||
return m.openNewStream(ctx)
|
||||||
}
|
||||||
dkirillov
commented
It seems we can simplify this to
It seems we can simplify this to
```golang
if m.keyMarker < filePath || (m.keyMarker == filePath && m.uploadID < id) {
```
|
||||||
|
||||||
var filepath string
|
||||||
namesMap := make(map[uint64]string, len(subTree))
|
||||||
multiparts := make(map[string][]*data.MultipartInfo, len(subTree))
|
||||||
|
||||||
for i, node := range subTree {
|
||||||
tNode, fileName, err := parseTreeNode(node)
|
||||||
stream, err := m.treeService.GetSubTreeStream(ctx, m.bktInfo, systemTree, node.ID, maxGetSubTreeDepth)
|
||||||
if err != nil {
|
||||||
continue
|
||||||
return nil, err
|
||||||
}
|
||||||
return stream, nil
|
||||||
}
|
||||||
|
||||||
if i != 0 {
|
||||||
if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
|
||||||
return nil, fmt.Errorf("invalid node order: %w", err)
|
||||||
}
|
||||||
} else {
|
||||||
filepath = parentPrefix + fileName
|
||||||
for _, id := range tNode.ID {
|
||||||
namesMap[id] = filepath
|
||||||
func getTreeNodeFromStream(stream SubTreeStream) (*treeNode, error) {
|
||||||
node, err := stream.Next()
|
||||||
if err != nil {
|
||||||
return nil, err
|
||||||
}
|
||||||
tNode, err := newTreeNode(node)
|
||||||
if err != nil {
|
||||||
return nil, err
|
||||||
dkirillov
commented
Please, use the similar check Line 987 in a12fea8
Please, use the similar check https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/commit/a12fea8a5b1e72cdd8cc10eb7278033305a24a3b/pkg/service/tree/tree.go#L987
|
||||||
}
|
||||||
return tNode, nil
|
||||||
}
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Consider using approach like here
and request will contain Consider using approach like here https://git.frostfs.info/nzinkevich/frostfs-s3-gw/src/commit/aaca4f84b8ba29031481c54f6bdcae90909287a0/pkg/service/tree/tree.go#L1114
Otherwise if we will have objects:
* `dir/objdir1/obj`
* `dir/objdir1/obj2`
* `dir/obj1/obj1`
* ...
* `dir/obj1/obj1000000`
and request will contain `prefix: dir/objdir` we will list that greater than 1000000 objects and just filter them (but we will get them anyway from storage) despite we are interested in only 2 of them
nzinkevich
commented
Changed the implementation. Now, at first, Changed the implementation. Now, at first, `c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)` is called, and then the `GetSubTreeStream` with max depth is applied to the children as needed. Which should prevents from getting redundant nodes
|
||||||
|
||||||
multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, tNode)
|
||||||
if err != nil || multipartInfo.Finished {
|
||||||
continue
|
||||||
}
|
||||||
func (m *multipartInfoStream) checkTreeNode(tNode *treeNode) (string, bool) {
|
||||||
var ok bool
|
||||||
var err error
|
||||||
|
||||||
for _, id := range node.GetParentID() {
|
||||||
key := formLatestNodeKey(id, fileName)
|
||||||
multipartInfos, ok := multiparts[key]
|
||||||
if tNode.IsSplit() {
|
||||||
return "", false
|
||||||
}
|
||||||
dkirillov
commented
This function should be This function should be
`func getTreeNodeFromStream(stream SubTreeStream) (*treeNode, error)`
or
`func (m *multipartInfoStream) getTreeNodeFromStream() (*treeNode, error)`
|
||||||
fileName, ok := tNode.FileName()
|
||||||
if !ok {
|
||||||
multipartInfos = []*data.MultipartInfo{multipartInfo}
|
||||||
} else {
|
||||||
multipartInfos = append(multipartInfos, multipartInfo)
|
||||||
return "", false
|
||||||
}
|
||||||
|
||||||
multiparts[key] = multipartInfos
|
||||||
filePath, err := formFilePath(tNode, fileName, m.nodePaths)
|
||||||
if err != nil {
|
||||||
filePath = fileName
|
||||||
m.nodePaths[tNode.ID[0]] = filePath
|
||||||
}
|
||||||
filePath = m.headPrefix + filePath
|
||||||
if !strings.HasPrefix(filePath, m.prefix) {
|
||||||
return "", false
|
||||||
}
|
||||||
if _, ok = tNode.Meta[finishedKV]; ok {
|
||||||
return "", false
|
||||||
}
|
||||||
if id, ok := tNode.Meta[uploadIDKV]; ok {
|
||||||
if filePath > m.keyMarker || (filePath == m.keyMarker && m.uploadID != "" && id > m.uploadID) {
|
||||||
return filePath, true
|
||||||
}
|
||||||
}
|
||||||
|
||||||
result := make([]*data.MultipartInfo, 0, len(multiparts))
|
||||||
for _, multipartInfo := range multiparts {
|
||||||
result = append(result, multipartInfo...)
|
||||||
return "", false
|
||||||
}
|
||||||
|
||||||
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error) {
|
||||||
stream, rootID, err := c.getSubTreeByPrefixStream(ctx, bktInfo, systemTree, params.Prefix)
|
||||||
if err != nil {
|
||||||
if errors.Is(err, tree.ErrNodeNotFound) {
|
||||||
return nil, nil
|
||||||
}
|
||||||
return nil, err
|
||||||
}
|
||||||
|
||||||
return result, nil
|
||||||
return &multipartInfoStream{
|
||||||
log: c.reqLogger(ctx),
|
||||||
rootID: rootID,
|
||||||
childStream: stream,
|
||||||
nodePaths: make(map[uint64]string),
|
||||||
treeService: c.service,
|
||||||
bktInfo: bktInfo,
|
||||||
uploadID: params.UploadIDMarker,
|
||||||
keyMarker: params.KeyMarker,
|
||||||
prefix: params.Prefix,
|
||||||
headPrefix: strings.TrimRightFunc(params.Prefix, func(r rune) bool {
|
||||||
return r != '/'
|
||||||
}),
|
||||||
}, nil
|
||||||
}
|
||||||
|
||||||
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
|
@ -359,3 +360,127 @@ func TestSplitTreeMultiparts(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Len(t, parts, 1)
|
||||
}
|
||||
|
||||
func TestCheckTreeNode(t *testing.T) {
|
||||
treeNodes := []*treeNode{
|
||||
// foo/
|
||||
{
|
||||
ID: []uint64{1},
|
||||
ParentID: []uint64{0},
|
||||
TimeStamp: []uint64{1},
|
||||
Meta: map[string]string{
|
||||
"FileName": "foo",
|
||||
},
|
||||
},
|
||||
// foo/ant
|
||||
{
|
||||
ID: []uint64{2},
|
||||
ParentID: []uint64{1},
|
||||
TimeStamp: []uint64{1},
|
||||
Meta: map[string]string{
|
||||
"FileName": "ant",
|
||||
"UploadId": "d",
|
||||
},
|
||||
},
|
||||
// foo/bar
|
||||
{
|
||||
ID: []uint64{3},
|
||||
ParentID: []uint64{1},
|
||||
TimeStamp: []uint64{1},
|
||||
Meta: map[string]string{
|
||||
"FileName": "bar",
|
||||
"UploadId": "c",
|
||||
},
|
||||
},
|
||||
// foo/finished
|
||||
{
|
||||
ID: []uint64{4},
|
||||
ParentID: []uint64{1},
|
||||
TimeStamp: []uint64{1},
|
||||
Meta: map[string]string{
|
||||
"FileName": "finished",
|
||||
"UploadId": "e",
|
||||
"Finished": "True",
|
||||
},
|
||||
},
|
||||
// hello/
|
||||
{
|
||||
ID: []uint64{5},
|
||||
ParentID: []uint64{0},
|
||||
TimeStamp: []uint64{1},
|
||||
Meta: map[string]string{
|
||||
"FileName": "hello",
|
||||
},
|
||||
},
|
||||
// hello/world
|
||||
{
|
||||
ID: []uint64{6},
|
||||
ParentID: []uint64{5},
|
||||
TimeStamp: []uint64{1},
|
||||
Meta: map[string]string{
|
||||
"FileName": "world",
|
||||
"UploadId": "a",
|
||||
},
|
||||
},
|
||||
// hello/world
|
||||
{
|
||||
ID: []uint64{7},
|
||||
ParentID: []uint64{5},
|
||||
TimeStamp: []uint64{1},
|
||||
Meta: map[string]string{
|
||||
"FileName": "world",
|
||||
"UploadId": "b",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
info := multipartInfoStream{
|
||||
log: zap.NewNop(),
|
||||
rootID: []uint64{0},
|
||||
}
|
||||
|
||||
t.Run("without markers", func(t *testing.T) {
|
||||
info.nodePaths = make(map[uint64]string)
|
||||
results := make([]bool, 0, len(treeNodes))
|
||||
for _, node := range treeNodes {
|
||||
_, valid := info.checkTreeNode(node)
|
||||
results = append(results, valid)
|
||||
}
|
||||
require.Equal(t, []bool{false, true, true, false, false, true, true}, results)
|
||||
})
|
||||
|
||||
t.Run("with prefix", func(t *testing.T) {
|
||||
info.nodePaths = make(map[uint64]string)
|
||||
info.prefix = "hello"
|
||||
info.headPrefix = ""
|
||||
results := make([]bool, 0, len(treeNodes))
|
||||
for _, node := range treeNodes {
|
||||
_, valid := info.checkTreeNode(node)
|
||||
results = append(results, valid)
|
||||
}
|
||||
require.Equal(t, []bool{false, false, false, false, false, true, true}, results)
|
||||
})
|
||||
|
||||
t.Run("with key marker", func(t *testing.T) {
|
||||
info.nodePaths = make(map[uint64]string)
|
||||
info.keyMarker = "foo/bar"
|
||||
results := make([]bool, 0, len(treeNodes))
|
||||
for _, node := range treeNodes {
|
||||
_, valid := info.checkTreeNode(node)
|
||||
results = append(results, valid)
|
||||
}
|
||||
require.Equal(t, []bool{false, false, false, false, false, true, true}, results)
|
||||
})
|
||||
|
||||
t.Run("with key and upload id markers", func(t *testing.T) {
|
||||
info.nodePaths = make(map[uint64]string)
|
||||
info.keyMarker = "hello/world"
|
||||
info.uploadID = "a"
|
||||
results := make([]bool, 0, len(treeNodes))
|
||||
for _, node := range treeNodes {
|
||||
_, valid := info.checkTreeNode(node)
|
||||
results = append(results, valid)
|
||||
}
|
||||
require.Equal(t, []bool{false, false, false, false, false, false, true}, results)
|
||||
})
|
||||
}
|
||||
|
|
If we add new cache we also should be able configure this
Added config params
Please mention these params in https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/config/config.yaml and https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/branch/master/config/config.env
Also we can write
time.Minute
instead oftime.Second * 60