778 lines
22 KiB
778 lines
22 KiB
package layer
import (
apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
const (
UploadIDAttributeName = "S3-Upload-Id"
UploadPartNumberAttributeName = "S3-Upload-Part-Number"
UploadCompletedParts = "S3-Completed-Parts"
// MultipartObjectSize contains the real object size if object is combined (payload contains list of parts).
// This header is used to determine if object is combined.
MultipartObjectSize = "S3-Multipart-Object-Size"
metaPrefix = "meta-"
MaxSizeUploadsList = 1000
MaxSizePartsList = 1000
UploadMinPartNumber = 1
UploadMaxPartNumber = 10000
UploadMinSize = 5 * 1024 * 1024 // 5MB
UploadMaxSize = 1024 * UploadMinSize // 5GB
type (
UploadInfoParams struct {
UploadID string
Bkt *data.BucketInfo
Key string
Encryption encryption.Params
CreateMultipartParams struct {
Info *UploadInfoParams
Header map[string]string
Data *UploadData
CopiesNumbers []uint32
UploadData struct {
TagSet map[string]string
UploadPartParams struct {
Info *UploadInfoParams
PartNumber int
Size uint64
Reader io.Reader
ContentMD5 string
ContentSHA256Hash string
UploadCopyParams struct {
Versioned bool
Info *UploadInfoParams
SrcObjInfo *data.ObjectInfo
SrcBktInfo *data.BucketInfo
SrcEncryption encryption.Params
PartNumber int
Range *RangeParams
CompleteMultipartParams struct {
Info *UploadInfoParams
Parts []*CompletedPart
CompletedPart struct {
ETag string
PartNumber int
EncryptedPart struct {
EncryptedSize int64
Part struct {
ETag string
LastModified string
PartNumber int
Size uint64
ListMultipartUploadsParams struct {
Bkt *data.BucketInfo
Delimiter string
EncodingType string
KeyMarker string
MaxUploads int
Prefix string
UploadIDMarker string
ListPartsParams struct {
Info *UploadInfoParams
MaxParts int
PartNumberMarker int
ListPartsInfo struct {
Parts []*Part
Owner user.ID
NextPartNumberMarker int
IsTruncated bool
ListMultipartUploadsInfo struct {
Prefixes []string
Uploads []*UploadInfo
IsTruncated bool
NextKeyMarker string
NextUploadIDMarker string
UploadInfo struct {
IsDir bool
Key string
UploadID string
Owner user.ID
Created time.Time
func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error {
metaSize := len(p.Header)
if p.Data != nil {
metaSize += len(p.Data.TagSet)
networkInfo, err := n.GetNetworkInfo(ctx)
if err != nil {
return err
info := &data.MultipartInfo{
Key: p.Info.Key,
UploadID: p.Info.UploadID,
Owner: n.gateOwner,
Created: TimeNow(ctx),
Meta: make(map[string]string, metaSize),
CopiesNumbers: p.CopiesNumbers,
CreationEpoch: networkInfo.CurrentEpoch(),
for key, val := range p.Header {
info.Meta[metaPrefix+key] = val
if p.Data != nil {
for key, val := range p.Data.TagSet {
info.Meta[tagPrefix+key] = val
if p.Info.Encryption.Enabled() {
if err := addEncryptionHeaders(info.Meta, p.Info.Encryption); err != nil {
return fmt.Errorf("add encryption header: %w", err)
return n.treeService.CreateMultipartUpload(ctx, p.Info.Bkt, info)
func (n *Layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID)
if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) {
return "", fmt.Errorf("%w: %s", apierr.GetAPIError(apierr.ErrNoSuchUpload), err.Error())
return "", err
if p.Size > UploadMaxSize {
return "", fmt.Errorf("%w: %d/%d", apierr.GetAPIError(apierr.ErrEntityTooLarge), p.Size, UploadMaxSize)
objInfo, err := n.uploadPart(ctx, multipartInfo, p)
if err != nil {
return "", err
return objInfo.ETag(n.features.MD5Enabled()), nil
func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
encInfo := FormEncryptionInfo(multipartInfo.Meta)
if err := p.Info.Encryption.MatchObjectEncryption(encInfo); err != nil {
n.reqLogger(ctx).Warn(logs.MismatchedObjEncryptionInfo, zap.Error(err))
return nil, apierr.GetAPIError(apierr.ErrInvalidEncryptionParameters)
bktInfo := p.Info.Bkt
prm := frostfs.PrmObjectCreate{
Container: bktInfo.CID,
Attributes: make([][2]string, 2),
Payload: p.Reader,
CreationTime: TimeNow(ctx),
CopiesNumber: multipartInfo.CopiesNumbers,
decSize := p.Size
if p.Info.Encryption.Enabled() {
r, encSize, err := encryptionReader(p.Reader, p.Size, p.Info.Encryption.Key())
if err != nil {
return nil, fmt.Errorf("failed to create ecnrypted reader: %w", err)
prm.Attributes = append(prm.Attributes, [2]string{AttributeDecryptedSize, strconv.FormatUint(p.Size, 10)})
prm.Payload = r
p.Size = encSize
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
createdObj, err := n.objectPutAndHash(ctx, prm, bktInfo)
if err != nil {
return nil, err
if len(p.ContentMD5) > 0 {
hashBytes, err := base64.StdEncoding.DecodeString(p.ContentMD5)
if err != nil {
return nil, apierr.GetAPIError(apierr.ErrInvalidDigest)
if hex.EncodeToString(hashBytes) != hex.EncodeToString(createdObj.MD5Sum) {
prm := frostfs.PrmObjectDelete{
Object: createdObj.ID,
Container: bktInfo.CID,
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
err = n.frostFS.DeleteObject(ctx, prm)
if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))
return nil, apierr.GetAPIError(apierr.ErrInvalidDigest)
if p.Info.Encryption.Enabled() {
createdObj.Size = decSize
if !p.Info.Encryption.Enabled() && len(p.ContentSHA256Hash) > 0 && !auth.IsStandardContentSHA256(p.ContentSHA256Hash) {
contentHashBytes, err := hex.DecodeString(p.ContentSHA256Hash)
if err != nil {
return nil, apierr.GetAPIError(apierr.ErrContentSHA256Mismatch)
if !bytes.Equal(contentHashBytes, createdObj.HashSum) {
err = n.objectDelete(ctx, bktInfo, createdObj.ID)
if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))
return nil, apierr.GetAPIError(apierr.ErrContentSHA256Mismatch)
zap.String("multipart upload", p.Info.UploadID), zap.Int("part number", p.PartNumber),
zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID))
partInfo := &data.PartInfo{
Key: p.Info.Key,
UploadID: p.Info.UploadID,
Number: p.PartNumber,
OID: createdObj.ID,
Size: createdObj.Size,
ETag: hex.EncodeToString(createdObj.HashSum),
Created: prm.CreationTime,
MD5: hex.EncodeToString(createdObj.MD5Sum),
oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
oldPartIDNotFound := errors.Is(err, tree.ErrNoNodeToRemove)
if err != nil && !oldPartIDNotFound {
return nil, err
if !oldPartIDNotFound {
for _, oldPartID := range oldPartIDs {
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
zap.String("cid", bktInfo.CID.EncodeToString()),
zap.String("oid", oldPartID.EncodeToString()))
objInfo := &data.ObjectInfo{
ID: createdObj.ID,
CID: bktInfo.CID,
Owner: bktInfo.Owner,
Bucket: bktInfo.Name,
Size: partInfo.Size,
Created: partInfo.Created,
HashSum: partInfo.ETag,
MD5Sum: partInfo.MD5,
return objInfo, nil
func (n *Layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID)
if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) {
return nil, fmt.Errorf("%w: %s", apierr.GetAPIError(apierr.ErrNoSuchUpload), err.Error())
return nil, err
size := p.SrcObjInfo.Size
srcObjectSize := p.SrcObjInfo.Size
if objSize, err := GetObjectSize(p.SrcObjInfo); err == nil {
srcObjectSize = objSize
size = objSize
if p.Range != nil {
size = p.Range.End - p.Range.Start + 1
if p.Range.End > srcObjectSize {
return nil, fmt.Errorf("%w: %d-%d/%d", apierr.GetAPIError(apierr.ErrInvalidCopyPartRangeSource), p.Range.Start, p.Range.End, srcObjectSize)
if size > UploadMaxSize {
return nil, fmt.Errorf("%w: %d/%d", apierr.GetAPIError(apierr.ErrEntityTooLarge), size, UploadMaxSize)
objPayload, err := n.GetObject(ctx, &GetObjectParams{
ObjectInfo: p.SrcObjInfo,
Versioned: p.Versioned,
Range: p.Range,
BucketInfo: p.SrcBktInfo,
Encryption: p.SrcEncryption,
if err != nil {
return nil, fmt.Errorf("get object to upload copy: %w", err)
params := &UploadPartParams{
Info: p.Info,
PartNumber: p.PartNumber,
Size: size,
Reader: objPayload,
return n.uploadPart(ctx, multipartInfo, params)
func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) {
for i := 1; i < len(p.Parts); i++ {
if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber {
return nil, nil, apierr.GetAPIError(apierr.ErrInvalidPartOrder)
multipartInfo, partsInfo, err := n.getUploadParts(ctx, p.Info)
if err != nil {
return nil, nil, err
encInfo := FormEncryptionInfo(multipartInfo.Meta)
if len(partsInfo) < len(p.Parts) {
return nil, nil, fmt.Errorf("%w: found %d parts, need %d", apierr.GetAPIError(apierr.ErrInvalidPart), len(partsInfo), len(p.Parts))
var multipartObjetSize uint64
var encMultipartObjectSize uint64
parts := make([]*data.PartInfoExtended, 0, len(p.Parts))
var completedPartsHeader strings.Builder
md5Hash := md5.New()
for i, part := range p.Parts {
partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled())
if partInfo == nil {
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", apierr.GetAPIError(apierr.ErrInvalidPart), part.PartNumber)
// for the last part we have no minimum size limit
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
return nil, nil, fmt.Errorf("%w: %d/%d", apierr.GetAPIError(apierr.ErrEntityTooSmall), partInfo.Size, UploadMinSize)
parts = append(parts, partInfo)
multipartObjetSize += partInfo.Size // even if encryption is enabled size is actual (decrypted)
if encInfo.Enabled {
encPartSize, err := sio.EncryptedSize(partInfo.Size)
if err != nil {
return nil, nil, fmt.Errorf("compute encrypted size: %w", err)
encMultipartObjectSize += encPartSize
partInfoStr := partInfo.ToHeaderString()
if i != len(p.Parts)-1 {
partInfoStr += ","
if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil {
return nil, nil, err
bytesHash, err := hex.DecodeString(partInfo.MD5)
if err != nil {
return nil, nil, fmt.Errorf("couldn't decode MD5 checksum of part: %w", err)
initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
initMetadata[UploadCompletedParts] = completedPartsHeader.String()
initMetadata[MultipartObjectSize] = strconv.FormatUint(multipartObjetSize, 10)
uploadData := &UploadData{
TagSet: make(map[string]string),
for key, val := range multipartInfo.Meta {
if strings.HasPrefix(key, metaPrefix) {
initMetadata[strings.TrimPrefix(key, metaPrefix)] = val
} else if strings.HasPrefix(key, tagPrefix) {
uploadData.TagSet[strings.TrimPrefix(key, tagPrefix)] = val
if encInfo.Enabled {
initMetadata[AttributeEncryptionAlgorithm] = encInfo.Algorithm
initMetadata[AttributeHMACKey] = encInfo.HMACKey
initMetadata[AttributeHMACSalt] = encInfo.HMACSalt
initMetadata[AttributeDecryptedSize] = strconv.FormatUint(multipartObjetSize, 10)
multipartObjetSize = encMultipartObjectSize
partsData, err := json.Marshal(parts)
if err != nil {
return nil, nil, fmt.Errorf("marshal parst for combined object: %w", err)
extObjInfo, err := n.PutObject(ctx, &PutObjectParams{
BktInfo: p.Info.Bkt,
Object: p.Info.Key,
Reader: bytes.NewReader(partsData),
Header: initMetadata,
Size: &multipartObjetSize,
Encryption: p.Info.Encryption,
CopiesNumbers: multipartInfo.CopiesNumbers,
CompleteMD5Hash: hex.EncodeToString(md5Hash.Sum(nil)) + "-" + strconv.Itoa(len(p.Parts)),
if err != nil {
zap.String("uploadID", p.Info.UploadID),
zap.String("uploadKey", p.Info.Key),
return nil, nil, apierr.GetAPIError(apierr.ErrInternalError)
var addr oid.Address
for _, prts := range partsInfo {
for _, partInfo := range prts {
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
var result ListMultipartUploadsInfo
if p.MaxUploads == 0 {
return &result, nil
session, err := n.getListMultipartUploadsSession(ctx, p)
if err != nil {
return nil, err
uploads := make([]*UploadInfo, 0, p.MaxUploads)
uniqDirs := make(map[string]struct{})
uploadsCount := 0
if session.Next != nil {
uploads = append(uploads, uploadInfoFromMultipartInfo(session.Next, p.Prefix, p.Delimiter))
isTruncated := true
var last data.MultipartInfo
var info *data.MultipartInfo
for uploadsCount <= p.MaxUploads {
info, err = session.Stream.Next(p.KeyMarker, p.UploadIDMarker)
if err != nil {
if err == io.EOF {
isTruncated = false
n.log.Warn(logs.CouldNotGetMultipartUploadInfo, zap.Error(err))
if uploadsCount == p.MaxUploads-1 {
// last upload from which is used for setting NextKeyMarker and NextUploadIDMarker
last = *info
upload := uploadInfoFromMultipartInfo(info, p.Prefix, p.Delimiter)
if upload != nil {
if upload.IsDir {
if _, ok := uniqDirs[upload.Key]; ok {
uniqDirs[upload.Key] = struct{}{}
uploads = append(uploads, upload)
if isTruncated {
// put to session redundant multipart upload which we read to check for EOF
session.Next = info
uploads = uploads[:p.MaxUploads]
result.IsTruncated = true
result.NextUploadIDMarker = last.UploadID
result.NextKeyMarker = last.Key
cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, last.Key, last.UploadID)
n.putListMultipartUploadsSession(ctx, session, cacheKey)
for _, ov := range uploads {
if ov.IsDir {
result.Prefixes = append(result.Prefixes, ov.Key)
} else {
result.Uploads = append(result.Uploads, ov)
return &result, nil
func (n *Layer) putListMultipartUploadsSession(ctx context.Context, session *data.ListMultipartSession, cacheKey cache.ListMultipartSessionKey) {
n.cache.PutListMultipartSession(n.BearerOwner(ctx), cacheKey, session)
func (n *Layer) getListMultipartUploadsSession(ctx context.Context, p *ListMultipartUploadsParams) (session *data.ListMultipartSession, err error) {
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListMultipartSessionCacheKey(p.Bkt.CID, p.Prefix, p.KeyMarker, p.UploadIDMarker)
session = n.cache.GetListMultipartSession(owner, cacheKey)
if session == nil {
ctx, cancel := context.WithCancel(ctx)
session = &data.ListMultipartSession{
CommonSession: data.CommonSession{
Context: ctx,
Cancel: cancel,
} else if !session.Acquired.Swap(true) {
// if
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListMultipartSession(owner, cacheKey)
return session, nil
session.Stream, err = n.treeService.GetMultipartUploadsByPrefix(session.Context, p.Bkt, p.Prefix)
if err != nil {
return nil, err
return session, err
func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
multipartInfo, parts, err := n.getUploadParts(ctx, p)
if err != nil {
return err
for _, infos := range parts {
for _, info := range infos {
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
var res ListPartsInfo
multipartInfo, partsInfo, err := n.getUploadParts(ctx, p.Info)
if err != nil {
return nil, err
encInfo := FormEncryptionInfo(multipartInfo.Meta)
if err = p.Info.Encryption.MatchObjectEncryption(encInfo); err != nil {
n.reqLogger(ctx).Warn(logs.MismatchedObjEncryptionInfo, zap.Error(err))
return nil, apierr.GetAPIError(apierr.ErrInvalidEncryptionParameters)
res.Owner = multipartInfo.Owner
parts := make([]*Part, 0, len(partsInfo))
for _, infos := range partsInfo {
sort.Slice(infos, func(i, j int) bool {
return infos[i].Timestamp < infos[j].Timestamp
partInfo := infos[len(infos)-1]
parts = append(parts, &Part{
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
LastModified: partInfo.Created.UTC().Format(time.RFC3339),
PartNumber: partInfo.Number,
Size: partInfo.Size,
sort.Slice(parts, func(i, j int) bool {
return parts[i].PartNumber < parts[j].PartNumber
if len(parts) == 0 || p.PartNumberMarker >= parts[len(parts)-1].PartNumber {
res.Parts = make([]*Part, 0)
return &res, nil
if p.PartNumberMarker != 0 {
for i, part := range parts {
if part.PartNumber > p.PartNumberMarker {
parts = parts[i:]
if len(parts) > p.MaxParts {
res.IsTruncated = true
parts = parts[:p.MaxParts]
res.NextPartNumberMarker = parts[len(parts)-1].PartNumber
res.Parts = parts
return &res, nil
type PartsInfo map[int][]*data.PartInfoExtended
func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended {
parts := p[part]
for i, info := range parts {
if info.GetETag(md5Enabled) == etag {
p[part] = append(parts[:i], parts[i+1:]...)
return info
return nil
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) {
return nil, nil, fmt.Errorf("%w: %s", apierr.GetAPIError(apierr.ErrNoSuchUpload), err.Error())
return nil, nil, err
parts, err := n.treeService.GetParts(ctx, p.Bkt, multipartInfo.ID)
if err != nil {
return nil, nil, err
res := make(map[int][]*data.PartInfoExtended, len(parts))
partsNumbers := make([]int, len(parts))
oids := make([]string, len(parts))
for i, part := range parts {
res[part.Number] = append(res[part.Number], part)
partsNumbers[i] = part.Number
oids[i] = part.OID.EncodeToString()
zap.Stringer("cid", p.Bkt.CID),
zap.String("upload id", p.UploadID),
zap.Ints("part numbers", partsNumbers),
zap.Strings("oids", oids))
return multipartInfo, res, nil
func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo {
var res []*UploadInfo
if len(uploads) != 0 && uploads[len(uploads)-1].Key < key {
return res
for _, obj := range uploads {
if obj.Key >= key && obj.UploadID > id {
res = append(res, obj)
return res
func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo {
var result []*UploadInfo
if len(objects) != 0 && objects[len(objects)-1].Key <= key {
return result
for i, obj := range objects {
if obj.Key > key {
result = objects[i:]
return result
func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo {
var isDir bool
key := uploadInfo.Key
if !strings.HasPrefix(key, prefix) {
return nil
if len(delimiter) > 0 {
tail := strings.TrimPrefix(key, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
isDir = true
key = prefix + tail[:index+1]
return &UploadInfo{
IsDir: isDir,
Key: key,
UploadID: uploadInfo.UploadID,
Owner: uploadInfo.Owner,
Created: uploadInfo.Created,