[#469] List multipart uploads streaming
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
17d40245de
commit
aaca4f84b8
13 changed files with 374 additions and 213 deletions
109
api/cache/listmultipart.go
vendored
Normal file
109
api/cache/listmultipart.go
vendored
Normal file
|
@ -0,0 +1,109 @@
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"github.com/bluele/gcache"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
// ListMultipartSessionCache contains cache for list multiparts session (during pagination).
|
||||||
|
ListMultipartSessionCache struct {
|
||||||
|
cache gcache.Cache
|
||||||
|
logger *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListMultipartSessionKey is a key to find a ListMultipartSessionCache's entry.
|
||||||
|
ListMultipartSessionKey struct {
|
||||||
|
cid cid.ID
|
||||||
|
prefix string
|
||||||
|
marker string
|
||||||
|
uploadID string
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DefaultListMultipartSessionCacheLifetime is a default lifetime of entries in cache of ListMultipartUploads.
|
||||||
|
DefaultListMultipartSessionCacheLifetime = time.Second * 60
|
||||||
|
// DefaultListMultipartSessionCacheSize is a default size of cache of ListMultipartUploads.
|
||||||
|
DefaultListMultipartSessionCacheSize = 100
|
||||||
|
)
|
||||||
|
|
||||||
|
// DefaultListMultipartSessionConfig returns new default cache expiration values.
|
||||||
|
func DefaultListMultipartSessionConfig(logger *zap.Logger) *Config {
|
||||||
|
return &Config{
|
||||||
|
Size: DefaultListMultipartSessionCacheSize,
|
||||||
|
Lifetime: DefaultListMultipartSessionCacheLifetime,
|
||||||
|
Logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *ListMultipartSessionKey) String() string {
|
||||||
|
return k.cid.EncodeToString() + k.prefix + k.marker + k.uploadID
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewListMultipartSessionCache is a constructor which creates an object of ListObjectsCache with the given lifetime of entries.
|
||||||
|
func NewListMultipartSessionCache(config *Config) *ListMultipartSessionCache {
|
||||||
|
gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).EvictedFunc(func(_ interface{}, val interface{}) {
|
||||||
|
session, ok := val.(*data.ListMultipartSession)
|
||||||
|
if !ok {
|
||||||
|
config.Logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", val)),
|
||||||
|
zap.String("expected", fmt.Sprintf("%T", session)))
|
||||||
|
}
|
||||||
|
|
||||||
|
if !session.Acquired.Load() {
|
||||||
|
session.Cancel()
|
||||||
|
}
|
||||||
|
}).Build()
|
||||||
|
return &ListMultipartSessionCache{cache: gc, logger: config.Logger}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetListMultipartSession returns a session of ListMultipartUploads request.
|
||||||
|
func (l *ListMultipartSessionCache) GetListMultipartSession(key ListMultipartSessionKey) *data.ListMultipartSession {
|
||||||
|
entry, err := l.cache.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
result, ok := entry.(*data.ListMultipartSession)
|
||||||
|
if !ok {
|
||||||
|
l.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||||
|
zap.String("expected", fmt.Sprintf("%T", result)))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutListMultipartSession puts a ListMultipartUploads session info to cache.
|
||||||
|
func (l *ListMultipartSessionCache) PutListMultipartSession(key ListMultipartSessionKey, session *data.ListMultipartSession) error {
|
||||||
|
s := l.GetListMultipartSession(key)
|
||||||
|
if s != nil && s != session {
|
||||||
|
if !s.Acquired.Load() {
|
||||||
|
s.Cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return l.cache.Set(key, session)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteListMultipartSession removes key from cache.
|
||||||
|
func (l *ListMultipartSessionCache) DeleteListMultipartSession(key ListMultipartSessionKey) {
|
||||||
|
l.cache.Remove(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateListMultipartSessionCacheKey returns ListMultipartSessionKey with the given CID, prefix, marker and uploadID.
|
||||||
|
func CreateListMultipartSessionCacheKey(cnr cid.ID, prefix, marker, uploadID string) ListMultipartSessionKey {
|
||||||
|
p := ListMultipartSessionKey{
|
||||||
|
cid: cnr,
|
||||||
|
prefix: prefix,
|
||||||
|
marker: marker,
|
||||||
|
uploadID: uploadID,
|
||||||
|
}
|
||||||
|
|
||||||
|
return p
|
||||||
|
}
|
|
@ -9,11 +9,25 @@ type VersionsStream interface {
|
||||||
Next(ctx context.Context) (*NodeVersion, error)
|
Next(ctx context.Context) (*NodeVersion, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ListSession struct {
|
type CommonSession struct {
|
||||||
Next []*ExtendedNodeVersion
|
|
||||||
Stream VersionsStream
|
|
||||||
NamesMap map[string]struct{}
|
|
||||||
Context context.Context
|
Context context.Context
|
||||||
Cancel context.CancelFunc
|
Cancel context.CancelFunc
|
||||||
Acquired atomic.Bool
|
Acquired atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ListSession struct {
|
||||||
|
CommonSession
|
||||||
|
Next []*ExtendedNodeVersion
|
||||||
|
Stream VersionsStream
|
||||||
|
NamesMap map[string]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type MultipartInfoStream interface {
|
||||||
|
Next() (*MultipartInfo, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListMultipartSession struct {
|
||||||
|
CommonSession
|
||||||
|
Next *MultipartInfo
|
||||||
|
Stream MultipartInfoStream
|
||||||
|
}
|
||||||
|
|
|
@ -207,3 +207,9 @@ func (l LockInfo) UntilDate() string {
|
||||||
func (l LockInfo) IsCompliance() bool {
|
func (l LockInfo) IsCompliance() bool {
|
||||||
return l.isCompliance
|
return l.isCompliance
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MultipartStreamParams struct {
|
||||||
|
Prefix string
|
||||||
|
KeyMarker string
|
||||||
|
UploadIDMarker string
|
||||||
|
}
|
||||||
|
|
|
@ -244,6 +244,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
|
||||||
Buckets: minCacheCfg,
|
Buckets: minCacheCfg,
|
||||||
System: minCacheCfg,
|
System: minCacheCfg,
|
||||||
AccessControl: minCacheCfg,
|
AccessControl: minCacheCfg,
|
||||||
|
MultipartList: minCacheCfg,
|
||||||
NetworkInfo: &cache.NetworkInfoCacheConfig{Lifetime: minCacheCfg.Lifetime},
|
NetworkInfo: &cache.NetworkInfoCacheConfig{Lifetime: minCacheCfg.Lifetime},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -513,7 +513,7 @@ func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Req
|
||||||
|
|
||||||
if maxUploadsStr != "" {
|
if maxUploadsStr != "" {
|
||||||
val, err := strconv.Atoi(maxUploadsStr)
|
val, err := strconv.Atoi(maxUploadsStr)
|
||||||
if err != nil || val < 1 || val > 1000 {
|
if err != nil || val < 1 || val > maxObjectList {
|
||||||
h.logAndSendError(ctx, w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads))
|
h.logAndSendError(ctx, w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,15 +12,16 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Cache struct {
|
type Cache struct {
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
listsCache *cache.ObjectsListCache
|
listsCache *cache.ObjectsListCache
|
||||||
sessionListCache *cache.ListSessionCache
|
sessionListCache *cache.ListSessionCache
|
||||||
objCache *cache.ObjectsCache
|
objCache *cache.ObjectsCache
|
||||||
namesCache *cache.ObjectsNameCache
|
namesCache *cache.ObjectsNameCache
|
||||||
bucketCache *cache.BucketCache
|
bucketCache *cache.BucketCache
|
||||||
systemCache *cache.SystemCache
|
systemCache *cache.SystemCache
|
||||||
accessCache *cache.AccessControlCache
|
accessCache *cache.AccessControlCache
|
||||||
networkInfoCache *cache.NetworkInfoCache
|
networkInfoCache *cache.NetworkInfoCache
|
||||||
|
sessionMultipartCache *cache.ListMultipartSessionCache
|
||||||
}
|
}
|
||||||
|
|
||||||
// CachesConfig contains params for caches.
|
// CachesConfig contains params for caches.
|
||||||
|
@ -33,6 +34,7 @@ type CachesConfig struct {
|
||||||
Buckets *cache.Config
|
Buckets *cache.Config
|
||||||
System *cache.Config
|
System *cache.Config
|
||||||
AccessControl *cache.Config
|
AccessControl *cache.Config
|
||||||
|
MultipartList *cache.Config
|
||||||
NetworkInfo *cache.NetworkInfoCacheConfig
|
NetworkInfo *cache.NetworkInfoCacheConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,20 +50,22 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
|
||||||
System: cache.DefaultSystemConfig(logger),
|
System: cache.DefaultSystemConfig(logger),
|
||||||
AccessControl: cache.DefaultAccessControlConfig(logger),
|
AccessControl: cache.DefaultAccessControlConfig(logger),
|
||||||
NetworkInfo: cache.DefaultNetworkInfoConfig(logger),
|
NetworkInfo: cache.DefaultNetworkInfoConfig(logger),
|
||||||
|
MultipartList: cache.DefaultListMultipartSessionConfig(logger),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCache(cfg *CachesConfig) *Cache {
|
func NewCache(cfg *CachesConfig) *Cache {
|
||||||
return &Cache{
|
return &Cache{
|
||||||
logger: cfg.Logger,
|
logger: cfg.Logger,
|
||||||
listsCache: cache.NewObjectsListCache(cfg.ObjectsList),
|
listsCache: cache.NewObjectsListCache(cfg.ObjectsList),
|
||||||
sessionListCache: cache.NewListSessionCache(cfg.SessionList),
|
sessionListCache: cache.NewListSessionCache(cfg.SessionList),
|
||||||
objCache: cache.New(cfg.Objects),
|
objCache: cache.New(cfg.Objects),
|
||||||
namesCache: cache.NewObjectsNameCache(cfg.Names),
|
namesCache: cache.NewObjectsNameCache(cfg.Names),
|
||||||
bucketCache: cache.NewBucketCache(cfg.Buckets),
|
bucketCache: cache.NewBucketCache(cfg.Buckets),
|
||||||
systemCache: cache.NewSystemCache(cfg.System),
|
systemCache: cache.NewSystemCache(cfg.System),
|
||||||
accessCache: cache.NewAccessControlCache(cfg.AccessControl),
|
accessCache: cache.NewAccessControlCache(cfg.AccessControl),
|
||||||
networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo),
|
networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo),
|
||||||
|
sessionMultipartCache: cache.NewListMultipartSessionCache(cfg.MultipartList),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,6 +165,14 @@ func (c *Cache) GetListSession(owner user.ID, key cache.ListSessionKey) *data.Li
|
||||||
return c.sessionListCache.GetListSession(key)
|
return c.sessionListCache.GetListSession(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cache) GetListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) *data.ListMultipartSession {
|
||||||
|
if !c.accessCache.Get(owner, key.String()) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.sessionMultipartCache.GetListMultipartSession(key)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) {
|
func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) {
|
||||||
if err := c.sessionListCache.PutListSession(key, session); err != nil {
|
if err := c.sessionListCache.PutListSession(key, session); err != nil {
|
||||||
c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err))
|
c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err))
|
||||||
|
@ -176,6 +188,21 @@ func (c *Cache) DeleteListSession(owner user.ID, key cache.ListSessionKey) {
|
||||||
c.accessCache.Delete(owner, key.String())
|
c.accessCache.Delete(owner, key.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cache) PutListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey, session *data.ListMultipartSession) {
|
||||||
|
if err := c.sessionMultipartCache.PutListMultipartSession(key, session); err != nil {
|
||||||
|
c.logger.Warn(logs.CouldntCacheListMultipartSession, zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.accessCache.Put(owner, key.String()); err != nil {
|
||||||
|
c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cache) DeleteListMultipartSession(owner user.ID, key cache.ListMultipartSessionKey) {
|
||||||
|
c.sessionMultipartCache.DeleteListMultipartSession(key)
|
||||||
|
c.accessCache.Delete(owner, key.String())
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cache) GetTagging(owner user.ID, key string) map[string]string {
|
func (c *Cache) GetTagging(owner user.ID, key string) map[string]string {
|
||||||
if !c.accessCache.Get(owner, key) {
|
if !c.accessCache.Get(owner, key) {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -814,7 +814,7 @@ func (n *Layer) ResolveBucket(ctx context.Context, zone, name string) (cid.ID, e
|
||||||
|
|
||||||
func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
|
func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
|
||||||
if !p.SkipCheck {
|
if !p.SkipCheck {
|
||||||
res, _, err := n.getAllObjectsVersions(ctx, commonVersionsListingParams{
|
res, _, err := n.getAllObjectsVersions(ctx, commonListingParams{
|
||||||
BktInfo: p.BktInfo,
|
BktInfo: p.BktInfo,
|
||||||
MaxKeys: 1,
|
MaxKeys: 1,
|
||||||
})
|
})
|
||||||
|
|
|
@ -73,17 +73,18 @@ type (
|
||||||
VersionIDMarker string
|
VersionIDMarker string
|
||||||
}
|
}
|
||||||
|
|
||||||
commonVersionsListingParams struct {
|
commonListingParams struct {
|
||||||
BktInfo *data.BucketInfo
|
BktInfo *data.BucketInfo
|
||||||
Delimiter string
|
Delimiter string
|
||||||
Prefix string
|
Prefix string
|
||||||
MaxKeys int
|
MaxKeys int
|
||||||
Marker string
|
Marker string
|
||||||
Bookmark string
|
// key to store session in cache
|
||||||
|
Bookmark string
|
||||||
}
|
}
|
||||||
|
|
||||||
commonLatestVersionsListingParams struct {
|
commonLatestVersionsListingParams struct {
|
||||||
commonVersionsListingParams
|
commonListingParams
|
||||||
ListType ListType
|
ListType ListType
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -100,7 +101,7 @@ func (n *Layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*Lis
|
||||||
var result ListObjectsInfoV1
|
var result ListObjectsInfoV1
|
||||||
|
|
||||||
prm := commonLatestVersionsListingParams{
|
prm := commonLatestVersionsListingParams{
|
||||||
commonVersionsListingParams: commonVersionsListingParams{
|
commonListingParams: commonListingParams{
|
||||||
BktInfo: p.BktInfo,
|
BktInfo: p.BktInfo,
|
||||||
Delimiter: p.Delimiter,
|
Delimiter: p.Delimiter,
|
||||||
Prefix: p.Prefix,
|
Prefix: p.Prefix,
|
||||||
|
@ -131,7 +132,7 @@ func (n *Layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
|
||||||
var result ListObjectsInfoV2
|
var result ListObjectsInfoV2
|
||||||
|
|
||||||
prm := commonLatestVersionsListingParams{
|
prm := commonLatestVersionsListingParams{
|
||||||
commonVersionsListingParams: commonVersionsListingParams{
|
commonListingParams: commonListingParams{
|
||||||
BktInfo: p.BktInfo,
|
BktInfo: p.BktInfo,
|
||||||
Delimiter: p.Delimiter,
|
Delimiter: p.Delimiter,
|
||||||
Prefix: p.Prefix,
|
Prefix: p.Prefix,
|
||||||
|
@ -158,7 +159,7 @@ func (n *Layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
|
func (n *Layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
|
||||||
prm := commonVersionsListingParams{
|
prm := commonListingParams{
|
||||||
BktInfo: p.BktInfo,
|
BktInfo: p.BktInfo,
|
||||||
Delimiter: p.Delimiter,
|
Delimiter: p.Delimiter,
|
||||||
Prefix: p.Prefix,
|
Prefix: p.Prefix,
|
||||||
|
@ -193,13 +194,12 @@ func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
|
||||||
return nil, nil, nil
|
return nil, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
session, err := n.getListLatestVersionsSession(ctx, p)
|
session, err := n.getListVersionsSession(ctx, p.commonListingParams, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
generator, errorCh := nodesGeneratorStream(ctx, p.commonListingParams, session)
|
||||||
generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session)
|
objOutCh, err := n.initWorkerPool(ctx, 2, p.commonListingParams, generator)
|
||||||
objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
|
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -225,12 +225,12 @@ func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListingParams) ([]*data.ExtendedNodeVersion, bool, error) {
|
func (n *Layer) getAllObjectsVersions(ctx context.Context, p commonListingParams) ([]*data.ExtendedNodeVersion, bool, error) {
|
||||||
if p.MaxKeys == 0 {
|
if p.MaxKeys == 0 {
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
session, err := n.getListAllVersionsSession(ctx, p)
|
session, err := n.getListVersionsSession(ctx, p, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
@ -259,7 +259,7 @@ func (n *Layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListi
|
||||||
return allObjects, isTruncated, nil
|
return allObjects, isTruncated, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleGeneratedVersions(objOutCh <-chan *data.ExtendedNodeVersion, p commonVersionsListingParams, session *data.ListSession) []*data.ExtendedNodeVersion {
|
func handleGeneratedVersions(objOutCh <-chan *data.ExtendedNodeVersion, p commonListingParams, session *data.ListSession) []*data.ExtendedNodeVersion {
|
||||||
var lastName string
|
var lastName string
|
||||||
var listRowStartIndex int
|
var listRowStartIndex int
|
||||||
allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
|
allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
|
||||||
|
@ -301,48 +301,31 @@ func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) {
|
func (n *Layer) getListVersionsSession(ctx context.Context, p commonListingParams, latestOnly bool) (session *data.ListSession, err error) {
|
||||||
return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) {
|
|
||||||
return n.getListVersionsSession(ctx, p, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) {
|
|
||||||
owner := n.BearerOwner(ctx)
|
owner := n.BearerOwner(ctx)
|
||||||
|
|
||||||
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
|
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
|
||||||
session := n.cache.GetListSession(owner, cacheKey)
|
session = n.cache.GetListSession(owner, cacheKey)
|
||||||
if session == nil {
|
if session == nil || session.Acquired.Swap(true) {
|
||||||
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
|
session = n.newSession(ctx)
|
||||||
|
session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
|
||||||
|
return session, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if session.Acquired.Swap(true) {
|
|
||||||
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
|
|
||||||
}
|
|
||||||
|
|
||||||
// after reading next object from stream in session
|
|
||||||
// the current cache value already doesn't match with next token in cache key
|
|
||||||
n.cache.DeleteListSession(owner, cacheKey)
|
n.cache.DeleteListSession(owner, cacheKey)
|
||||||
|
|
||||||
return session, nil
|
return session, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
|
func (n *Layer) newSession(ctx context.Context) *data.ListSession {
|
||||||
session = &data.ListSession{NamesMap: make(map[string]struct{})}
|
session := &data.ListSession{NamesMap: make(map[string]struct{})}
|
||||||
session.Context, session.Cancel = context.WithCancel(context.Background())
|
session.Context, session.Cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// save access box data for next requests
|
||||||
if bd, err := middleware.GetBoxData(ctx); err == nil {
|
if bd, err := middleware.GetBoxData(ctx); err == nil {
|
||||||
session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd})
|
session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd})
|
||||||
}
|
}
|
||||||
|
|
||||||
session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
|
return session
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return session, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
|
func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
|
||||||
|
@ -366,7 +349,7 @@ func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatest
|
||||||
n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session)
|
n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) putListAllVersionsSession(ctx context.Context, p commonVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
|
func (n *Layer) putListAllVersionsSession(ctx context.Context, p commonListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
|
||||||
if len(allObjects) <= p.MaxKeys {
|
if len(allObjects) <= p.MaxKeys {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -383,7 +366,7 @@ func (n *Layer) putListAllVersionsSession(ctx context.Context, p commonVersionsL
|
||||||
n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session)
|
n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session)
|
||||||
}
|
}
|
||||||
|
|
||||||
func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) {
|
func nodesGeneratorStream(ctx context.Context, p commonListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) {
|
||||||
nodeCh := make(chan *data.ExtendedNodeVersion, 1000)
|
nodeCh := make(chan *data.ExtendedNodeVersion, 1000)
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
existed := stream.NamesMap
|
existed := stream.NamesMap
|
||||||
|
@ -439,7 +422,7 @@ func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, st
|
||||||
return nodeCh, errCh
|
return nodeCh, errCh
|
||||||
}
|
}
|
||||||
|
|
||||||
func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) {
|
func nodesGeneratorVersions(ctx context.Context, p commonListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) {
|
||||||
nodeCh := make(chan *data.ExtendedNodeVersion, 1000)
|
nodeCh := make(chan *data.ExtendedNodeVersion, 1000)
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
existed := stream.NamesMap
|
existed := stream.NamesMap
|
||||||
|
@ -498,7 +481,7 @@ func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams,
|
||||||
return nodeCh, errCh
|
return nodeCh, errCh
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) initWorkerPool(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
|
func (n *Layer) initWorkerPool(ctx context.Context, size int, p commonListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
|
||||||
reqLog := n.reqLogger(ctx)
|
reqLog := n.reqLogger(ctx)
|
||||||
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
|
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -567,7 +550,7 @@ func (n *Layer) initWorkerPool(ctx context.Context, size int, p commonVersionsLi
|
||||||
return objCh, nil
|
return objCh, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldSkip(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool {
|
func shouldSkip(node *data.ExtendedNodeVersion, p commonListingParams, existed map[string]struct{}) bool {
|
||||||
if node.NodeVersion.IsDeleteMarker {
|
if node.NodeVersion.IsDeleteMarker {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -598,7 +581,7 @@ func shouldSkip(node *data.ExtendedNodeVersion, p commonVersionsListingParams, e
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldSkipVersions(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool {
|
func shouldSkipVersions(node *data.ExtendedNodeVersion, p commonListingParams, existed map[string]struct{}) bool {
|
||||||
filePath := node.NodeVersion.FilePath
|
filePath := node.NodeVersion.FilePath
|
||||||
if node.DirName != "" {
|
if node.DirName != "" {
|
||||||
filePath = node.DirName
|
filePath = node.DirName
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||||
|
@ -499,47 +500,60 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, p.Bkt, p.Prefix)
|
session, err := n.getListMultipartUploadsSession(ctx, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
uploads := make([]*UploadInfo, 0, len(multipartInfos))
|
uploads := make([]*UploadInfo, 0, p.MaxUploads)
|
||||||
uniqDirs := make(map[string]struct{})
|
uniqDirs := make(map[string]struct{})
|
||||||
|
uploadsCount := 0
|
||||||
|
if session.Next != nil {
|
||||||
|
uploads = append(uploads, uploadInfoFromMultipartInfo(session.Next, p.Prefix, p.Delimiter))
|
||||||
|
uploadsCount++
|
||||||
|
}
|
||||||
|
|
||||||
for _, multipartInfo := range multipartInfos {
|
var info *data.MultipartInfo
|
||||||
info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter)
|
for uploadsCount < p.MaxUploads {
|
||||||
if info != nil {
|
info, err = session.Stream.Next()
|
||||||
if info.IsDir {
|
if err != nil {
|
||||||
if _, ok := uniqDirs[info.Key]; ok {
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
n.reqLogger(ctx).Warn(logs.CouldNotGetMultipartUploadInfo, zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
upload := uploadInfoFromMultipartInfo(info, p.Prefix, p.Delimiter)
|
||||||
|
if upload != nil {
|
||||||
|
if upload.IsDir {
|
||||||
|
if _, ok := uniqDirs[upload.Key]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
uniqDirs[info.Key] = struct{}{}
|
uniqDirs[upload.Key] = struct{}{}
|
||||||
}
|
}
|
||||||
uploads = append(uploads, info)
|
uploads = append(uploads, upload)
|
||||||
|
uploadsCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sort.Slice(uploads, func(i, j int) bool {
|
isTruncated := true
|
||||||
if uploads[i].Key == uploads[j].Key {
|
next, err := session.Stream.Next()
|
||||||
return uploads[i].UploadID < uploads[j].UploadID
|
if err != nil {
|
||||||
}
|
if err == io.EOF {
|
||||||
return uploads[i].Key < uploads[j].Key
|
isTruncated = false
|
||||||
})
|
|
||||||
|
|
||||||
if p.KeyMarker != "" {
|
|
||||||
if p.UploadIDMarker != "" {
|
|
||||||
uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads)
|
|
||||||
} else {
|
} else {
|
||||||
uploads = trimAfterUploadKey(p.KeyMarker, uploads)
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(uploads) > p.MaxUploads {
|
if isTruncated && info != nil {
|
||||||
|
// put to session redundant multipart upload which we read to check for EOF
|
||||||
|
session.Next = next
|
||||||
result.IsTruncated = true
|
result.IsTruncated = true
|
||||||
uploads = uploads[:p.MaxUploads]
|
result.NextUploadIDMarker = info.UploadID
|
||||||
result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID
|
result.NextKeyMarker = info.Key
|
||||||
result.NextKeyMarker = uploads[len(uploads)-1].Key
|
cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, info.Key, info.UploadID)
|
||||||
|
n.putListMultipartUploadsSession(ctx, session, cacheKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ov := range uploads {
|
for _, ov := range uploads {
|
||||||
|
@ -553,6 +567,40 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Layer) putListMultipartUploadsSession(ctx context.Context, session *data.ListMultipartSession, cacheKey cache.ListMultipartSessionKey) {
|
||||||
|
session.Acquired.Store(false)
|
||||||
|
n.cache.PutListMultipartSession(n.BearerOwner(ctx), cacheKey, session)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) getListMultipartUploadsSession(ctx context.Context, p *ListMultipartUploadsParams) (session *data.ListMultipartSession, err error) {
|
||||||
|
owner := n.BearerOwner(ctx)
|
||||||
|
cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, p.KeyMarker, p.UploadIDMarker)
|
||||||
|
session = n.cache.GetListMultipartSession(owner, cacheKey)
|
||||||
|
if session == nil || session.Acquired.Swap(true) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
session = &data.ListMultipartSession{
|
||||||
|
CommonSession: data.CommonSession{
|
||||||
|
Context: ctx,
|
||||||
|
Cancel: cancel,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
params := data.MultipartStreamParams{
|
||||||
|
Prefix: p.Prefix,
|
||||||
|
KeyMarker: p.KeyMarker,
|
||||||
|
UploadIDMarker: p.UploadIDMarker,
|
||||||
|
}
|
||||||
|
session.Stream, err = n.treeService.GetMultipartUploadsByPrefix(session.Context, p.Bkt, params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// if after reading next object from stream in session the current cache value already
|
||||||
|
// doesn't match with next token in cache key
|
||||||
|
n.cache.DeleteListMultipartSession(owner, cacheKey)
|
||||||
|
|
||||||
|
return session, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
|
func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
|
||||||
multipartInfo, parts, err := n.getUploadParts(ctx, p)
|
multipartInfo, parts, err := n.getUploadParts(ctx, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -53,7 +53,7 @@ type Service interface {
|
||||||
|
|
||||||
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
|
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
|
||||||
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
|
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
|
||||||
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
|
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error)
|
||||||
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
|
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
|
||||||
|
|
||||||
// AddPart puts a node to a system tree as a child of appropriate multipart upload
|
// AddPart puts a node to a system tree as a child of appropriate multipart upload
|
||||||
|
|
|
@ -328,7 +328,7 @@ func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, string) ([]*data.MultipartInfo, error) {
|
func (t *TreeServiceMock) GetMultipartUploadsByPrefix(context.Context, *data.BucketInfo, data.MultipartStreamParams) (data.MultipartInfoStream, error) {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,8 @@ const (
|
||||||
UpdateAccessCredObjectIntoFrostFS = "update access cred object into FrostFS" // Info in ../../authmate/authmate.go
|
UpdateAccessCredObjectIntoFrostFS = "update access cred object into FrostFS" // Info in ../../authmate/authmate.go
|
||||||
MetricsAreDisabled = "metrics are disabled" // Warn in ../../metrics/app.go
|
MetricsAreDisabled = "metrics are disabled" // Warn in ../../metrics/app.go
|
||||||
FoundMoreThanOneUnversionedNode = "found more than one unversioned node" // Debug in ../../pkg/service/tree/tree.go
|
FoundMoreThanOneUnversionedNode = "found more than one unversioned node" // Debug in ../../pkg/service/tree/tree.go
|
||||||
|
CouldNotParseTreeNode = "could not parse tree node" // Error in ../../pkg/service/tree/tree.go
|
||||||
|
CouldNotFormFilePath = "could not form file path" // Error in ../../pkg/service/tree/tree.go
|
||||||
ServiceIsRunning = "service is running" // Info in ../../cmd/s3-gw/service.go
|
ServiceIsRunning = "service is running" // Info in ../../cmd/s3-gw/service.go
|
||||||
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port" // Warn in ../../cmd/s3-gw/service.go
|
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port" // Warn in ../../cmd/s3-gw/service.go
|
||||||
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../cmd/s3-gw/service.go
|
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../cmd/s3-gw/service.go
|
||||||
|
@ -67,6 +69,7 @@ const (
|
||||||
CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go
|
CouldNotListUserContainers = "could not list user containers" // Error in ../../api/layer/container.go
|
||||||
CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go
|
CouldNotFetchContainerInfo = "could not fetch container info" // Error in ../../api/layer/container.go
|
||||||
MismatchedObjEncryptionInfo = "mismatched obj encryptionInfo" // Warn in ../../api/layer/multipart_upload.go
|
MismatchedObjEncryptionInfo = "mismatched obj encryptionInfo" // Warn in ../../api/layer/multipart_upload.go
|
||||||
|
CouldNotGetMultipartUploadInfo = "could not get multipart upload info" // Warn in ../../api/layer/multipart_upload.go
|
||||||
UploadPart = "upload part" // Debug in ../../api/layer/multipart_upload.go
|
UploadPart = "upload part" // Debug in ../../api/layer/multipart_upload.go
|
||||||
CouldntDeleteOldPartObject = "couldn't delete old part object" // Error in ../../api/layer/multipart_upload.go
|
CouldntDeleteOldPartObject = "couldn't delete old part object" // Error in ../../api/layer/multipart_upload.go
|
||||||
CouldNotPutCompletedObject = "could not put a completed object (multipart upload)" // Error in ../../api/layer/multipart_upload.go
|
CouldNotPutCompletedObject = "could not put a completed object (multipart upload)" // Error in ../../api/layer/multipart_upload.go
|
||||||
|
@ -89,6 +92,7 @@ const (
|
||||||
CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go
|
CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go
|
||||||
CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go
|
CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go
|
||||||
CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go
|
CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go
|
||||||
|
CouldntCacheListMultipartSession = "couldn't cache list multipart session" // Warn in ../../api/layer/cache.go
|
||||||
CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go
|
CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go
|
||||||
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
|
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
|
||||||
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go
|
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go
|
||||||
|
|
|
@ -1165,58 +1165,16 @@ func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, tr
|
||||||
return intermediateNodes, nil
|
return intermediateNodes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
|
func (c *Tree) getSubTreeByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, error) {
|
||||||
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
|
rootID, _, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, tree.ErrNodeNotFound) {
|
if errors.Is(err, tree.ErrNodeNotFound) {
|
||||||
return nil, "", nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
return nil, "", err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2, false)
|
return c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, maxGetSubTreeDepth)
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, tree.ErrNodeNotFound) {
|
|
||||||
return nil, "", nil
|
|
||||||
}
|
|
||||||
return nil, "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
nodesMap := make(map[string][]NodeResponse, len(subTree))
|
|
||||||
for _, node := range subTree {
|
|
||||||
if MultiID(rootID).Equal(node.GetNodeID()) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
fileName := getFilename(node)
|
|
||||||
if !strings.HasPrefix(fileName, tailPrefix) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
nodes := nodesMap[fileName]
|
|
||||||
|
|
||||||
// Add all nodes if flag latestOnly is false.
|
|
||||||
// Add all intermediate nodes
|
|
||||||
// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
|
|
||||||
if len(nodes) == 0 {
|
|
||||||
nodes = []NodeResponse{node}
|
|
||||||
} else if !latestOnly || isIntermediate(node) {
|
|
||||||
nodes = append(nodes, node)
|
|
||||||
} else if isIntermediate(nodes[0]) {
|
|
||||||
nodes = append([]NodeResponse{node}, nodes...)
|
|
||||||
} else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
|
|
||||||
nodes[0] = node
|
|
||||||
}
|
|
||||||
|
|
||||||
nodesMap[fileName] = nodes
|
|
||||||
}
|
|
||||||
|
|
||||||
result := make([]NodeResponse, 0, len(subTree))
|
|
||||||
for _, nodes := range nodesMap {
|
|
||||||
result = append(result, nodes...)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, strings.TrimSuffix(prefix, tailPrefix), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getFilename(node NodeResponse) string {
|
func getFilename(node NodeResponse) string {
|
||||||
|
@ -1253,6 +1211,26 @@ func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string
|
||||||
return filepath, nil
|
return filepath, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func formFilePathV2(node treeNode, filename string, namesMap map[uint64]*treeNode) (string, error) {
|
||||||
|
var parentPath string
|
||||||
|
curNode := &node
|
||||||
|
for {
|
||||||
|
parentNode, ok := namesMap[curNode.ParentID[0]]
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
parentFileName, ok := parentNode.FileName()
|
||||||
|
if !ok {
|
||||||
|
return "", fmt.Errorf("couldn't get parent file name")
|
||||||
|
}
|
||||||
|
|
||||||
|
parentPath = parentFileName + separator + parentPath
|
||||||
|
curNode = parentNode
|
||||||
|
}
|
||||||
|
|
||||||
|
return parentPath + filename, nil
|
||||||
|
}
|
||||||
|
|
||||||
func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
|
func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
|
||||||
tNode, err := newTreeNode(node)
|
tNode, err := newTreeNode(node)
|
||||||
if err != nil { // invalid OID attribute
|
if err != nil { // invalid OID attribute
|
||||||
|
@ -1267,10 +1245,6 @@ func parseTreeNode(node NodeResponse) (*treeNode, string, error) {
|
||||||
return tNode, fileName, nil
|
return tNode, fileName, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func formLatestNodeKey(parentID uint64, fileName string) string {
|
|
||||||
return strconv.FormatUint(parentID, 10) + "." + fileName
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
|
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
|
||||||
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
|
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
|
||||||
}
|
}
|
||||||
|
@ -1313,84 +1287,79 @@ func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketIn
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
|
type multipartInfoStream struct {
|
||||||
subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
|
log *zap.Logger
|
||||||
if err != nil {
|
nodeNames map[uint64]*treeNode
|
||||||
return nil, err
|
stream SubTreeStream
|
||||||
}
|
uploadID string
|
||||||
|
keyMarker string
|
||||||
|
pathPrefix string
|
||||||
|
}
|
||||||
|
|
||||||
var result []*data.MultipartInfo
|
func (m multipartInfoStream) Next() (*data.MultipartInfo, error) {
|
||||||
for _, node := range subTreeNodes {
|
var tNode *treeNode
|
||||||
multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix)
|
var filename, filepath string
|
||||||
|
|
||||||
|
for {
|
||||||
|
node, err := m.stream.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
result = append(result, multipartUploads...)
|
tNode, filename, err = parseTreeNode(node)
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, errNodeDoesntContainFileName) {
|
||||||
|
m.log.Error(logs.CouldNotParseTreeNode, zap.Error(err))
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
m.nodeNames[tNode.ID[0]] = tNode
|
||||||
|
if _, ok := tNode.Meta[finishedKV]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if id, ok := tNode.Meta[uploadIDKV]; ok {
|
||||||
|
filepath, err = formFilePathV2(*tNode, filename, m.nodeNames)
|
||||||
|
if err != nil {
|
||||||
|
m.log.Error(logs.CouldNotFormFilePath, zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
filepath = m.pathPrefix + filepath
|
||||||
|
if m.keyMarker == "" || filepath > m.keyMarker || (filepath == m.keyMarker && m.uploadID != "" && id > m.uploadID) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, nil
|
return newMultipartInfoFromTreeNode(m.log, filepath, tNode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
|
func trimPrefix(prefix string) string {
|
||||||
// sorting in getSubTree leads to skipping nodes that doesn't have FileName attribute
|
trimmedPrefix := strings.TrimRightFunc(prefix, func(r rune) bool {
|
||||||
// so when we are only interested in multipart nodes, we can set this flag
|
return r != '/'
|
||||||
// (despite we sort multiparts in above layer anyway)
|
})
|
||||||
// to skip its children (parts) that don't have FileName
|
trimmedPrefix = strings.TrimRight(trimmedPrefix, separator)
|
||||||
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth, true)
|
return strings.TrimRightFunc(trimmedPrefix, func(r rune) bool {
|
||||||
|
return r != '/'
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, params data.MultipartStreamParams) (data.MultipartInfoStream, error) {
|
||||||
|
stream, err := c.getSubTreeByPrefixStream(ctx, bktInfo, systemTree, params.Prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errors.Is(err, tree.ErrNodeNotFound) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var parentPrefix string
|
return &multipartInfoStream{
|
||||||
if parentFilePath != "" { // The root of subTree can also have a parent
|
log: c.reqLogger(ctx),
|
||||||
parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
|
nodeNames: make(map[uint64]*treeNode),
|
||||||
}
|
stream: stream,
|
||||||
|
uploadID: params.UploadIDMarker,
|
||||||
var filepath string
|
keyMarker: params.KeyMarker,
|
||||||
namesMap := make(map[uint64]string, len(subTree))
|
pathPrefix: trimPrefix(params.Prefix),
|
||||||
multiparts := make(map[string][]*data.MultipartInfo, len(subTree))
|
}, nil
|
||||||
|
|
||||||
for i, node := range subTree {
|
|
||||||
tNode, fileName, err := parseTreeNode(node)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if i != 0 {
|
|
||||||
if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid node order: %w", err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
filepath = parentPrefix + fileName
|
|
||||||
for _, id := range tNode.ID {
|
|
||||||
namesMap[id] = filepath
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, tNode)
|
|
||||||
if err != nil || multipartInfo.Finished {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, id := range node.GetParentID() {
|
|
||||||
key := formLatestNodeKey(id, fileName)
|
|
||||||
multipartInfos, ok := multiparts[key]
|
|
||||||
if !ok {
|
|
||||||
multipartInfos = []*data.MultipartInfo{multipartInfo}
|
|
||||||
} else {
|
|
||||||
multipartInfos = append(multipartInfos, multipartInfo)
|
|
||||||
}
|
|
||||||
|
|
||||||
multiparts[key] = multipartInfos
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
result := make([]*data.MultipartInfo, 0, len(multiparts))
|
|
||||||
for _, multipartInfo := range multiparts {
|
|
||||||
result = append(result, multipartInfo...)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
|
func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) {
|
||||||
|
|
Loading…
Reference in a new issue