package handler import ( "encoding/xml" "fmt" "net/http" "net/url" "strconv" "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "github.com/google/uuid" "go.uber.org/zap" ) type ( InitiateMultipartUploadResponse struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult" json:"-"` Bucket string `xml:"Bucket"` Key string `xml:"Key"` UploadID string `xml:"UploadId"` } CompleteMultipartUploadResponse struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"` Bucket string `xml:"Bucket"` Key string `xml:"Key"` ETag string `xml:"ETag"` } ListMultipartUploadsResponse struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListMultipartUploadsResult" json:"-"` Bucket string `xml:"Bucket"` CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"` Delimiter string `xml:"Delimiter,omitempty"` EncodingType string `xml:"EncodingType,omitempty"` IsTruncated bool `xml:"IsTruncated"` KeyMarker string `xml:"KeyMarker"` MaxUploads int `xml:"MaxUploads"` NextKeyMarker string `xml:"NextKeyMarker,omitempty"` NextUploadIDMarker string `xml:"NextUploadIdMarker,omitempty"` Prefix string `xml:"Prefix"` Uploads []MultipartUpload `xml:"Upload"` UploadIDMarker string `xml:"UploadIdMarker,omitempty"` } ListPartsResponse struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListPartsResult" json:"-"` Bucket string `xml:"Bucket"` Initiator Initiator `xml:"Initiator"` IsTruncated bool `xml:"IsTruncated"` Key string `xml:"Key"` MaxParts int `xml:"MaxParts,omitempty"` NextPartNumberMarker int `xml:"NextPartNumberMarker,omitempty"` Owner Owner `xml:"Owner"` Parts []*layer.Part `xml:"Part"` PartNumberMarker int `xml:"PartNumberMarker,omitempty"` StorageClass string `xml:"StorageClass"` UploadID string `xml:"UploadId"` } MultipartUpload struct { Initiated string `xml:"Initiated"` Initiator Initiator `xml:"Initiator"` Key string `xml:"Key"` Owner Owner `xml:"Owner"` StorageClass string `xml:"StorageClass"` UploadID string `xml:"UploadId"` } Initiator struct { ID string `xml:"ID"` DisplayName string `xml:"DisplayName"` } CompleteMultipartUpload struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUpload"` Parts []*layer.CompletedPart `xml:"Part"` } UploadPartCopyResponse struct { ETag string `xml:"ETag"` LastModified string `xml:"LastModified"` } ) const ( uploadIDHeaderName = "uploadId" partNumberHeaderName = "partNumber" prefixQueryName = "prefix" delimiterQueryName = "delimiter" maxUploadsQueryName = "max-uploads" encodingTypeQueryName = "encoding-type" keyMarkerQueryName = "key-marker" uploadIDMarkerQueryName = "upload-id-marker" ) func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { reqInfo := middleware.GetReqInfo(r.Context()) bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName) if err != nil { h.logAndSendError(w, "could not get bucket info", reqInfo, err) return } uploadID := uuid.New() additional := []zap.Field{zap.String("uploadID", uploadID.String())} p := &layer.CreateMultipartParams{ Info: &layer.UploadInfoParams{ UploadID: uploadID.String(), Bkt: bktInfo, Key: reqInfo.ObjectName, }, Data: &layer.UploadData{}, } if containsACLHeaders(r) { key, err := h.bearerTokenIssuerKey(r.Context()) if err != nil { h.logAndSendError(w, "couldn't get gate key", reqInfo, err, additional...) return } if _, err = parseACLHeaders(r.Header, key); err != nil { h.logAndSendError(w, "could not parse acl", reqInfo, err, additional...) return } p.Data.ACLHeaders = formACLHeadersForMultipart(r.Header) } if len(r.Header.Get(api.AmzTagging)) > 0 { p.Data.TagSet, err = parseTaggingHeader(r.Header) if err != nil { h.logAndSendError(w, "could not parse tagging", reqInfo, err, additional...) return } } p.Info.Encryption, err = formEncryptionParams(r) if err != nil { h.logAndSendError(w, "invalid sse headers", reqInfo, err, additional...) return } p.Header = parseMetadata(r) if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 { p.Header[api.ContentType] = contentType } if contentLanguage := r.Header.Get(api.ContentLanguage); len(contentLanguage) > 0 { p.Header[api.ContentLanguage] = contentLanguage } p.CopiesNumbers, err = h.pickCopiesNumbers(p.Header, bktInfo.LocationConstraint) if err != nil { h.logAndSendError(w, "invalid copies number", reqInfo, err, additional...) return } if err = h.obj.CreateMultipartUpload(r.Context(), p); err != nil { h.logAndSendError(w, "could create multipart upload", reqInfo, err, additional...) return } if p.Info.Encryption.Enabled() { addSSECHeaders(w.Header(), r.Header) } resp := InitiateMultipartUploadResponse{ Bucket: reqInfo.BucketName, Key: reqInfo.ObjectName, UploadID: uploadID.String(), } if err = middleware.EncodeToResponse(w, resp); err != nil { h.logAndSendError(w, "could not encode InitiateMultipartUploadResponse to response", reqInfo, err, additional...) return } } func formACLHeadersForMultipart(header http.Header) map[string]string { result := make(map[string]string) if value := header.Get(api.AmzACL); value != "" { result[api.AmzACL] = value } if value := header.Get(api.AmzGrantRead); value != "" { result[api.AmzGrantRead] = value } if value := header.Get(api.AmzGrantFullControl); value != "" { result[api.AmzGrantFullControl] = value } if value := header.Get(api.AmzGrantWrite); value != "" { result[api.AmzGrantWrite] = value } return result } func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) { reqInfo := middleware.GetReqInfo(r.Context()) bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName) if err != nil { h.logAndSendError(w, "could not get bucket info", reqInfo, err) return } var ( queryValues = r.URL.Query() uploadID = queryValues.Get(uploadIDHeaderName) partNumStr = queryValues.Get(partNumberHeaderName) additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("partNumber", partNumStr)} ) partNumber, err := strconv.Atoi(partNumStr) if err != nil || partNumber < layer.UploadMinPartNumber || partNumber > layer.UploadMaxPartNumber { h.logAndSendError(w, "invalid part number", reqInfo, errors.GetAPIError(errors.ErrInvalidPartNumber), additional...) return } body, err := h.getBodyReader(r) if err != nil { h.logAndSendError(w, "failed to get body reader", reqInfo, err, additional...) return } var size uint64 if r.ContentLength > 0 { size = uint64(r.ContentLength) } p := &layer.UploadPartParams{ Info: &layer.UploadInfoParams{ UploadID: uploadID, Bkt: bktInfo, Key: reqInfo.ObjectName, }, PartNumber: partNumber, Size: size, Reader: body, ContentMD5: r.Header.Get(api.ContentMD5), ContentSHA256Hash: r.Header.Get(api.AmzContentSha256), } p.Info.Encryption, err = formEncryptionParams(r) if err != nil { h.logAndSendError(w, "invalid sse headers", reqInfo, err, additional...) return } hash, err := h.obj.UploadPart(r.Context(), p) if err != nil { h.logAndSendError(w, "could not upload a part", reqInfo, err, additional...) return } if p.Info.Encryption.Enabled() { addSSECHeaders(w.Header(), r.Header) } w.Header().Set(api.ETag, data.Quote(hash)) middleware.WriteSuccessResponseHeadersOnly(w) } func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) { var ( versionID string ctx = r.Context() reqInfo = middleware.GetReqInfo(ctx) queryValues = reqInfo.URL.Query() uploadID = queryValues.Get(uploadIDHeaderName) partNumStr = queryValues.Get(partNumberHeaderName) additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("partNumber", partNumStr)} ) partNumber, err := strconv.Atoi(partNumStr) if err != nil || partNumber < layer.UploadMinPartNumber || partNumber > layer.UploadMaxPartNumber { h.logAndSendError(w, "invalid part number", reqInfo, errors.GetAPIError(errors.ErrInvalidPartNumber), additional...) return } src := r.Header.Get(api.AmzCopySource) if u, err := url.Parse(src); err == nil { versionID = u.Query().Get(api.QueryVersionID) src = u.Path } srcBucket, srcObject, err := path2BucketObject(src) if err != nil { h.logAndSendError(w, "invalid source copy", reqInfo, err, additional...) return } srcRange, err := parseRange(r.Header.Get(api.AmzCopySourceRange)) if err != nil { h.logAndSendError(w, "could not parse copy range", reqInfo, errors.GetAPIError(errors.ErrInvalidCopyPartRange), additional...) return } srcBktInfo, err := h.getBucketAndCheckOwner(r, srcBucket, api.AmzSourceExpectedBucketOwner) if err != nil { h.logAndSendError(w, "could not get source bucket info", reqInfo, err, additional...) return } bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName) if err != nil { h.logAndSendError(w, "could not get target bucket info", reqInfo, err, additional...) return } headPrm := &layer.HeadObjectParams{ BktInfo: srcBktInfo, Object: srcObject, VersionID: versionID, } srcInfo, err := h.obj.GetObjectInfo(ctx, headPrm) if err != nil { if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" { h.logAndSendError(w, "could not head source object version", reqInfo, errors.GetAPIError(errors.ErrBadRequest), additional...) return } h.logAndSendError(w, "could not head source object", reqInfo, err, additional...) return } args, err := parseCopyObjectArgs(r.Header) if err != nil { h.logAndSendError(w, "could not parse copy object args", reqInfo, errors.GetAPIError(errors.ErrInvalidCopyPartRange), additional...) return } if err = checkPreconditions(srcInfo, args.Conditional, h.cfg.MD5Enabled()); err != nil { h.logAndSendError(w, "precondition failed", reqInfo, errors.GetAPIError(errors.ErrPreconditionFailed), additional...) return } srcEncryptionParams, err := formCopySourceEncryptionParams(r) if err != nil { h.logAndSendError(w, "invalid sse headers", reqInfo, err) return } if err = srcEncryptionParams.MatchObjectEncryption(layer.FormEncryptionInfo(srcInfo.Headers)); err != nil { h.logAndSendError(w, "encryption doesn't match object", reqInfo, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrBadRequest), err), additional...) return } p := &layer.UploadCopyParams{ Versioned: headPrm.Versioned(), Info: &layer.UploadInfoParams{ UploadID: uploadID, Bkt: bktInfo, Key: reqInfo.ObjectName, }, SrcObjInfo: srcInfo, SrcBktInfo: srcBktInfo, SrcEncryption: srcEncryptionParams, PartNumber: partNumber, Range: srcRange, } p.Info.Encryption, err = formEncryptionParams(r) if err != nil { h.logAndSendError(w, "invalid sse headers", reqInfo, err, additional...) return } info, err := h.obj.UploadPartCopy(ctx, p) if err != nil { h.logAndSendError(w, "could not upload part copy", reqInfo, err, additional...) return } response := UploadPartCopyResponse{ LastModified: info.Created.UTC().Format(time.RFC3339), ETag: data.Quote(info.ETag(h.cfg.MD5Enabled())), } if p.Info.Encryption.Enabled() { addSSECHeaders(w.Header(), r.Header) } if err = middleware.EncodeToResponse(w, response); err != nil { h.logAndSendError(w, "something went wrong", reqInfo, err, additional...) } } func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { reqInfo := middleware.GetReqInfo(r.Context()) bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName) if err != nil { h.logAndSendError(w, "could not get bucket info", reqInfo, err) return } settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) if err != nil { h.logAndSendError(w, "could not get bucket settings", reqInfo, err) return } var ( uploadID = r.URL.Query().Get(uploadIDHeaderName) uploadInfo = &layer.UploadInfoParams{ UploadID: uploadID, Bkt: bktInfo, Key: reqInfo.ObjectName, } additional = []zap.Field{zap.String("uploadID", uploadID)} ) reqBody := new(CompleteMultipartUpload) if err = h.cfg.NewXMLDecoder(r.Body).Decode(reqBody); err != nil { h.logAndSendError(w, "could not read complete multipart upload xml", reqInfo, errors.GetAPIError(errors.ErrMalformedXML), additional...) return } if len(reqBody.Parts) == 0 { h.logAndSendError(w, "invalid xml with parts", reqInfo, errors.GetAPIError(errors.ErrMalformedXML), additional...) return } c := &layer.CompleteMultipartParams{ Info: uploadInfo, Parts: reqBody.Parts, } // Start complete multipart upload which may take some time to fetch object // and re-upload it part by part. objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo) if err != nil { h.logAndSendError(w, "complete multipart error", reqInfo, err, additional...) return } response := CompleteMultipartUploadResponse{ Bucket: objInfo.Bucket, Key: objInfo.Name, ETag: data.Quote(objInfo.ETag(h.cfg.MD5Enabled())), } if settings.VersioningEnabled() { w.Header().Set(api.AmzVersionID, objInfo.VersionID()) } if err = middleware.EncodeToResponse(w, response); err != nil { h.logAndSendError(w, "something went wrong", reqInfo, err, additional...) } } func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo, reqInfo *middleware.ReqInfo) (*data.ObjectInfo, error) { ctx := r.Context() uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c) if err != nil { return nil, fmt.Errorf("could not complete multipart upload: %w", err) } objInfo := extendedObjInfo.ObjectInfo if len(uploadData.TagSet) != 0 { tagPrm := &layer.PutObjectTaggingParams{ ObjectVersion: &layer.ObjectVersion{ BktInfo: bktInfo, ObjectName: objInfo.Name, VersionID: objInfo.VersionID(), }, TagSet: uploadData.TagSet, NodeVersion: extendedObjInfo.NodeVersion, } if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil { return nil, fmt.Errorf("could not put tagging file of completed multipart upload: %w", err) } } if len(uploadData.ACLHeaders) != 0 { sessionTokenSetEACL, err := getSessionTokenSetEACL(ctx) if err != nil { return nil, fmt.Errorf("couldn't get eacl token: %w", err) } key, err := h.bearerTokenIssuerKey(ctx) if err != nil { return nil, fmt.Errorf("couldn't get gate key: %w", err) } acl, err := parseACLHeaders(r.Header, key) if err != nil { return nil, fmt.Errorf("could not parse acl: %w", err) } resInfo := &resourceInfo{ Bucket: objInfo.Bucket, Object: objInfo.Name, } astObject, err := aclToAst(acl, resInfo) if err != nil { return nil, fmt.Errorf("could not translate acl of completed multipart upload to ast: %w", err) } if _, err = h.updateBucketACL(r, astObject, bktInfo, sessionTokenSetEACL); err != nil { return nil, fmt.Errorf("could not update bucket acl while completing multipart upload: %w", err) } } s := &SendNotificationParams{ Event: EventObjectCreatedCompleteMultipartUpload, NotificationInfo: data.NotificationInfoFromObject(objInfo, h.cfg.MD5Enabled()), BktInfo: bktInfo, ReqInfo: reqInfo, } if err = h.sendNotifications(ctx, s); err != nil { h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err)) } return objInfo, nil } func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) { reqInfo := middleware.GetReqInfo(r.Context()) bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName) if err != nil { h.logAndSendError(w, "could not get bucket info", reqInfo, err) return } var ( queryValues = reqInfo.URL.Query() maxUploadsStr = queryValues.Get(maxUploadsQueryName) maxUploads = layer.MaxSizeUploadsList ) if maxUploadsStr != "" { val, err := strconv.Atoi(maxUploadsStr) if err != nil || val < 1 || val > 1000 { h.logAndSendError(w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads)) return } maxUploads = val } p := &layer.ListMultipartUploadsParams{ Bkt: bktInfo, Delimiter: queryValues.Get(delimiterQueryName), EncodingType: queryValues.Get(encodingTypeQueryName), KeyMarker: queryValues.Get(keyMarkerQueryName), MaxUploads: maxUploads, Prefix: queryValues.Get(prefixQueryName), UploadIDMarker: queryValues.Get(uploadIDMarkerQueryName), } list, err := h.obj.ListMultipartUploads(r.Context(), p) if err != nil { h.logAndSendError(w, "could not list multipart uploads", reqInfo, err) return } if err = middleware.EncodeToResponse(w, encodeListMultipartUploadsToResponse(list, p)); err != nil { h.logAndSendError(w, "something went wrong", reqInfo, err) } } func (h *handler) ListPartsHandler(w http.ResponseWriter, r *http.Request) { reqInfo := middleware.GetReqInfo(r.Context()) bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName) if err != nil { h.logAndSendError(w, "could not get bucket info", reqInfo, err) return } var ( partNumberMarker int queryValues = reqInfo.URL.Query() uploadID = queryValues.Get(uploadIDHeaderName) additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)} maxParts = layer.MaxSizePartsList ) if queryValues.Get("max-parts") != "" { val, err := strconv.Atoi(queryValues.Get("max-parts")) if err != nil || val < 0 { h.logAndSendError(w, "invalid MaxParts", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxParts), additional...) return } if val < layer.MaxSizePartsList { maxParts = val } } if queryValues.Get("part-number-marker") != "" { if partNumberMarker, err = strconv.Atoi(queryValues.Get("part-number-marker")); err != nil || partNumberMarker < 0 { h.logAndSendError(w, "invalid PartNumberMarker", reqInfo, err, additional...) return } } p := &layer.ListPartsParams{ Info: &layer.UploadInfoParams{ UploadID: uploadID, Bkt: bktInfo, Key: reqInfo.ObjectName, }, MaxParts: maxParts, PartNumberMarker: partNumberMarker, } p.Info.Encryption, err = formEncryptionParams(r) if err != nil { h.logAndSendError(w, "invalid sse headers", reqInfo, err) return } list, err := h.obj.ListParts(r.Context(), p) if err != nil { h.logAndSendError(w, "could not list parts", reqInfo, err, additional...) return } if err = middleware.EncodeToResponse(w, encodeListPartsToResponse(list, p)); err != nil { h.logAndSendError(w, "something went wrong", reqInfo, err) } } func (h *handler) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { reqInfo := middleware.GetReqInfo(r.Context()) bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName) if err != nil { h.logAndSendError(w, "could not get bucket info", reqInfo, err) return } uploadID := reqInfo.URL.Query().Get(uploadIDHeaderName) additional := []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)} p := &layer.UploadInfoParams{ UploadID: uploadID, Bkt: bktInfo, Key: reqInfo.ObjectName, } p.Encryption, err = formEncryptionParams(r) if err != nil { h.logAndSendError(w, "invalid sse headers", reqInfo, err) return } if err = h.obj.AbortMultipartUpload(r.Context(), p); err != nil { h.logAndSendError(w, "could not abort multipart upload", reqInfo, err, additional...) return } w.WriteHeader(http.StatusNoContent) } func encodeListMultipartUploadsToResponse(info *layer.ListMultipartUploadsInfo, params *layer.ListMultipartUploadsParams) *ListMultipartUploadsResponse { res := ListMultipartUploadsResponse{ Bucket: params.Bkt.Name, CommonPrefixes: fillPrefixes(info.Prefixes, params.EncodingType), Delimiter: params.Delimiter, EncodingType: params.EncodingType, IsTruncated: info.IsTruncated, KeyMarker: params.KeyMarker, MaxUploads: params.MaxUploads, NextKeyMarker: info.NextKeyMarker, NextUploadIDMarker: info.NextUploadIDMarker, Prefix: params.Prefix, UploadIDMarker: params.UploadIDMarker, } uploads := make([]MultipartUpload, 0, len(info.Uploads)) for _, u := range info.Uploads { m := MultipartUpload{ Initiated: u.Created.UTC().Format(time.RFC3339), Initiator: Initiator{ ID: u.Owner.String(), DisplayName: u.Owner.String(), }, Key: u.Key, Owner: Owner{ ID: u.Owner.String(), DisplayName: u.Owner.String(), }, UploadID: u.UploadID, StorageClass: api.DefaultStorageClass, } uploads = append(uploads, m) } res.Uploads = uploads return &res } func encodeListPartsToResponse(info *layer.ListPartsInfo, params *layer.ListPartsParams) *ListPartsResponse { return &ListPartsResponse{ XMLName: xml.Name{}, Bucket: params.Info.Bkt.Name, Initiator: Initiator{ ID: info.Owner.String(), DisplayName: info.Owner.String(), }, IsTruncated: info.IsTruncated, Key: params.Info.Key, MaxParts: params.MaxParts, NextPartNumberMarker: info.NextPartNumberMarker, Owner: Owner{ ID: info.Owner.String(), DisplayName: info.Owner.String(), }, PartNumberMarker: params.PartNumberMarker, UploadID: params.Info.UploadID, Parts: info.Parts, StorageClass: api.DefaultStorageClass, } }