frostfs-s3-gw/api/layer/multipart_upload.go
Commit 6f9ee3da76 by Roman Loginov
[#275] Change multipart upload deletion logic

To avoid accidentally picking up stale information about uploaded parts
from other nodes, the root node of a multipart upload now remains in the
tree with a finish flag when the upload is aborted or completed, instead
of being removed.

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
2023-12-27 13:06:45 +03:00

package layer

import (
	"bytes"
	"context"
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"sort"
	"strconv"
	"strings"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/minio/sio"
	"go.uber.org/zap"
)
const (
UploadIDAttributeName = "S3-Upload-Id"
UploadPartNumberAttributeName = "S3-Upload-Part-Number"
UploadCompletedParts = "S3-Completed-Parts"
	// MultipartObjectSize contains the real object size if the object is combined (its payload contains a list of parts).
	// This header is used to determine whether the object is combined.
MultipartObjectSize = "S3-Multipart-Object-Size"
metaPrefix = "meta-"
aclPrefix = "acl-"
MaxSizeUploadsList = 1000
MaxSizePartsList = 1000
UploadMinPartNumber = 1
UploadMaxPartNumber = 10000
UploadMinSize = 5 * 1024 * 1024 // 5MB
UploadMaxSize = 1024 * UploadMinSize // 5GB
)
type (
UploadInfoParams struct {
UploadID string
Bkt *data.BucketInfo
Key string
Encryption encryption.Params
}
CreateMultipartParams struct {
Info *UploadInfoParams
Header map[string]string
Data *UploadData
CopiesNumbers []uint32
}
UploadData struct {
TagSet map[string]string
ACLHeaders map[string]string
}
UploadPartParams struct {
Info *UploadInfoParams
PartNumber int
Size uint64
Reader io.Reader
ContentMD5 string
ContentSHA256Hash string
}
UploadCopyParams struct {
Versioned bool
Info *UploadInfoParams
SrcObjInfo *data.ObjectInfo
SrcBktInfo *data.BucketInfo
SrcEncryption encryption.Params
PartNumber int
Range *RangeParams
}
CompleteMultipartParams struct {
Info *UploadInfoParams
Parts []*CompletedPart
}
CompletedPart struct {
ETag string
PartNumber int
}
EncryptedPart struct {
Part
EncryptedSize int64
}
Part struct {
ETag string
LastModified string
PartNumber int
Size uint64
}
ListMultipartUploadsParams struct {
Bkt *data.BucketInfo
Delimiter string
EncodingType string
KeyMarker string
MaxUploads int
Prefix string
UploadIDMarker string
}
ListPartsParams struct {
Info *UploadInfoParams
MaxParts int
PartNumberMarker int
}
ListPartsInfo struct {
Parts []*Part
Owner user.ID
NextPartNumberMarker int
IsTruncated bool
}
ListMultipartUploadsInfo struct {
Prefixes []string
Uploads []*UploadInfo
IsTruncated bool
NextKeyMarker string
NextUploadIDMarker string
}
UploadInfo struct {
IsDir bool
Key string
UploadID string
Owner user.ID
Created time.Time
}
)
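
// CreateMultipartUpload initiates a multipart upload: it stores the upload
// metadata (user headers, ACL headers, tag set and, if enabled, encryption
// attributes) as a multipart node in the tree service. No object payload is
// written at this stage.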
func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error {
metaSize := len(p.Header)
if p.Data != nil {
metaSize += len(p.Data.ACLHeaders)
metaSize += len(p.Data.TagSet)
}
info := &data.MultipartInfo{
Key: p.Info.Key,
UploadID: p.Info.UploadID,
Owner: n.gateOwner,
Created: TimeNow(ctx),
Meta: make(map[string]string, metaSize),
CopiesNumbers: p.CopiesNumbers,
}
for key, val := range p.Header {
info.Meta[metaPrefix+key] = val
}
if p.Data != nil {
for key, val := range p.Data.ACLHeaders {
info.Meta[aclPrefix+key] = val
}
for key, val := range p.Data.TagSet {
info.Meta[tagPrefix+key] = val
}
}
if p.Info.Encryption.Enabled() {
if err := addEncryptionHeaders(info.Meta, p.Info.Encryption); err != nil {
return fmt.Errorf("add encryption header: %w", err)
}
}
return n.treeService.CreateMultipartUpload(ctx, p.Info.Bkt, info)
}
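
// UploadPart uploads a single part of an existing multipart upload and
// returns its ETag. The upload must be present in the tree service, and the
// part size must not exceed UploadMaxSize.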
func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return "", fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchUpload), err.Error())
}
return "", err
}
if p.Size > UploadMaxSize {
return "", fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), p.Size, UploadMaxSize)
}
objInfo, err := n.uploadPart(ctx, multipartInfo, p)
if err != nil {
return "", err
}
return objInfo.ETag(n.features.MD5Enabled()), nil
}
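
// uploadPart stores the part payload as a separate object, encrypting it if
// the upload was created with encryption, verifies the client-supplied
// Content-MD5 and SHA-256 checksums (deleting the object on mismatch), and
// registers the part in the tree service.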
func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
encInfo := FormEncryptionInfo(multipartInfo.Meta)
if err := p.Info.Encryption.MatchObjectEncryption(encInfo); err != nil {
n.reqLogger(ctx).Warn(logs.MismatchedObjEncryptionInfo, zap.Error(err))
return nil, s3errors.GetAPIError(s3errors.ErrInvalidEncryptionParameters)
}
bktInfo := p.Info.Bkt
prm := PrmObjectCreate{
Container: bktInfo.CID,
Attributes: make([][2]string, 2),
Payload: p.Reader,
CreationTime: TimeNow(ctx),
CopiesNumber: multipartInfo.CopiesNumbers,
}
decSize := p.Size
if p.Info.Encryption.Enabled() {
r, encSize, err := encryptionReader(p.Reader, p.Size, p.Info.Encryption.Key())
if err != nil {
			return nil, fmt.Errorf("failed to create encrypted reader: %w", err)
}
prm.Attributes = append(prm.Attributes, [2]string{AttributeDecryptedSize, strconv.FormatUint(p.Size, 10)})
prm.Payload = r
p.Size = encSize
}
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
if err != nil {
return nil, err
}
if len(p.ContentMD5) > 0 {
hashBytes, err := base64.StdEncoding.DecodeString(p.ContentMD5)
if err != nil {
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
}
if hex.EncodeToString(hashBytes) != hex.EncodeToString(md5Hash) {
prm := PrmObjectDelete{
Object: id,
Container: bktInfo.CID,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
err = n.frostFS.DeleteObject(ctx, prm)
if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id))
}
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
}
}
if p.Info.Encryption.Enabled() {
size = decSize
}
if !p.Info.Encryption.Enabled() && len(p.ContentSHA256Hash) > 0 && !auth.IsStandardContentSHA256(p.ContentSHA256Hash) {
contentHashBytes, err := hex.DecodeString(p.ContentSHA256Hash)
if err != nil {
return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch)
}
if !bytes.Equal(contentHashBytes, hash) {
err = n.objectDelete(ctx, bktInfo, id)
if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id))
}
return nil, s3errors.GetAPIError(s3errors.ErrContentSHA256Mismatch)
}
}
n.reqLogger(ctx).Debug(logs.UploadPart,
zap.String("multipart upload", p.Info.UploadID), zap.Int("part number", p.PartNumber),
zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id))
partInfo := &data.PartInfo{
Key: p.Info.Key,
UploadID: p.Info.UploadID,
Number: p.PartNumber,
OID: id,
Size: size,
ETag: hex.EncodeToString(hash),
Created: prm.CreationTime,
MD5: hex.EncodeToString(md5Hash),
}
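	// AddPart returns the OID of a previously uploaded part with the same
	// number, if any; ErrNoNodeToRemove means this part number is new.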
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
if err != nil && !oldPartIDNotFound {
return nil, err
}
if !oldPartIDNotFound {
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
zap.String("cid", bktInfo.CID.EncodeToString()),
zap.String("oid", oldPartID.EncodeToString()))
}
}
objInfo := &data.ObjectInfo{
ID: id,
CID: bktInfo.CID,
Owner: bktInfo.Owner,
Bucket: bktInfo.Name,
Size: partInfo.Size,
Created: partInfo.Created,
HashSum: partInfo.ETag,
MD5Sum: partInfo.MD5,
}
return objInfo, nil
}
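
// UploadPartCopy copies data from an existing object (optionally a byte
// range of it) into a part of the target multipart upload, taking the real
// source size into account when the source is itself a combined object.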
func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchUpload), err.Error())
}
return nil, err
}
size := p.SrcObjInfo.Size
srcObjectSize := p.SrcObjInfo.Size
if objSize, err := GetObjectSize(p.SrcObjInfo); err == nil {
srcObjectSize = objSize
size = objSize
}
if p.Range != nil {
size = p.Range.End - p.Range.Start + 1
if p.Range.End > srcObjectSize {
return nil, fmt.Errorf("%w: %d-%d/%d", s3errors.GetAPIError(s3errors.ErrInvalidCopyPartRangeSource), p.Range.Start, p.Range.End, srcObjectSize)
}
}
if size > UploadMaxSize {
return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, UploadMaxSize)
}
objPayload, err := n.GetObject(ctx, &GetObjectParams{
ObjectInfo: p.SrcObjInfo,
Versioned: p.Versioned,
Range: p.Range,
BucketInfo: p.SrcBktInfo,
Encryption: p.SrcEncryption,
})
if err != nil {
return nil, fmt.Errorf("get object to upload copy: %w", err)
}
params := &UploadPartParams{
Info: p.Info,
PartNumber: p.PartNumber,
Size: size,
Reader: objPayload,
}
return n.uploadPart(ctx, multipartInfo, params)
}
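
// CompleteMultipartUpload assembles the uploaded parts into the final
// object. Parts must be listed in ascending order and match the stored
// ETags, and every part except the last must be at least UploadMinSize. The
// result is a combined object: its payload is the JSON-encoded list of
// parts, and the real size is kept in the MultipartObjectSize attribute.
// Parts that were uploaded but not listed in the request are deleted, and
// the upload is then finalized in the tree service (per #275 the root node
// remains there with a finish flag so that other nodes do not serve stale
// part information).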
func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) {
for i := 1; i < len(p.Parts); i++ {
if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber {
return nil, nil, s3errors.GetAPIError(s3errors.ErrInvalidPartOrder)
}
}
multipartInfo, partsInfo, err := n.getUploadParts(ctx, p.Info)
if err != nil {
return nil, nil, err
}
encInfo := FormEncryptionInfo(multipartInfo.Meta)
if len(partsInfo) < len(p.Parts) {
return nil, nil, fmt.Errorf("%w: found %d parts, need %d", s3errors.GetAPIError(s3errors.ErrInvalidPart), len(partsInfo), len(p.Parts))
}
	var multipartObjectSize uint64
var encMultipartObjectSize uint64
parts := make([]*data.PartInfo, 0, len(p.Parts))
var completedPartsHeader strings.Builder
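	// Following the S3 convention, the MD5-based ETag of the completed object
	// is the MD5 of the concatenated binary MD5s of all parts, suffixed with
	// "-<part count>" (see CompleteMD5Hash below).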
md5Hash := md5.New()
for i, part := range p.Parts {
partInfo := partsInfo[part.PartNumber]
if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) {
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
}
delete(partsInfo, part.PartNumber)
// for the last part we have no minimum size limit
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
return nil, nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooSmall), partInfo.Size, UploadMinSize)
}
parts = append(parts, partInfo)
		multipartObjectSize += partInfo.Size // even if encryption is enabled, the stored part size is the actual (decrypted) one
if encInfo.Enabled {
encPartSize, err := sio.EncryptedSize(partInfo.Size)
if err != nil {
return nil, nil, fmt.Errorf("compute encrypted size: %w", err)
}
encMultipartObjectSize += encPartSize
}
partInfoStr := partInfo.ToHeaderString()
if i != len(p.Parts)-1 {
partInfoStr += ","
}
if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil {
return nil, nil, err
}
bytesHash, err := hex.DecodeString(partInfo.MD5)
if err != nil {
return nil, nil, fmt.Errorf("couldn't decode MD5 checksum of part: %w", err)
}
md5Hash.Write(bytesHash)
}
initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
initMetadata[UploadCompletedParts] = completedPartsHeader.String()
	initMetadata[MultipartObjectSize] = strconv.FormatUint(multipartObjectSize, 10)
uploadData := &UploadData{
TagSet: make(map[string]string),
ACLHeaders: make(map[string]string),
}
for key, val := range multipartInfo.Meta {
if strings.HasPrefix(key, metaPrefix) {
initMetadata[strings.TrimPrefix(key, metaPrefix)] = val
} else if strings.HasPrefix(key, tagPrefix) {
uploadData.TagSet[strings.TrimPrefix(key, tagPrefix)] = val
} else if strings.HasPrefix(key, aclPrefix) {
uploadData.ACLHeaders[strings.TrimPrefix(key, aclPrefix)] = val
}
}
if encInfo.Enabled {
initMetadata[AttributeEncryptionAlgorithm] = encInfo.Algorithm
initMetadata[AttributeHMACKey] = encInfo.HMACKey
initMetadata[AttributeHMACSalt] = encInfo.HMACSalt
		initMetadata[AttributeDecryptedSize] = strconv.FormatUint(multipartObjectSize, 10)
		multipartObjectSize = encMultipartObjectSize
}
partsData, err := json.Marshal(parts)
if err != nil {
		return nil, nil, fmt.Errorf("marshal parts for combined object: %w", err)
}
extObjInfo, err := n.PutObject(ctx, &PutObjectParams{
BktInfo: p.Info.Bkt,
Object: p.Info.Key,
Reader: bytes.NewReader(partsData),
Header: initMetadata,
		Size: multipartObjectSize,
Encryption: p.Info.Encryption,
CopiesNumbers: multipartInfo.CopiesNumbers,
CompleteMD5Hash: hex.EncodeToString(md5Hash.Sum(nil)) + "-" + strconv.Itoa(len(p.Parts)),
})
if err != nil {
n.reqLogger(ctx).Error(logs.CouldNotPutCompletedObject,
zap.String("uploadID", p.Info.UploadID),
zap.String("uploadKey", p.Info.Key),
zap.Error(err))
return nil, nil, s3errors.GetAPIError(s3errors.ErrInternalError)
}
var addr oid.Address
addr.SetContainer(p.Info.Bkt.CID)
for _, partInfo := range partsInfo {
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
zap.Error(err))
}
addr.SetObject(partInfo.OID)
n.cache.DeleteObject(addr)
}
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
}
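
// ListMultipartUploads returns the in-progress multipart uploads of a
// bucket filtered by prefix: keys containing the delimiter after the prefix
// are collapsed into common-prefix entries, the rest are sorted by key and
// upload ID, trimmed after the key/upload-ID markers and truncated to
// MaxUploads entries.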
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
var result ListMultipartUploadsInfo
if p.MaxUploads == 0 {
return &result, nil
}
multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, p.Bkt, p.Prefix)
if err != nil {
return nil, err
}
uploads := make([]*UploadInfo, 0, len(multipartInfos))
uniqDirs := make(map[string]struct{})
for _, multipartInfo := range multipartInfos {
info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter)
if info != nil {
if info.IsDir {
if _, ok := uniqDirs[info.Key]; ok {
continue
}
uniqDirs[info.Key] = struct{}{}
}
uploads = append(uploads, info)
}
}
sort.Slice(uploads, func(i, j int) bool {
if uploads[i].Key == uploads[j].Key {
return uploads[i].UploadID < uploads[j].UploadID
}
return uploads[i].Key < uploads[j].Key
})
if p.KeyMarker != "" {
if p.UploadIDMarker != "" {
uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads)
} else {
uploads = trimAfterUploadKey(p.KeyMarker, uploads)
}
}
if len(uploads) > p.MaxUploads {
result.IsTruncated = true
uploads = uploads[:p.MaxUploads]
result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID
result.NextKeyMarker = uploads[len(uploads)-1].Key
}
for _, ov := range uploads {
if ov.IsDir {
result.Prefixes = append(result.Prefixes, ov.Key)
} else {
result.Uploads = append(result.Uploads, ov)
}
}
return &result, nil
}
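
// AbortMultipartUpload deletes every stored part of the upload and then
// removes the upload itself via the tree service.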
func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
multipartInfo, parts, err := n.getUploadParts(ctx, p)
if err != nil {
return err
}
for _, info := range parts {
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
}
}
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
}
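
// ListParts returns the parts of an upload sorted by part number, starting
// after PartNumberMarker and truncated to MaxParts entries. The request's
// encryption parameters must match those of the upload.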
func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
var res ListPartsInfo
multipartInfo, partsInfo, err := n.getUploadParts(ctx, p.Info)
if err != nil {
return nil, err
}
encInfo := FormEncryptionInfo(multipartInfo.Meta)
if err = p.Info.Encryption.MatchObjectEncryption(encInfo); err != nil {
n.reqLogger(ctx).Warn(logs.MismatchedObjEncryptionInfo, zap.Error(err))
return nil, s3errors.GetAPIError(s3errors.ErrInvalidEncryptionParameters)
}
res.Owner = multipartInfo.Owner
parts := make([]*Part, 0, len(partsInfo))
for _, partInfo := range partsInfo {
parts = append(parts, &Part{
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
LastModified: partInfo.Created.UTC().Format(time.RFC3339),
PartNumber: partInfo.Number,
Size: partInfo.Size,
})
}
sort.Slice(parts, func(i, j int) bool {
return parts[i].PartNumber < parts[j].PartNumber
})
if len(parts) == 0 || p.PartNumberMarker >= parts[len(parts)-1].PartNumber {
res.Parts = make([]*Part, 0)
return &res, nil
}
if p.PartNumberMarker != 0 {
for i, part := range parts {
if part.PartNumber > p.PartNumberMarker {
parts = parts[i:]
break
}
}
}
if len(parts) > p.MaxParts {
res.IsTruncated = true
res.NextPartNumberMarker = parts[p.MaxParts-1].PartNumber
parts = parts[:p.MaxParts]
}
res.Parts = parts
return &res, nil
}
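
// getUploadParts resolves the multipart upload node and returns its parts
// indexed by part number.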
func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchUpload), err.Error())
}
return nil, nil, err
}
parts, err := n.treeService.GetParts(ctx, p.Bkt, multipartInfo.ID)
if err != nil {
return nil, nil, err
}
res := make(map[int]*data.PartInfo, len(parts))
partsNumbers := make([]int, len(parts))
oids := make([]string, len(parts))
for i, part := range parts {
res[part.Number] = part
partsNumbers[i] = part.Number
oids[i] = part.OID.EncodeToString()
}
n.reqLogger(ctx).Debug(logs.PartDetails,
zap.Stringer("cid", p.Bkt.CID),
zap.String("upload id", p.UploadID),
zap.Ints("part numbers", partsNumbers),
zap.Strings("oids", oids))
return multipartInfo, res, nil
}
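
// trimAfterUploadIDAndKey trims the sorted upload list so that listing
// resumes after the given key and upload ID markers.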
func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo {
var res []*UploadInfo
if len(uploads) != 0 && uploads[len(uploads)-1].Key < key {
return res
}
for _, obj := range uploads {
if obj.Key >= key && obj.UploadID > id {
res = append(res, obj)
}
}
return res
}
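
// trimAfterUploadKey returns the suffix of the sorted upload list whose
// keys are strictly greater than the given key marker.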
func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo {
var result []*UploadInfo
if len(objects) != 0 && objects[len(objects)-1].Key <= key {
return result
}
for i, obj := range objects {
if obj.Key > key {
result = objects[i:]
break
}
}
return result
}
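
// uploadInfoFromMultipartInfo converts a multipart node into a listing
// entry. Keys that contain the delimiter after the prefix are marked as
// directory entries and truncated at the delimiter; nil is returned if the
// key does not match the prefix.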
func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo {
var isDir bool
key := uploadInfo.Key
if !strings.HasPrefix(key, prefix) {
return nil
}
if len(delimiter) > 0 {
tail := strings.TrimPrefix(key, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
isDir = true
key = prefix + tail[:index+1]
}
}
return &UploadInfo{
IsDir: isDir,
Key: key,
UploadID: uploadInfo.UploadID,
Owner: uploadInfo.Owner,
Created: uploadInfo.Created,
}
}