package layer import ( "bytes" "context" "crypto/md5" "encoding/base64" "encoding/hex" "encoding/json" "errors" "fmt" "io" "sort" "strconv" "strings" "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/minio/sio" "go.uber.org/zap" ) const ( UploadIDAttributeName = "S3-Upload-Id" UploadPartNumberAttributeName = "S3-Upload-Part-Number" UploadCompletedParts = "S3-Completed-Parts" // MultipartObjectSize contains the real object size if object is combined (payload contains list of parts). // This header is used to determine if object is combined. MultipartObjectSize = "S3-Multipart-Object-Size" metaPrefix = "meta-" MaxSizeUploadsList = 1000 MaxSizePartsList = 1000 UploadMinPartNumber = 1 UploadMaxPartNumber = 10000 UploadMinSize = 5 * 1024 * 1024 // 5MB UploadMaxSize = 1024 * UploadMinSize // 5GB ) type ( UploadInfoParams struct { UploadID string Bkt *data.BucketInfo Key string Encryption encryption.Params } CreateMultipartParams struct { Info *UploadInfoParams Header map[string]string Data *UploadData CopiesNumbers []uint32 } UploadData struct { TagSet map[string]string } UploadPartParams struct { Info *UploadInfoParams PartNumber int Size uint64 Reader io.Reader ContentMD5 string ContentSHA256Hash string } UploadCopyParams struct { Versioned bool Info *UploadInfoParams SrcObjInfo *data.ObjectInfo SrcBktInfo *data.BucketInfo SrcEncryption encryption.Params PartNumber int Range *RangeParams } CompleteMultipartParams struct { Info *UploadInfoParams Parts []*CompletedPart } CompletedPart struct { ETag string PartNumber int } EncryptedPart struct { Part EncryptedSize int64 } Part struct { ETag string LastModified string PartNumber int Size uint64 } ListMultipartUploadsParams struct { Bkt *data.BucketInfo Delimiter string EncodingType string KeyMarker string MaxUploads int Prefix string UploadIDMarker string } ListPartsParams struct { Info *UploadInfoParams MaxParts int PartNumberMarker int } ListPartsInfo struct { Parts []*Part Owner user.ID NextPartNumberMarker int IsTruncated bool } ListMultipartUploadsInfo struct { Prefixes []string Uploads []*UploadInfo IsTruncated bool NextKeyMarker string NextUploadIDMarker string } UploadInfo struct { IsDir bool Key string UploadID string Owner user.ID Created time.Time } ) func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error { metaSize := len(p.Header) if p.Data != nil { metaSize += len(p.Data.TagSet) } networkInfo, err := n.GetNetworkInfo(ctx) if err != nil { return err } info := &data.MultipartInfo{ Key: p.Info.Key, UploadID: p.Info.UploadID, Owner: n.gateOwner, Created: TimeNow(ctx), Meta: make(map[string]string, metaSize), CopiesNumbers: p.CopiesNumbers, CreationEpoch: networkInfo.CurrentEpoch(), } for key, val := range p.Header { info.Meta[metaPrefix+key] = val } if p.Data != nil { for key, val := range p.Data.TagSet { info.Meta[tagPrefix+key] = val } } if p.Info.Encryption.Enabled() { if err := addEncryptionHeaders(info.Meta, p.Info.Encryption); err != nil { return fmt.Errorf("add encryption header: %w", err) } } return n.treeService.CreateMultipartUpload(ctx, p.Info.Bkt, info) } func (n *Layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID) if err != nil { if errors.Is(err, tree.ErrNodeNotFound) { return "", fmt.Errorf("%w: %s", apierr.GetAPIError(apierr.ErrNoSuchUpload), err.Error()) } return "", err } if p.Size > UploadMaxSize { return "", fmt.Errorf("%w: %d/%d", apierr.GetAPIError(apierr.ErrEntityTooLarge), p.Size, UploadMaxSize) } objInfo, err := n.uploadPart(ctx, multipartInfo, p) if err != nil { return "", err } return objInfo.ETag(n.features.MD5Enabled()), nil } func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) { encInfo := FormEncryptionInfo(multipartInfo.Meta) if err := p.Info.Encryption.MatchObjectEncryption(encInfo); err != nil { n.reqLogger(ctx).Warn(logs.MismatchedObjEncryptionInfo, zap.Error(err)) return nil, apierr.GetAPIError(apierr.ErrInvalidEncryptionParameters) } bktInfo := p.Info.Bkt prm := frostfs.PrmObjectCreate{ Container: bktInfo.CID, Attributes: make([][2]string, 2), Payload: p.Reader, CreationTime: TimeNow(ctx), CopiesNumber: multipartInfo.CopiesNumbers, } decSize := p.Size md5Hash := md5.New() if p.Info.Encryption.Enabled() { rr := wrapReader(p.Reader, 64*1024, func(buf []byte) { md5Hash.Write(buf) }) r, encSize, err := encryptionReader(rr, p.Size, p.Info.Encryption.Key()) if err != nil { return nil, fmt.Errorf("failed to create ecnrypted reader: %w", err) } prm.Attributes = append(prm.Attributes, [2]string{AttributeDecryptedSize, strconv.FormatUint(p.Size, 10)}) prm.Payload = r p.Size = encSize } prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber) createdObj, err := n.objectPutAndHash(ctx, prm, bktInfo) if err != nil { return nil, err } if len(p.ContentMD5) > 0 { hashBytes, err := base64.StdEncoding.DecodeString(p.ContentMD5) if err != nil { return nil, apierr.GetAPIError(apierr.ErrInvalidDigest) } match := bytes.Equal(hashBytes, createdObj.MD5Sum) if p.Info.Encryption.Enabled() { match = bytes.Equal(hashBytes, md5Hash.Sum(nil)) } if !match { prm := frostfs.PrmObjectDelete{ Object: createdObj.ID, Container: bktInfo.CID, } n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner) err = n.frostFS.DeleteObject(ctx, prm) if err != nil { n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID)) } return nil, apierr.GetAPIError(apierr.ErrInvalidDigest) } } if p.Info.Encryption.Enabled() { createdObj.Size = decSize } if !p.Info.Encryption.Enabled() && len(p.ContentSHA256Hash) > 0 && !auth.IsStandardContentSHA256(p.ContentSHA256Hash) { contentHashBytes, err := hex.DecodeString(p.ContentSHA256Hash) if err != nil { return nil, apierr.GetAPIError(apierr.ErrContentSHA256Mismatch) } if !bytes.Equal(contentHashBytes, createdObj.HashSum) { err = n.objectDelete(ctx, bktInfo, createdObj.ID) if err != nil { n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID)) } return nil, apierr.GetAPIError(apierr.ErrContentSHA256Mismatch) } } n.reqLogger(ctx).Debug(logs.UploadPart, zap.String("multipart upload", p.Info.UploadID), zap.Int("part number", p.PartNumber), zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", createdObj.ID)) partInfo := &data.PartInfo{ Key: p.Info.Key, UploadID: p.Info.UploadID, Number: p.PartNumber, OID: createdObj.ID, Size: createdObj.Size, ETag: hex.EncodeToString(createdObj.HashSum), Created: prm.CreationTime, MD5: hex.EncodeToString(createdObj.MD5Sum), } oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) oldPartIDNotFound := errors.Is(err, tree.ErrNoNodeToRemove) if err != nil && !oldPartIDNotFound { return nil, err } if !oldPartIDNotFound { for _, oldPartID := range oldPartIDs { if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil { n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err), zap.String("cid", bktInfo.CID.EncodeToString()), zap.String("oid", oldPartID.EncodeToString())) } } } objInfo := &data.ObjectInfo{ ID: createdObj.ID, CID: bktInfo.CID, Owner: bktInfo.Owner, Bucket: bktInfo.Name, Size: partInfo.Size, Created: partInfo.Created, HashSum: partInfo.ETag, MD5Sum: partInfo.MD5, } return objInfo, nil } func (n *Layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID) if err != nil { if errors.Is(err, tree.ErrNodeNotFound) { return nil, fmt.Errorf("%w: %s", apierr.GetAPIError(apierr.ErrNoSuchUpload), err.Error()) } return nil, err } size := p.SrcObjInfo.Size srcObjectSize := p.SrcObjInfo.Size if objSize, err := GetObjectSize(p.SrcObjInfo); err == nil { srcObjectSize = objSize size = objSize } if p.Range != nil { size = p.Range.End - p.Range.Start + 1 if p.Range.End > srcObjectSize { return nil, fmt.Errorf("%w: %d-%d/%d", apierr.GetAPIError(apierr.ErrInvalidCopyPartRangeSource), p.Range.Start, p.Range.End, srcObjectSize) } } if size > UploadMaxSize { return nil, fmt.Errorf("%w: %d/%d", apierr.GetAPIError(apierr.ErrEntityTooLarge), size, UploadMaxSize) } objPayload, err := n.GetObject(ctx, &GetObjectParams{ ObjectInfo: p.SrcObjInfo, Versioned: p.Versioned, Range: p.Range, BucketInfo: p.SrcBktInfo, Encryption: p.SrcEncryption, }) if err != nil { return nil, fmt.Errorf("get object to upload copy: %w", err) } params := &UploadPartParams{ Info: p.Info, PartNumber: p.PartNumber, Size: size, Reader: objPayload, } return n.uploadPart(ctx, multipartInfo, params) } func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) { for i := 1; i < len(p.Parts); i++ { if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber { return nil, nil, apierr.GetAPIError(apierr.ErrInvalidPartOrder) } } multipartInfo, partsInfo, err := n.getUploadParts(ctx, p.Info) if err != nil { return nil, nil, err } encInfo := FormEncryptionInfo(multipartInfo.Meta) if len(partsInfo) < len(p.Parts) { return nil, nil, fmt.Errorf("%w: found %d parts, need %d", apierr.GetAPIError(apierr.ErrInvalidPart), len(partsInfo), len(p.Parts)) } var multipartObjetSize uint64 parts := make([]*data.PartInfoExtended, 0, len(p.Parts)) var completedPartsHeader strings.Builder md5Hash := md5.New() for i, part := range p.Parts { partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled()) if partInfo == nil { return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", apierr.GetAPIError(apierr.ErrInvalidPart), part.PartNumber) } // for the last part we have no minimum size limit if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize { return nil, nil, fmt.Errorf("%w: %d/%d", apierr.GetAPIError(apierr.ErrEntityTooSmall), partInfo.Size, UploadMinSize) } parts = append(parts, partInfo) multipartObjetSize += partInfo.Size // even if encryption is enabled size is actual (decrypted) if encInfo.Enabled { _, err := sio.EncryptedSize(partInfo.Size) if err != nil { return nil, nil, fmt.Errorf("compute encrypted size: %w", err) } } partInfoStr := partInfo.ToHeaderString() if i != len(p.Parts)-1 { partInfoStr += "," } if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil { return nil, nil, err } bytesHash, err := hex.DecodeString(partInfo.MD5) if err != nil { return nil, nil, fmt.Errorf("couldn't decode MD5 checksum of part: %w", err) } md5Hash.Write(bytesHash) } initMetadata := make(map[string]string, len(multipartInfo.Meta)+1) initMetadata[UploadCompletedParts] = completedPartsHeader.String() initMetadata[MultipartObjectSize] = strconv.FormatUint(multipartObjetSize, 10) uploadData := &UploadData{ TagSet: make(map[string]string), } for key, val := range multipartInfo.Meta { if strings.HasPrefix(key, metaPrefix) { initMetadata[strings.TrimPrefix(key, metaPrefix)] = val } else if strings.HasPrefix(key, tagPrefix) { uploadData.TagSet[strings.TrimPrefix(key, tagPrefix)] = val } } if encInfo.Enabled { initMetadata[AttributeEncryptionAlgorithm] = encInfo.Algorithm initMetadata[AttributeHMACKey] = encInfo.HMACKey initMetadata[AttributeHMACSalt] = encInfo.HMACSalt initMetadata[AttributeDecryptedSize] = strconv.FormatUint(multipartObjetSize, 10) } partsData, err := json.Marshal(parts) if err != nil { return nil, nil, fmt.Errorf("marshal parst for combined object: %w", err) } extObjInfo, err := n.PutObject(ctx, &PutObjectParams{ BktInfo: p.Info.Bkt, Object: p.Info.Key, Reader: bytes.NewReader(partsData), Header: initMetadata, Size: &multipartObjetSize, Encryption: p.Info.Encryption, CopiesNumbers: multipartInfo.CopiesNumbers, CompleteMD5Hash: hex.EncodeToString(md5Hash.Sum(nil)) + "-" + strconv.Itoa(len(p.Parts)), }) if err != nil { n.reqLogger(ctx).Error(logs.CouldNotPutCompletedObject, zap.String("uploadID", p.Info.UploadID), zap.String("uploadKey", p.Info.Key), zap.Error(err)) return nil, nil, apierr.GetAPIError(apierr.ErrInternalError) } var addr oid.Address addr.SetContainer(p.Info.Bkt.CID) for _, prts := range partsInfo { for _, partInfo := range prts { if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart, zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID), zap.Error(err)) } addr.SetObject(partInfo.OID) n.cache.DeleteObject(addr) } } return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo) } func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) { var result ListMultipartUploadsInfo if p.MaxUploads == 0 { return &result, nil } multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, p.Bkt, p.Prefix) if err != nil { return nil, err } uploads := make([]*UploadInfo, 0, len(multipartInfos)) uniqDirs := make(map[string]struct{}) for _, multipartInfo := range multipartInfos { info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter) if info != nil { if info.IsDir { if _, ok := uniqDirs[info.Key]; ok { continue } uniqDirs[info.Key] = struct{}{} } uploads = append(uploads, info) } } sort.Slice(uploads, func(i, j int) bool { if uploads[i].Key == uploads[j].Key { return uploads[i].UploadID < uploads[j].UploadID } return uploads[i].Key < uploads[j].Key }) if p.KeyMarker != "" { if p.UploadIDMarker != "" { uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads) } else { uploads = trimAfterUploadKey(p.KeyMarker, uploads) } } if len(uploads) > p.MaxUploads { result.IsTruncated = true uploads = uploads[:p.MaxUploads] result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID result.NextKeyMarker = uploads[len(uploads)-1].Key } for _, ov := range uploads { if ov.IsDir { result.Prefixes = append(result.Prefixes, ov.Key) } else { result.Uploads = append(result.Uploads, ov) } } return &result, nil } func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error { multipartInfo, parts, err := n.getUploadParts(ctx, p) if err != nil { return err } networkInfo, err := n.GetNetworkInfo(ctx) if err != nil { return fmt.Errorf("get network info: %w", err) } n.deleteUploadedParts(ctx, p.Bkt, parts, networkInfo) return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo) } func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, parts PartsInfo, networkInfo netmap.NetworkInfo) { members := make([]oid.ID, 0) tokens := prepareTokensParameter(ctx, bkt.Owner) for _, infos := range parts { for _, info := range infos { oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, info.OID, tokens) if err != nil { n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()), zap.String("oid", info.OID.EncodeToString()), zap.Error(err)) } members = append(members, append(oids, info.OID)...) } } n.putTombstones(ctx, bkt, networkInfo, members) } func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) { var res ListPartsInfo multipartInfo, partsInfo, err := n.getUploadParts(ctx, p.Info) if err != nil { return nil, err } encInfo := FormEncryptionInfo(multipartInfo.Meta) if err = p.Info.Encryption.MatchObjectEncryption(encInfo); err != nil { n.reqLogger(ctx).Warn(logs.MismatchedObjEncryptionInfo, zap.Error(err)) return nil, apierr.GetAPIError(apierr.ErrInvalidEncryptionParameters) } res.Owner = multipartInfo.Owner parts := make([]*Part, 0, len(partsInfo)) for _, infos := range partsInfo { sort.Slice(infos, func(i, j int) bool { return infos[i].Timestamp < infos[j].Timestamp }) partInfo := infos[len(infos)-1] parts = append(parts, &Part{ ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())), LastModified: partInfo.Created.UTC().Format(time.RFC3339), PartNumber: partInfo.Number, Size: partInfo.Size, }) } sort.Slice(parts, func(i, j int) bool { return parts[i].PartNumber < parts[j].PartNumber }) if len(parts) == 0 || p.PartNumberMarker >= parts[len(parts)-1].PartNumber { res.Parts = make([]*Part, 0) return &res, nil } if p.PartNumberMarker != 0 { for i, part := range parts { if part.PartNumber > p.PartNumberMarker { parts = parts[i:] break } } } if len(parts) > p.MaxParts { res.IsTruncated = true parts = parts[:p.MaxParts] } res.NextPartNumberMarker = parts[len(parts)-1].PartNumber res.Parts = parts return &res, nil } type PartsInfo map[int][]*data.PartInfoExtended func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended { parts := p[part] for i, info := range parts { if info.GetETag(md5Enabled) == etag { p[part] = append(parts[:i], parts[i+1:]...) return info } } return nil } func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID) if err != nil { if errors.Is(err, tree.ErrNodeNotFound) { return nil, nil, fmt.Errorf("%w: %s", apierr.GetAPIError(apierr.ErrNoSuchUpload), err.Error()) } return nil, nil, err } parts, err := n.treeService.GetParts(ctx, p.Bkt, multipartInfo.ID) if err != nil { return nil, nil, err } res := make(map[int][]*data.PartInfoExtended, len(parts)) partsNumbers := make([]int, len(parts)) oids := make([]string, len(parts)) for i, part := range parts { res[part.Number] = append(res[part.Number], part) partsNumbers[i] = part.Number oids[i] = part.OID.EncodeToString() } n.reqLogger(ctx).Debug(logs.PartDetails, zap.Stringer("cid", p.Bkt.CID), zap.String("upload id", p.UploadID), zap.Ints("part numbers", partsNumbers), zap.Strings("oids", oids)) return multipartInfo, res, nil } func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo { var res []*UploadInfo if len(uploads) != 0 && uploads[len(uploads)-1].Key < key { return res } for _, obj := range uploads { if obj.Key >= key && obj.UploadID > id { res = append(res, obj) } } return res } func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo { var result []*UploadInfo if len(objects) != 0 && objects[len(objects)-1].Key <= key { return result } for i, obj := range objects { if obj.Key > key { result = objects[i:] break } } return result } func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo { var isDir bool key := uploadInfo.Key if !strings.HasPrefix(key, prefix) { return nil } if len(delimiter) > 0 { tail := strings.TrimPrefix(key, prefix) index := strings.Index(tail, delimiter) if index >= 0 { isDir = true key = prefix + tail[:index+1] } } return &UploadInfo{ IsDir: isDir, Key: key, UploadID: uploadInfo.UploadID, Owner: uploadInfo.Owner, Created: uploadInfo.Created, } }