[#469] List multipart uploads streaming

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
Nikita Zinkevich 2024-10-30 10:04:53 +03:00 committed by Alexey Vanin
parent a7ce40d745
commit c85f619f48
21 changed files with 673 additions and 383 deletions

View file

@ -2,6 +2,7 @@ package layer
import (
"bytes"
"cmp"
"context"
"crypto/md5"
"encoding/base64"
@ -10,6 +11,7 @@ import (
"errors"
"fmt"
"io"
"slices"
"sort"
"strconv"
"strings"
@ -17,11 +19,13 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -525,56 +529,63 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
}
func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (ListMultipartUploadsInfo, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "layer.ListMultipartUploads")
defer span.End()
var result ListMultipartUploadsInfo
if p.MaxUploads == 0 {
return &result, nil
return ListMultipartUploadsInfo{}, nil
}
multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, p.Bkt, p.Prefix)
session, err := n.getListMultipartUploadsSession(ctx, p)
if err != nil {
return nil, err
if errors.Is(err, tree.ErrNodeNotFound) {
return ListMultipartUploadsInfo{}, nil
}
return ListMultipartUploadsInfo{}, err
}
uploads := make([]*UploadInfo, 0, len(multipartInfos))
uploads := make([]*UploadInfo, 0, p.MaxUploads)
uniqDirs := make(map[string]struct{})
for _, multipartInfo := range multipartInfos {
info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter)
if info != nil {
if info.IsDir {
if _, ok := uniqDirs[info.Key]; ok {
continue
}
uniqDirs[info.Key] = struct{}{}
}
uploads = append(uploads, info)
if session.Next != nil {
upload := uploadInfoFromMultipartInfo(session.Next, p.Prefix, p.Delimiter)
if upload.IsDir {
uniqDirs[upload.Key] = struct{}{}
}
uploads = append(uploads, upload)
}
sort.Slice(uploads, func(i, j int) bool {
if uploads[i].Key == uploads[j].Key {
return uploads[i].UploadID < uploads[j].UploadID
var next *data.MultipartInfo
// +1 in order to check for truncated output
for len(uploads) < p.MaxUploads+1 {
next, err = session.Stream.Next(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
n.reqLogger(ctx).Warn(logs.CouldntGetMultipartUploadInfo, zap.Error(err), logs.TagField(logs.TagDatapath))
return ListMultipartUploadsInfo{}, err
}
return uploads[i].Key < uploads[j].Key
})
if p.KeyMarker != "" {
if p.UploadIDMarker != "" {
uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads)
} else {
uploads = trimAfterUploadKey(p.KeyMarker, uploads)
upload := uploadInfoFromMultipartInfo(next, p.Prefix, p.Delimiter)
if upload.IsDir {
if !isUniqDir(upload.Key, uniqDirs) {
continue
}
uniqDirs[upload.Key] = struct{}{}
}
uploads = append(uploads, upload)
}
if len(uploads) > p.MaxUploads {
result.IsTruncated = true
// put to session redundant multipart upload which was read to check for EOF
session.Next = next
uploads = uploads[:p.MaxUploads]
result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID
result.NextKeyMarker = uploads[len(uploads)-1].Key
result.IsTruncated = true
result.NextUploadIDMarker = uploads[p.MaxUploads-1].UploadID
result.NextKeyMarker = uploads[p.MaxUploads-1].Key
cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, result.NextKeyMarker, result.NextUploadIDMarker)
n.putListMultipartUploadsSession(ctx, session, cacheKey)
}
for _, ov := range uploads {
@ -585,7 +596,60 @@ func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
}
}
return &result, nil
slices.SortFunc(result.Uploads, func(a, b *UploadInfo) int {
keyCmp := cmp.Compare(a.Key, b.Key)
if keyCmp == 0 {
return cmp.Compare(a.UploadID, b.UploadID)
}
return keyCmp
})
return result, nil
}
// putListMultipartUploadsSession releases the session and stores it in the
// listing cache under cacheKey so a follow-up paginated request can resume
// the same stream instead of reopening it.
func (n *Layer) putListMultipartUploadsSession(ctx context.Context, session *data.ListMultipartSession, cacheKey cache.ListMultipartSessionKey) {
	// Mark the session as unclaimed before publishing it to the cache.
	session.Acquired.Store(false)
	owner := n.BearerOwner(ctx)
	n.cache.PutListMultipartSession(owner, cacheKey, session)
}
// getListMultipartUploadsSession returns a multipart-listing session for the
// request parameters: either a cached one resumed at the request's markers or,
// when none is available (or another request already claimed it), a fresh one
// backed by a new tree-service stream.
func (n *Layer) getListMultipartUploadsSession(ctx context.Context, p *ListMultipartUploadsParams) (session *data.ListMultipartSession, err error) {
	owner := n.BearerOwner(ctx)
	cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, p.KeyMarker, p.UploadIDMarker)

	session = n.cache.GetListMultipartSession(owner, cacheKey)
	if session == nil || session.Acquired.Swap(true) {
		// No reusable cached session: open a new stream positioned at the
		// requested key/upload-id markers.
		session = newListMultipartSession(ctx)
		streamParams := data.MultipartStreamParams{
			Prefix:         p.Prefix,
			KeyMarker:      p.KeyMarker,
			UploadIDMarker: p.UploadIDMarker,
		}
		if session.Stream, err = n.treeService.GetMultipartUploadsByPrefix(session.Context, p.Bkt, streamParams); err != nil {
			return nil, err
		}
	}

	// The session will advance past the markers encoded in cacheKey, so the
	// entry stored under that key is stale either way — drop it.
	n.cache.DeleteListMultipartSession(owner, cacheKey)

	return session, nil
}
// newListMultipartSession creates a multipart-listing session whose stream
// context is detached from the request context, so the underlying stream can
// outlive the current request and be resumed by later paginated requests.
func newListMultipartSession(ctx context.Context) *data.ListMultipartSession {
	streamCtx, cancel := context.WithCancel(context.Background())

	s := &data.ListMultipartSession{
		CommonSession: data.CommonSession{
			Context: streamCtx,
			Cancel:  cancel,
		},
	}

	// Carry the caller's access box into the detached context so subsequent
	// requests that reuse this session run with the same credentials.
	if bd, err := middleware.GetBoxData(ctx); err == nil {
		s.Context = middleware.SetBox(s.Context, &middleware.Box{AccessBox: bd})
	}

	return s
}
func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
@ -738,44 +802,10 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
return multipartInfo, res, nil
}
// trimAfterUploadIDAndKey returns the uploads (sorted by key, then upload ID)
// that sort strictly after the (key, id) marker pair, following S3
// ListMultipartUploads semantics: an upload is kept when its key is greater
// than the key marker, or when its key equals the key marker and its upload ID
// is greater than the upload-id marker.
func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo {
	var res []*UploadInfo
	// Fast path: every upload sorts before the key marker, nothing qualifies.
	if len(uploads) != 0 && uploads[len(uploads)-1].Key < key {
		return res
	}
	for _, obj := range uploads {
		// Fix: the previous condition (obj.Key >= key && obj.UploadID > id)
		// wrongly dropped uploads whose key is strictly greater than the key
		// marker but whose upload ID sorts at or before the ID marker.
		if obj.Key > key || (obj.Key == key && obj.UploadID > id) {
			res = append(res, obj)
		}
	}
	return res
}
// trimAfterUploadKey returns the suffix of objects (sorted by key) whose keys
// sort strictly after key; nil when no such objects exist.
func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo {
	// Fast path: the largest key is at or before the marker — nothing remains.
	if len(objects) != 0 && objects[len(objects)-1].Key <= key {
		return nil
	}
	for i := range objects {
		if objects[i].Key > key {
			return objects[i:]
		}
	}
	return nil
}
func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo {
var isDir bool
key := uploadInfo.Key
if !strings.HasPrefix(key, prefix) {
return nil
}
if len(delimiter) > 0 {
tail := strings.TrimPrefix(key, prefix)
index := strings.Index(tail, delimiter)
@ -793,3 +823,10 @@ func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimit
Created: uploadInfo.Created,
}
}
// isUniqDir reports whether key has not yet been recorded in uniqDirs.
func isUniqDir(key string, uniqDirs map[string]struct{}) bool {
	_, seen := uniqDirs[key]
	return !seen
}