Denis Kirillov
056f168d77
Previously, after a tree split we could end up with duplicated parts (several objects and tree nodes referring to the same part number). Some of them couldn't be deleted after an abort or complete action. Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
732 lines
20 KiB
Go
package layer

import (
	"bytes"
	"context"
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"sort"
	"strconv"
	"strings"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/minio/sio"
	"go.uber.org/zap"
)

const (
	UploadIDAttributeName         = "S3-Upload-Id"
	UploadPartNumberAttributeName = "S3-Upload-Part-Number"
	UploadCompletedParts          = "S3-Completed-Parts"

	// MultipartObjectSize contains the real object size if object is combined (payload contains list of parts).
	// This header is used to determine if object is combined.
	MultipartObjectSize = "S3-Multipart-Object-Size"

	metaPrefix = "meta-"

	MaxSizeUploadsList  = 1000
	MaxSizePartsList    = 1000
	UploadMinPartNumber = 1
	UploadMaxPartNumber = 10000
	UploadMinSize       = 5 * 1024 * 1024      // 5MB
	UploadMaxSize       = 1024 * UploadMinSize // 5GB
)

type (
	UploadInfoParams struct {
		UploadID   string
		Bkt        *data.BucketInfo
		Key        string
		Encryption encryption.Params
	}

	CreateMultipartParams struct {
		Info          *UploadInfoParams
		Header        map[string]string
		Data          *UploadData
		CopiesNumbers []uint32
	}

	UploadData struct {
		TagSet map[string]string
	}

	UploadPartParams struct {
		Info              *UploadInfoParams
		PartNumber        int
		Size              uint64
		Reader            io.Reader
		ContentMD5        string
		ContentSHA256Hash string
	}

	UploadCopyParams struct {
		Versioned     bool
		Info          *UploadInfoParams
		SrcObjInfo    *data.ObjectInfo
		SrcBktInfo    *data.BucketInfo
		SrcEncryption encryption.Params
		PartNumber    int
		Range         *RangeParams
	}

	CompleteMultipartParams struct {
		Info  *UploadInfoParams
		Parts []*CompletedPart
	}

	CompletedPart struct {
		ETag       string
		PartNumber int
	}

	EncryptedPart struct {
		Part
		EncryptedSize int64
	}

	Part struct {
		ETag         string
		LastModified string
		PartNumber   int
		Size         uint64
	}

	ListMultipartUploadsParams struct {
		Bkt            *data.BucketInfo
		Delimiter      string
		EncodingType   string
		KeyMarker      string
		MaxUploads     int
		Prefix         string
		UploadIDMarker string
	}

	ListPartsParams struct {
		Info             *UploadInfoParams
		MaxParts         int
		PartNumberMarker int
	}

	ListPartsInfo struct {
		Parts                []*Part
		Owner                user.ID
		NextPartNumberMarker int
		IsTruncated          bool
	}

	ListMultipartUploadsInfo struct {
		Prefixes           []string
		Uploads            []*UploadInfo
		IsTruncated        bool
		NextKeyMarker      string
		NextUploadIDMarker string
	}

	UploadInfo struct {
		IsDir    bool
		Key      string
		UploadID string
		Owner    user.ID
		Created  time.Time
	}
)

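// CreateMultipartUpload initiates a multipart upload: it saves the upload
// metadata (user headers, tags, encryption attributes and the current network
// epoch) as an upload node in the tree service.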
func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error {
	metaSize := len(p.Header)
	if p.Data != nil {
		metaSize += len(p.Data.TagSet)
	}

	networkInfo, err := n.frostFS.NetworkInfo(ctx)
	if err != nil {
		return fmt.Errorf("get network info: %w", err)
	}

	info := &data.MultipartInfo{
		Key:           p.Info.Key,
		UploadID:      p.Info.UploadID,
		Owner:         n.gateOwner,
		Created:       TimeNow(ctx),
		Meta:          make(map[string]string, metaSize),
		CopiesNumbers: p.CopiesNumbers,
		CreationEpoch: networkInfo.CurrentEpoch(),
	}

	for key, val := range p.Header {
		info.Meta[metaPrefix+key] = val
	}

	if p.Data != nil {
		for key, val := range p.Data.TagSet {
			info.Meta[tagPrefix+key] = val
		}
	}

	if p.Info.Encryption.Enabled() {
		if err := addEncryptionHeaders(info.Meta, p.Info.Encryption); err != nil {
			return fmt.Errorf("add encryption header: %w", err)
		}
	}

	return n.treeService.CreateMultipartUpload(ctx, p.Info.Bkt, info)
}

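// UploadPart checks that the upload exists and the part does not exceed
// UploadMaxSize, stores the part payload as a separate object and returns
// the part ETag.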
func (n *Layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) {
	multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID)
	if err != nil {
		if errors.Is(err, ErrNodeNotFound) {
			return "", fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchUpload), err.Error())
		}
		return "", err
	}

	if p.Size > UploadMaxSize {
		return "", fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), p.Size, UploadMaxSize)
	}

	objInfo, err := n.uploadPart(ctx, multipartInfo, p)
	if err != nil {
		return "", err
	}

	return objInfo.ETag(n.features.MD5Enabled()), nil
}

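// uploadPart stores the part payload in FrostFS (encrypting it if the upload
// requires encryption), validates the client-supplied Content-MD5 and SHA256
// checksums (deleting the just-created object on mismatch), registers the
// part in the tree service and deletes objects superseded by re-uploading
// this part number.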
func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
	encInfo := FormEncryptionInfo(multipartInfo.Meta)
	if err := p.Info.Encryption.MatchObjectEncryption(encInfo); err != nil {
		n.reqLogger(ctx).Warn(logs.MismatchedObjEncryptionInfo, zap.Error(err))
		return nil, s3errors.GetAPIError(s3errors.ErrInvalidEncryptionParameters)
	}

	bktInfo := p.Info.Bkt
	prm := PrmObjectCreate{
		Container:    bktInfo.CID,
		Attributes:   make([][2]string, 2),
		Payload:      p.Reader,
		CreationTime: TimeNow(ctx),
		CopiesNumber: multipartInfo.CopiesNumbers,
	}

	decSize := p.Size
	if p.Info.Encryption.Enabled() {
		r, encSize, err := encryptionReader(p.Reader, p.Size, p.Info.Encryption.Key())
		if err != nil {
			return nil, fmt.Errorf("failed to create encrypted reader: %w", err)
		}
		prm.Attributes = append(prm.Attributes, [2]string{AttributeDecryptedSize, strconv.FormatUint(p.Size, 10)})
		prm.Payload = r
		p.Size = encSize
	}

	prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
	prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)

	createdObj, err := n.objectPutAndHash(ctx, prm, bktInfo)
	if err != nil {
		return nil, err
	}
	if len(p.ContentMD5) > 0 {
		hashBytes, err := base64.StdEncoding.DecodeString(p.ContentMD5)
		if err != nil {
			return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
		}
		if hex.EncodeToString(hashBytes) != hex.EncodeToString(createdObj.MD5Sum) {
			prm := PrmObjectDelete{
				Object:    createdObj.ID,
				Container: bktInfo.CID,
			}
			n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
			err = n.frostFS.DeleteObject(ctx, prm)
			if err != nil {
				n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))
			}
			return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
		}
	}
	if p.Info.Encryption.Enabled() {
		createdObj.Size = decSize
	}

	if !p.Info.Encryption.Enabled() && len(p.ContentSHA256Hash) > 0 && !auth.IsStandardContentSHA256(p.ContentSHA256Hash) {
		contentHashBytes, err := hex.DecodeString(p.ContentSHA256Hash)
		if err != nil {
			return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch)
		}
		if !bytes.Equal(contentHashBytes, createdObj.HashSum) {
			err = n.objectDelete(ctx, bktInfo, createdObj.ID)
			if err != nil {
				n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))
			}
			return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch)
		}
	}

	n.reqLogger(ctx).Debug(logs.UploadPart,
		zap.String("multipart upload", p.Info.UploadID), zap.Int("part number", p.PartNumber),
		zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))

	partInfo := &data.PartInfo{
		Key:      p.Info.Key,
		UploadID: p.Info.UploadID,
		Number:   p.PartNumber,
		OID:      createdObj.ID,
		Size:     createdObj.Size,
		ETag:     hex.EncodeToString(createdObj.HashSum),
		Created:  prm.CreationTime,
		MD5:      hex.EncodeToString(createdObj.MD5Sum),
	}

	oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
	oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
	if err != nil && !oldPartIDNotFound {
		return nil, err
	}
	if !oldPartIDNotFound {
		for _, oldPartID := range oldPartIDs {
			if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
				n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
					zap.String("cid", bktInfo.CID.EncodeToString()),
					zap.String("oid", oldPartID.EncodeToString()))
			}
		}
	}

	objInfo := &data.ObjectInfo{
		ID:  createdObj.ID,
		CID: bktInfo.CID,

		Owner:   bktInfo.Owner,
		Bucket:  bktInfo.Name,
		Size:    partInfo.Size,
		Created: partInfo.Created,
		HashSum: partInfo.ETag,
		MD5Sum:  partInfo.MD5,
	}

	return objInfo, nil
}

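// UploadPartCopy creates a part from an existing object: it validates the
// requested range against the source object size and the UploadMaxSize
// limit, then streams the source payload into a regular part upload.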
func (n *Layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) {
	multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID)
	if err != nil {
		if errors.Is(err, ErrNodeNotFound) {
			return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchUpload), err.Error())
		}
		return nil, err
	}

	size := p.SrcObjInfo.Size
	srcObjectSize := p.SrcObjInfo.Size

	if objSize, err := GetObjectSize(p.SrcObjInfo); err == nil {
		srcObjectSize = objSize
		size = objSize
	}

	if p.Range != nil {
		size = p.Range.End - p.Range.Start + 1
		if p.Range.End > srcObjectSize {
			return nil, fmt.Errorf("%w: %d-%d/%d", s3errors.GetAPIError(s3errors.ErrInvalidCopyPartRangeSource), p.Range.Start, p.Range.End, srcObjectSize)
		}
	}
	if size > UploadMaxSize {
		return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, UploadMaxSize)
	}

	objPayload, err := n.GetObject(ctx, &GetObjectParams{
		ObjectInfo: p.SrcObjInfo,
		Versioned:  p.Versioned,
		Range:      p.Range,
		BucketInfo: p.SrcBktInfo,
		Encryption: p.SrcEncryption,
	})
	if err != nil {
		return nil, fmt.Errorf("get object to upload copy: %w", err)
	}

	params := &UploadPartParams{
		Info:       p.Info,
		PartNumber: p.PartNumber,
		Size:       size,
		Reader:     objPayload,
	}

	return n.uploadPart(ctx, multipartInfo, params)
}

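// CompleteMultipartUpload assembles the final object: it verifies part order
// and ETags, accumulates the total (and, if encrypted, the encrypted) size
// and the multipart MD5 hash, stores a combined object whose payload is the
// JSON-encoded part list, then deletes all remaining part objects (including
// stale duplicates) and the upload node.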
func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) {
	for i := 1; i < len(p.Parts); i++ {
		if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber {
			return nil, nil, s3errors.GetAPIError(s3errors.ErrInvalidPartOrder)
		}
	}

	multipartInfo, partsInfo, err := n.getUploadParts(ctx, p.Info)
	if err != nil {
		return nil, nil, err
	}
	encInfo := FormEncryptionInfo(multipartInfo.Meta)

	if len(partsInfo) < len(p.Parts) {
		return nil, nil, fmt.Errorf("%w: found %d parts, need %d", s3errors.GetAPIError(s3errors.ErrInvalidPart), len(partsInfo), len(p.Parts))
	}

	var multipartObjectSize uint64
	var encMultipartObjectSize uint64
	parts := make([]*data.PartInfoExtended, 0, len(p.Parts))

	var completedPartsHeader strings.Builder
	md5Hash := md5.New()
	for i, part := range p.Parts {
		partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled())
		if partInfo == nil {
			return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
		}

		// for the last part we have no minimum size limit
		if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
			return nil, nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooSmall), partInfo.Size, UploadMinSize)
		}
		parts = append(parts, partInfo)
		multipartObjectSize += partInfo.Size // even if encryption is enabled size is actual (decrypted)

		if encInfo.Enabled {
			encPartSize, err := sio.EncryptedSize(partInfo.Size)
			if err != nil {
				return nil, nil, fmt.Errorf("compute encrypted size: %w", err)
			}
			encMultipartObjectSize += encPartSize
		}

		partInfoStr := partInfo.ToHeaderString()
		if i != len(p.Parts)-1 {
			partInfoStr += ","
		}
		if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil {
			return nil, nil, err
		}

		bytesHash, err := hex.DecodeString(partInfo.MD5)
		if err != nil {
			return nil, nil, fmt.Errorf("couldn't decode MD5 checksum of part: %w", err)
		}
		md5Hash.Write(bytesHash)
	}

	initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
	initMetadata[UploadCompletedParts] = completedPartsHeader.String()
	initMetadata[MultipartObjectSize] = strconv.FormatUint(multipartObjectSize, 10)

	uploadData := &UploadData{
		TagSet: make(map[string]string),
	}
	for key, val := range multipartInfo.Meta {
		if strings.HasPrefix(key, metaPrefix) {
			initMetadata[strings.TrimPrefix(key, metaPrefix)] = val
		} else if strings.HasPrefix(key, tagPrefix) {
			uploadData.TagSet[strings.TrimPrefix(key, tagPrefix)] = val
		}
	}

	if encInfo.Enabled {
		initMetadata[AttributeEncryptionAlgorithm] = encInfo.Algorithm
		initMetadata[AttributeHMACKey] = encInfo.HMACKey
		initMetadata[AttributeHMACSalt] = encInfo.HMACSalt
		initMetadata[AttributeDecryptedSize] = strconv.FormatUint(multipartObjectSize, 10)
		multipartObjectSize = encMultipartObjectSize
	}

	partsData, err := json.Marshal(parts)
	if err != nil {
		return nil, nil, fmt.Errorf("marshal parts for combined object: %w", err)
	}

	extObjInfo, err := n.PutObject(ctx, &PutObjectParams{
		BktInfo:         p.Info.Bkt,
		Object:          p.Info.Key,
		Reader:          bytes.NewReader(partsData),
		Header:          initMetadata,
		Size:            multipartObjectSize,
		Encryption:      p.Info.Encryption,
		CopiesNumbers:   multipartInfo.CopiesNumbers,
		CompleteMD5Hash: hex.EncodeToString(md5Hash.Sum(nil)) + "-" + strconv.Itoa(len(p.Parts)),
	})
	if err != nil {
		n.reqLogger(ctx).Error(logs.CouldNotPutCompletedObject,
			zap.String("uploadID", p.Info.UploadID),
			zap.String("uploadKey", p.Info.Key),
			zap.Error(err))

		return nil, nil, s3errors.GetAPIError(s3errors.ErrInternalError)
	}

	var addr oid.Address
	addr.SetContainer(p.Info.Bkt.CID)
	for _, prts := range partsInfo {
		for _, partInfo := range prts {
			if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
				n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
					zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
					zap.Error(err))
			}
			addr.SetObject(partInfo.OID)
			n.cache.DeleteObject(addr)
		}
	}

	return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
}

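// ListMultipartUploads lists active uploads in the bucket filtered by prefix
// and delimiter, applies the key and upload-id markers and truncates the
// result to MaxUploads entries.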
func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
	var result ListMultipartUploadsInfo
	if p.MaxUploads == 0 {
		return &result, nil
	}

	multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, p.Bkt, p.Prefix)
	if err != nil {
		return nil, err
	}

	uploads := make([]*UploadInfo, 0, len(multipartInfos))
	uniqDirs := make(map[string]struct{})

	for _, multipartInfo := range multipartInfos {
		info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter)
		if info != nil {
			if info.IsDir {
				if _, ok := uniqDirs[info.Key]; ok {
					continue
				}
				uniqDirs[info.Key] = struct{}{}
			}
			uploads = append(uploads, info)
		}
	}

	sort.Slice(uploads, func(i, j int) bool {
		if uploads[i].Key == uploads[j].Key {
			return uploads[i].UploadID < uploads[j].UploadID
		}
		return uploads[i].Key < uploads[j].Key
	})

	if p.KeyMarker != "" {
		if p.UploadIDMarker != "" {
			uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads)
		} else {
			uploads = trimAfterUploadKey(p.KeyMarker, uploads)
		}
	}

	if len(uploads) > p.MaxUploads {
		result.IsTruncated = true
		uploads = uploads[:p.MaxUploads]
		result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID
		result.NextKeyMarker = uploads[len(uploads)-1].Key
	}

	for _, ov := range uploads {
		if ov.IsDir {
			result.Prefixes = append(result.Prefixes, ov.Key)
		} else {
			result.Uploads = append(result.Uploads, ov)
		}
	}

	return &result, nil
}

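// AbortMultipartUpload deletes every stored object of the upload, including
// duplicates registered for the same part number, and then removes the
// upload node from the tree service.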
func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
	multipartInfo, parts, err := n.getUploadParts(ctx, p)
	if err != nil {
		return err
	}

	for _, infos := range parts {
		for _, info := range infos {
			if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
				n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
					zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
			}
		}
	}

	return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
}

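// ListParts returns the upload's parts ordered by part number, starting
// after PartNumberMarker and truncated to MaxParts. If several objects are
// registered for one part number, only the most recently uploaded one is
// reported.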
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
	var res ListPartsInfo
	multipartInfo, partsInfo, err := n.getUploadParts(ctx, p.Info)
	if err != nil {
		return nil, err
	}

	encInfo := FormEncryptionInfo(multipartInfo.Meta)
	if err = p.Info.Encryption.MatchObjectEncryption(encInfo); err != nil {
		n.reqLogger(ctx).Warn(logs.MismatchedObjEncryptionInfo, zap.Error(err))
		return nil, s3errors.GetAPIError(s3errors.ErrInvalidEncryptionParameters)
	}

	res.Owner = multipartInfo.Owner

	parts := make([]*Part, 0, len(partsInfo))

	for _, infos := range partsInfo {
		sort.Slice(infos, func(i, j int) bool {
			return infos[i].Timestamp < infos[j].Timestamp
		})

		partInfo := infos[len(infos)-1]
		parts = append(parts, &Part{
			ETag:         data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
			LastModified: partInfo.Created.UTC().Format(time.RFC3339),
			PartNumber:   partInfo.Number,
			Size:         partInfo.Size,
		})
	}

	sort.Slice(parts, func(i, j int) bool {
		return parts[i].PartNumber < parts[j].PartNumber
	})

	if len(parts) == 0 || p.PartNumberMarker >= parts[len(parts)-1].PartNumber {
		res.Parts = make([]*Part, 0)
		return &res, nil
	}
	if p.PartNumberMarker != 0 {
		for i, part := range parts {
			if part.PartNumber > p.PartNumberMarker {
				parts = parts[i:]
				break
			}
		}
	}

	if len(parts) > p.MaxParts {
		res.IsTruncated = true
		parts = parts[:p.MaxParts]
	}

	res.NextPartNumberMarker = parts[len(parts)-1].PartNumber
	res.Parts = parts

	return &res, nil
}

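// PartsInfo maps a part number to the objects stored for it. Normally the
// slice holds a single element, but a tree split may leave several objects
// registered under the same part number.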
type PartsInfo map[int][]*data.PartInfoExtended

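// Extract removes and returns the stored object that matches the given part
// number and ETag (MD5-based when md5Enabled is set). Whatever remains in
// the map afterwards is a stale duplicate that the caller is expected to
// delete.
//
// Illustrative sketch (stale and fresh are hypothetical
// *data.PartInfoExtended values registered for the same part number):
//
//	parts := PartsInfo{1: {stale, fresh}}
//	got := parts.Extract(1, fresh.GetETag(false), false) // got == fresh
//	// parts[1] still holds stale, which the caller then deletes.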
func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended {
	parts := p[part]

	for i, info := range parts {
		if info.GetETag(md5Enabled) == etag {
			p[part] = append(parts[:i], parts[i+1:]...)
			return info
		}
	}

	return nil
}

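// getUploadParts loads the upload node and its parts from the tree service,
// groups the parts by number so that duplicates stay visible to callers, and
// logs the discovered part numbers and OIDs.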
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) {
	multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
	if err != nil {
		if errors.Is(err, ErrNodeNotFound) {
			return nil, nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchUpload), err.Error())
		}
		return nil, nil, err
	}

	parts, err := n.treeService.GetParts(ctx, p.Bkt, multipartInfo.ID)
	if err != nil {
		return nil, nil, err
	}

	res := make(map[int][]*data.PartInfoExtended, len(parts))
	partsNumbers := make([]int, len(parts))
	oids := make([]string, len(parts))
	for i, part := range parts {
		res[part.Number] = append(res[part.Number], part)
		partsNumbers[i] = part.Number
		oids[i] = part.OID.EncodeToString()
	}

	n.reqLogger(ctx).Debug(logs.PartDetails,
		zap.Stringer("cid", p.Bkt.CID),
		zap.String("upload id", p.UploadID),
		zap.Ints("part numbers", partsNumbers),
		zap.Strings("oids", oids))

	return multipartInfo, res, nil
}

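// trimAfterUploadIDAndKey keeps only uploads at or after the key marker
// whose upload IDs sort strictly after the upload-id marker.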
func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo {
	var res []*UploadInfo
	if len(uploads) != 0 && uploads[len(uploads)-1].Key < key {
		return res
	}

	for _, obj := range uploads {
		if obj.Key >= key && obj.UploadID > id {
			res = append(res, obj)
		}
	}

	return res
}

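// trimAfterUploadKey keeps only uploads whose keys sort strictly after the
// key marker.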
func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo {
	var result []*UploadInfo
	if len(objects) != 0 && objects[len(objects)-1].Key <= key {
		return result
	}
	for i, obj := range objects {
		if obj.Key > key {
			result = objects[i:]
			break
		}
	}

	return result
}

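// uploadInfoFromMultipartInfo converts an upload node into a listing entry:
// keys that don't match the prefix yield nil, and keys containing the
// delimiter after the prefix collapse into a directory (common prefix)
// entry.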
func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo {
	var isDir bool
	key := uploadInfo.Key

	if !strings.HasPrefix(key, prefix) {
		return nil
	}

	if len(delimiter) > 0 {
		tail := strings.TrimPrefix(key, prefix)
		index := strings.Index(tail, delimiter)
		if index >= 0 {
			isDir = true
			key = prefix + tail[:index+1]
		}
	}

	return &UploadInfo{
		IsDir:    isDir,
		Key:      key,
		UploadID: uploadInfo.UploadID,
		Owner:    uploadInfo.Owner,
		Created:  uploadInfo.Created,
	}
}