frostfs-s3-gw/api/handler/multipart_upload.go
Nikita Zinkevich 6970f52d2d
Some checks failed
/ DCO (pull_request) Successful in 1m18s
/ Vulncheck (pull_request) Successful in 1m32s
/ Builds (pull_request) Successful in 1m25s
/ Lint (pull_request) Failing after 1m5s
/ Tests (pull_request) Successful in 1m29s
[#469] List multipart uploads streaming
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
2024-11-01 09:44:03 +03:00

700 lines
21 KiB
Go

package handler
import (
"encoding/xml"
"fmt"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"github.com/google/uuid"
"go.uber.org/zap"
)
type (
InitiateMultipartUploadResponse struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult" json:"-"`
Bucket string `xml:"Bucket"`
Key string `xml:"Key"`
UploadID string `xml:"UploadId"`
}
CompleteMultipartUploadResponse struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"`
Bucket string `xml:"Bucket"`
Key string `xml:"Key"`
ETag string `xml:"ETag"`
Location string `xml:"Location"`
}
ListMultipartUploadsResponse struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListMultipartUploadsResult" json:"-"`
Bucket string `xml:"Bucket"`
CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"`
Delimiter string `xml:"Delimiter,omitempty"`
EncodingType string `xml:"EncodingType,omitempty"`
IsTruncated bool `xml:"IsTruncated"`
KeyMarker string `xml:"KeyMarker"`
MaxUploads int `xml:"MaxUploads"`
NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
NextUploadIDMarker string `xml:"NextUploadIdMarker,omitempty"`
Prefix string `xml:"Prefix"`
Uploads []MultipartUpload `xml:"Upload"`
UploadIDMarker string `xml:"UploadIdMarker,omitempty"`
}
ListPartsResponse struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListPartsResult" json:"-"`
Bucket string `xml:"Bucket"`
Initiator Initiator `xml:"Initiator"`
IsTruncated bool `xml:"IsTruncated"`
Key string `xml:"Key"`
MaxParts int `xml:"MaxParts"`
NextPartNumberMarker int `xml:"NextPartNumberMarker"`
Owner Owner `xml:"Owner"`
Parts []*layer.Part `xml:"Part"`
PartNumberMarker int `xml:"PartNumberMarker"`
StorageClass string `xml:"StorageClass"`
UploadID string `xml:"UploadId"`
}
MultipartUpload struct {
Initiated string `xml:"Initiated"`
Initiator Initiator `xml:"Initiator"`
Key string `xml:"Key"`
Owner Owner `xml:"Owner"`
StorageClass string `xml:"StorageClass"`
UploadID string `xml:"UploadId"`
}
Initiator struct {
ID string `xml:"ID"`
DisplayName string `xml:"DisplayName"`
}
CompleteMultipartUpload struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUpload"`
Parts []*layer.CompletedPart `xml:"Part"`
}
UploadPartCopyResponse struct {
ETag string `xml:"ETag"`
LastModified string `xml:"LastModified"`
}
)
const (
uploadIDHeaderName = "uploadId"
partNumberHeaderName = "partNumber"
prefixQueryName = "prefix"
delimiterQueryName = "delimiter"
maxUploadsQueryName = "max-uploads"
encodingTypeQueryName = "encoding-type"
keyMarkerQueryName = "key-marker"
uploadIDMarkerQueryName = "upload-id-marker"
)
func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := middleware.GetReqInfo(r.Context())
uploadID := uuid.New()
cannedACLStatus := aclHeadersStatus(r)
additional := []zap.Field{zap.String("uploadID", uploadID.String())}
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
if cannedACLStatus == aclStatusYes {
h.logAndSendError(w, "acl not supported for this bucket", reqInfo, errors.GetAPIError(errors.ErrAccessControlListNotSupported))
return
}
p := &layer.CreateMultipartParams{
Info: &layer.UploadInfoParams{
UploadID: uploadID.String(),
Bkt: bktInfo,
Key: reqInfo.ObjectName,
},
Data: &layer.UploadData{},
}
if len(r.Header.Get(api.AmzTagging)) > 0 {
p.Data.TagSet, err = parseTaggingHeader(r.Header)
if err != nil {
h.logAndSendError(w, "could not parse tagging", reqInfo, err, additional...)
return
}
}
p.Info.Encryption, err = formEncryptionParams(r)
if err != nil {
h.logAndSendError(w, "invalid sse headers", reqInfo, err, additional...)
return
}
p.Header = parseMetadata(r)
if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 {
p.Header[api.ContentType] = contentType
}
if contentLanguage := r.Header.Get(api.ContentLanguage); len(contentLanguage) > 0 {
p.Header[api.ContentLanguage] = contentLanguage
}
p.CopiesNumbers, err = h.pickCopiesNumbers(p.Header, reqInfo.Namespace, bktInfo.LocationConstraint)
if err != nil {
h.logAndSendError(w, "invalid copies number", reqInfo, err, additional...)
return
}
if err = h.obj.CreateMultipartUpload(r.Context(), p); err != nil {
h.logAndSendError(w, "could create multipart upload", reqInfo, err, additional...)
return
}
if p.Info.Encryption.Enabled() {
addSSECHeaders(w.Header(), r.Header)
}
resp := InitiateMultipartUploadResponse{
Bucket: reqInfo.BucketName,
Key: reqInfo.ObjectName,
UploadID: uploadID.String(),
}
if err = middleware.EncodeToResponse(w, resp); err != nil {
h.logAndSendError(w, "could not encode InitiateMultipartUploadResponse to response", reqInfo, err, additional...)
return
}
}
func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := middleware.GetReqInfo(r.Context())
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
var (
queryValues = r.URL.Query()
uploadID = queryValues.Get(uploadIDHeaderName)
partNumStr = queryValues.Get(partNumberHeaderName)
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("partNumber", partNumStr)}
)
partNumber, err := strconv.Atoi(partNumStr)
if err != nil || partNumber < layer.UploadMinPartNumber || partNumber > layer.UploadMaxPartNumber {
h.logAndSendError(w, "invalid part number", reqInfo, errors.GetAPIError(errors.ErrInvalidPartNumber), additional...)
return
}
body, err := h.getBodyReader(r)
if err != nil {
h.logAndSendError(w, "failed to get body reader", reqInfo, err, additional...)
return
}
size := h.getPutPayloadSize(r)
p := &layer.UploadPartParams{
Info: &layer.UploadInfoParams{
UploadID: uploadID,
Bkt: bktInfo,
Key: reqInfo.ObjectName,
},
PartNumber: partNumber,
Size: size,
Reader: body,
ContentMD5: r.Header.Get(api.ContentMD5),
ContentSHA256Hash: r.Header.Get(api.AmzContentSha256),
}
p.Info.Encryption, err = formEncryptionParams(r)
if err != nil {
h.logAndSendError(w, "invalid sse headers", reqInfo, err, additional...)
return
}
hash, err := h.obj.UploadPart(r.Context(), p)
if err != nil {
h.logAndSendError(w, "could not upload a part", reqInfo, err, additional...)
return
}
if p.Info.Encryption.Enabled() {
addSSECHeaders(w.Header(), r.Header)
}
w.Header().Set(api.ETag, data.Quote(hash))
if err = middleware.WriteSuccessResponseHeadersOnly(w); err != nil {
h.logAndSendError(w, "write response", reqInfo, err)
return
}
}
func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
var (
versionID string
ctx = r.Context()
reqInfo = middleware.GetReqInfo(ctx)
queryValues = reqInfo.URL.Query()
uploadID = queryValues.Get(uploadIDHeaderName)
partNumStr = queryValues.Get(partNumberHeaderName)
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("partNumber", partNumStr)}
)
partNumber, err := strconv.Atoi(partNumStr)
if err != nil || partNumber < layer.UploadMinPartNumber || partNumber > layer.UploadMaxPartNumber {
h.logAndSendError(w, "invalid part number", reqInfo, errors.GetAPIError(errors.ErrInvalidPartNumber), additional...)
return
}
src := r.Header.Get(api.AmzCopySource)
if u, err := url.Parse(src); err == nil {
versionID = u.Query().Get(api.QueryVersionID)
src = u.Path
}
srcBucket, srcObject, err := path2BucketObject(src)
if err != nil {
h.logAndSendError(w, "invalid source copy", reqInfo, err, additional...)
return
}
srcRange, err := parseRange(r.Header.Get(api.AmzCopySourceRange))
if err != nil {
h.logAndSendError(w, "could not parse copy range", reqInfo,
errors.GetAPIError(errors.ErrInvalidCopyPartRange), additional...)
return
}
srcBktInfo, err := h.getBucketAndCheckOwner(r, srcBucket, api.AmzSourceExpectedBucketOwner)
if err != nil {
h.logAndSendError(w, "could not get source bucket info", reqInfo, err, additional...)
return
}
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get target bucket info", reqInfo, err, additional...)
return
}
headPrm := &layer.HeadObjectParams{
BktInfo: srcBktInfo,
Object: srcObject,
VersionID: versionID,
}
srcInfo, err := h.obj.GetObjectInfo(ctx, headPrm)
if err != nil {
if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" {
h.logAndSendError(w, "could not head source object version", reqInfo,
errors.GetAPIError(errors.ErrBadRequest), additional...)
return
}
h.logAndSendError(w, "could not head source object", reqInfo, err, additional...)
return
}
args, err := parseCopyObjectArgs(r.Header)
if err != nil {
h.logAndSendError(w, "could not parse copy object args", reqInfo,
errors.GetAPIError(errors.ErrInvalidCopyPartRange), additional...)
return
}
if err = checkPreconditions(srcInfo, args.Conditional, h.cfg.MD5Enabled()); err != nil {
h.logAndSendError(w, "precondition failed", reqInfo, errors.GetAPIError(errors.ErrPreconditionFailed),
additional...)
return
}
srcEncryptionParams, err := formCopySourceEncryptionParams(r)
if err != nil {
h.logAndSendError(w, "invalid sse headers", reqInfo, err)
return
}
if err = srcEncryptionParams.MatchObjectEncryption(layer.FormEncryptionInfo(srcInfo.Headers)); err != nil {
h.logAndSendError(w, "encryption doesn't match object", reqInfo, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrBadRequest), err), additional...)
return
}
p := &layer.UploadCopyParams{
Versioned: headPrm.Versioned(),
Info: &layer.UploadInfoParams{
UploadID: uploadID,
Bkt: bktInfo,
Key: reqInfo.ObjectName,
},
SrcObjInfo: srcInfo,
SrcBktInfo: srcBktInfo,
SrcEncryption: srcEncryptionParams,
PartNumber: partNumber,
Range: srcRange,
}
p.Info.Encryption, err = formEncryptionParams(r)
if err != nil {
h.logAndSendError(w, "invalid sse headers", reqInfo, err, additional...)
return
}
info, err := h.obj.UploadPartCopy(ctx, p)
if err != nil {
h.logAndSendError(w, "could not upload part copy", reqInfo, err, additional...)
return
}
response := UploadPartCopyResponse{
LastModified: info.Created.UTC().Format(time.RFC3339),
ETag: data.Quote(info.ETag(h.cfg.MD5Enabled())),
}
if p.Info.Encryption.Enabled() {
addSSECHeaders(w.Header(), r.Header)
}
if err = middleware.EncodeToResponse(w, response); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err, additional...)
}
}
func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := middleware.GetReqInfo(r.Context())
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
return
}
var (
uploadID = r.URL.Query().Get(uploadIDHeaderName)
uploadInfo = &layer.UploadInfoParams{
UploadID: uploadID,
Bkt: bktInfo,
Key: reqInfo.ObjectName,
}
additional = []zap.Field{zap.String("uploadID", uploadID)}
)
reqBody := new(CompleteMultipartUpload)
if err = h.cfg.NewXMLDecoder(r.Body).Decode(reqBody); err != nil {
h.logAndSendError(w, "could not read complete multipart upload xml", reqInfo,
fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrMalformedXML), err.Error()), additional...)
return
}
if len(reqBody.Parts) == 0 {
h.logAndSendError(w, "invalid xml with parts", reqInfo, errors.GetAPIError(errors.ErrMalformedXML), additional...)
return
}
c := &layer.CompleteMultipartParams{
Info: uploadInfo,
Parts: reqBody.Parts,
}
// Start complete multipart upload which may take some time to fetch object
// and re-upload it part by part.
objInfo, err := h.completeMultipartUpload(r, c, bktInfo)
if err != nil {
h.logAndSendError(w, "complete multipart error", reqInfo, err, additional...)
return
}
response := CompleteMultipartUploadResponse{
Bucket: objInfo.Bucket,
Key: objInfo.Name,
ETag: data.Quote(objInfo.ETag(h.cfg.MD5Enabled())),
Location: getObjectLocation(r, reqInfo.BucketName, reqInfo.ObjectName, reqInfo.RequestVHSEnabled),
}
if settings.VersioningEnabled() {
w.Header().Set(api.AmzVersionID, objInfo.VersionID())
}
if err = middleware.EncodeToResponse(w, response); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err, additional...)
}
}
// returns "https" if the tls boolean is true, "http" otherwise.
func getURLScheme(r *http.Request) string {
if r.TLS != nil {
return "https"
}
return "http"
}
// getObjectLocation gets the fully qualified URL of an object.
func getObjectLocation(r *http.Request, bucket, object string, vhsEnabled bool) string {
proto := middleware.GetSourceScheme(r)
if proto == "" {
proto = getURLScheme(r)
}
u := &url.URL{
Host: r.Host,
Path: path.Join("/", bucket, object),
Scheme: proto,
}
// If vhs enabled then we need to use bucket DNS style.
if vhsEnabled {
u.Path = path.Join("/", object)
}
return u.String()
}
func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo) (*data.ObjectInfo, error) {
ctx := r.Context()
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c)
if err != nil {
return nil, fmt.Errorf("could not complete multipart upload: %w", err)
}
objInfo := extendedObjInfo.ObjectInfo
if len(uploadData.TagSet) != 0 {
tagPrm := &data.PutObjectTaggingParams{
ObjectVersion: &data.ObjectVersion{
BktInfo: bktInfo,
ObjectName: objInfo.Name,
VersionID: objInfo.VersionID(),
},
TagSet: uploadData.TagSet,
NodeVersion: extendedObjInfo.NodeVersion,
}
if err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
return nil, fmt.Errorf("could not put tagging file of completed multipart upload: %w", err)
}
}
return objInfo, nil
}
func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := middleware.GetReqInfo(r.Context())
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
var (
queryValues = reqInfo.URL.Query()
maxUploadsStr = queryValues.Get(maxUploadsQueryName)
maxUploads = layer.MaxSizeUploadsList
)
if maxUploadsStr != "" {
val, err := strconv.Atoi(maxUploadsStr)
if err != nil || val < 1 || val > maxObjectList {
h.logAndSendError(w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads))
return
}
maxUploads = val
}
p := &layer.ListMultipartUploadsParams{
Bkt: bktInfo,
Delimiter: queryValues.Get(delimiterQueryName),
EncodingType: queryValues.Get(encodingTypeQueryName),
KeyMarker: queryValues.Get(keyMarkerQueryName),
MaxUploads: maxUploads,
Prefix: queryValues.Get(prefixQueryName),
UploadIDMarker: queryValues.Get(uploadIDMarkerQueryName),
}
if p.EncodingType != "" && strings.ToLower(p.EncodingType) != urlEncodingType {
h.logAndSendError(w, "invalid encoding type", reqInfo, errors.GetAPIError(errors.ErrInvalidEncodingMethod))
return
}
list, err := h.obj.ListMultipartUploads(r.Context(), p)
if err != nil {
h.logAndSendError(w, "could not list multipart uploads", reqInfo, err)
return
}
if err = middleware.EncodeToResponse(w, encodeListMultipartUploadsToResponse(list, p)); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err)
}
}
func (h *handler) ListPartsHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := middleware.GetReqInfo(r.Context())
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
var (
partNumberMarker int
queryValues = reqInfo.URL.Query()
uploadID = queryValues.Get(uploadIDHeaderName)
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
maxParts = layer.MaxSizePartsList
)
if queryValues.Get("max-parts") != "" {
val, err := strconv.Atoi(queryValues.Get("max-parts"))
if err != nil || val < 0 {
h.logAndSendError(w, "invalid MaxParts", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxParts), additional...)
return
}
if val < layer.MaxSizePartsList {
maxParts = val
}
}
if queryValues.Get("part-number-marker") != "" {
if partNumberMarker, err = strconv.Atoi(queryValues.Get("part-number-marker")); err != nil || partNumberMarker < 0 {
h.logAndSendError(w, "invalid PartNumberMarker", reqInfo, err, additional...)
return
}
}
p := &layer.ListPartsParams{
Info: &layer.UploadInfoParams{
UploadID: uploadID,
Bkt: bktInfo,
Key: reqInfo.ObjectName,
},
MaxParts: maxParts,
PartNumberMarker: partNumberMarker,
}
p.Info.Encryption, err = formEncryptionParams(r)
if err != nil {
h.logAndSendError(w, "invalid sse headers", reqInfo, err)
return
}
list, err := h.obj.ListParts(r.Context(), p)
if err != nil {
h.logAndSendError(w, "could not list parts", reqInfo, err, additional...)
return
}
if err = middleware.EncodeToResponse(w, encodeListPartsToResponse(list, p)); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err)
}
}
func (h *handler) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := middleware.GetReqInfo(r.Context())
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
uploadID := reqInfo.URL.Query().Get(uploadIDHeaderName)
additional := []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
p := &layer.UploadInfoParams{
UploadID: uploadID,
Bkt: bktInfo,
Key: reqInfo.ObjectName,
}
p.Encryption, err = formEncryptionParams(r)
if err != nil {
h.logAndSendError(w, "invalid sse headers", reqInfo, err)
return
}
if err = h.obj.AbortMultipartUpload(r.Context(), p); err != nil {
h.logAndSendError(w, "could not abort multipart upload", reqInfo, err, additional...)
return
}
w.WriteHeader(http.StatusNoContent)
}
func encodeListMultipartUploadsToResponse(info *layer.ListMultipartUploadsInfo, params *layer.ListMultipartUploadsParams) *ListMultipartUploadsResponse {
res := ListMultipartUploadsResponse{
Bucket: params.Bkt.Name,
CommonPrefixes: fillPrefixes(info.Prefixes, params.EncodingType),
Delimiter: s3PathEncode(params.Delimiter, params.EncodingType),
EncodingType: params.EncodingType,
IsTruncated: info.IsTruncated,
KeyMarker: s3PathEncode(params.KeyMarker, params.EncodingType),
MaxUploads: params.MaxUploads,
NextKeyMarker: s3PathEncode(info.NextKeyMarker, params.EncodingType),
NextUploadIDMarker: info.NextUploadIDMarker,
Prefix: s3PathEncode(params.Prefix, params.EncodingType),
UploadIDMarker: params.UploadIDMarker,
}
uploads := make([]MultipartUpload, 0, len(info.Uploads))
for _, u := range info.Uploads {
m := MultipartUpload{
Initiated: u.Created.UTC().Format(time.RFC3339),
Initiator: Initiator{
ID: u.Owner.String(),
DisplayName: u.Owner.String(),
},
Key: s3PathEncode(u.Key, params.EncodingType),
Owner: Owner{
ID: u.Owner.String(),
DisplayName: u.Owner.String(),
},
UploadID: u.UploadID,
StorageClass: api.DefaultStorageClass,
}
uploads = append(uploads, m)
}
res.Uploads = uploads
return &res
}
func encodeListPartsToResponse(info *layer.ListPartsInfo, params *layer.ListPartsParams) *ListPartsResponse {
return &ListPartsResponse{
XMLName: xml.Name{},
Bucket: params.Info.Bkt.Name,
Initiator: Initiator{
ID: info.Owner.String(),
DisplayName: info.Owner.String(),
},
IsTruncated: info.IsTruncated,
Key: params.Info.Key,
MaxParts: params.MaxParts,
NextPartNumberMarker: info.NextPartNumberMarker,
Owner: Owner{
ID: info.Owner.String(),
DisplayName: info.Owner.String(),
},
PartNumberMarker: params.PartNumberMarker,
UploadID: params.Info.UploadID,
Parts: info.Parts,
StorageClass: api.DefaultStorageClass,
}
}