776 lines
23 KiB
Go
776 lines
23 KiB
Go
package handler
|
|
|
|
import (
|
|
"encoding/xml"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type (
|
|
InitiateMultipartUploadResponse struct {
|
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult" json:"-"`
|
|
Bucket string `xml:"Bucket"`
|
|
Key string `xml:"Key"`
|
|
UploadID string `xml:"UploadId"`
|
|
}
|
|
|
|
CompleteMultipartUploadResponse struct {
|
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"`
|
|
Bucket string `xml:"Bucket"`
|
|
Key string `xml:"Key"`
|
|
ETag string `xml:"ETag"`
|
|
}
|
|
|
|
ListMultipartUploadsResponse struct {
|
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListMultipartUploadsResult" json:"-"`
|
|
Bucket string `xml:"Bucket"`
|
|
CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"`
|
|
Delimiter string `xml:"Delimiter,omitempty"`
|
|
EncodingType string `xml:"EncodingType,omitempty"`
|
|
IsTruncated bool `xml:"IsTruncated"`
|
|
KeyMarker string `xml:"KeyMarker"`
|
|
MaxUploads int `xml:"MaxUploads"`
|
|
NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
|
|
NextUploadIDMarker string `xml:"NextUploadIdMarker,omitempty"`
|
|
Prefix string `xml:"Prefix"`
|
|
Uploads []MultipartUpload `xml:"Upload"`
|
|
UploadIDMarker string `xml:"UploadIdMarker,omitempty"`
|
|
}
|
|
|
|
ListPartsResponse struct {
|
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListPartsResult" json:"-"`
|
|
Bucket string `xml:"Bucket"`
|
|
Initiator Initiator `xml:"Initiator"`
|
|
IsTruncated bool `xml:"IsTruncated"`
|
|
Key string `xml:"Key"`
|
|
MaxParts int `xml:"MaxParts,omitempty"`
|
|
NextPartNumberMarker int `xml:"NextPartNumberMarker,omitempty"`
|
|
Owner Owner `xml:"Owner"`
|
|
Parts []*layer.Part `xml:"Part"`
|
|
PartNumberMarker int `xml:"PartNumberMarker,omitempty"`
|
|
StorageClass string `xml:"StorageClass,omitempty"`
|
|
UploadID string `xml:"UploadId"`
|
|
}
|
|
|
|
MultipartUpload struct {
|
|
Initiated string `xml:"Initiated"`
|
|
Initiator Initiator `xml:"Initiator"`
|
|
Key string `xml:"Key"`
|
|
Owner Owner `xml:"Owner"`
|
|
StorageClass string `xml:"StorageClass,omitempty"`
|
|
UploadID string `xml:"UploadId"`
|
|
}
|
|
|
|
Initiator struct {
|
|
ID string `xml:"ID"`
|
|
DisplayName string `xml:"DisplayName"`
|
|
}
|
|
|
|
CompleteMultipartUpload struct {
|
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUpload"`
|
|
Parts []*layer.CompletedPart `xml:"Part"`
|
|
}
|
|
|
|
UploadPartCopyResponse struct {
|
|
ETag string `xml:"ETag"`
|
|
LastModified string `xml:"LastModified"`
|
|
}
|
|
)
|
|
|
|
const (
|
|
uploadIDHeaderName = "uploadId"
|
|
partNumberHeaderName = "partNumber"
|
|
|
|
prefixQueryName = "prefix"
|
|
delimiterQueryName = "delimiter"
|
|
maxUploadsQueryName = "max-uploads"
|
|
encodingTypeQueryName = "encoding-type"
|
|
keyMarkerQueryName = "key-marker"
|
|
uploadIDMarkerQueryName = "upload-id-marker"
|
|
)
|
|
|
|
func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
|
reqInfo := middleware.GetReqInfo(r.Context())
|
|
|
|
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
|
return
|
|
}
|
|
|
|
uploadID := uuid.New()
|
|
additional := []zap.Field{zap.String("uploadID", uploadID.String())}
|
|
|
|
p := &layer.CreateMultipartParams{
|
|
Info: &layer.UploadInfoParams{
|
|
UploadID: uploadID.String(),
|
|
Bkt: bktInfo,
|
|
Key: reqInfo.ObjectName,
|
|
},
|
|
Data: &layer.UploadData{},
|
|
}
|
|
|
|
if containsACLHeaders(r) {
|
|
key, err := h.bearerTokenIssuerKey(r.Context())
|
|
if err != nil {
|
|
h.logAndSendError(w, "couldn't get gate key", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
if _, err = parseACLHeaders(r.Header, key); err != nil {
|
|
h.logAndSendError(w, "could not parse acl", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
p.Data.ACLHeaders = formACLHeadersForMultipart(r.Header)
|
|
}
|
|
|
|
if len(r.Header.Get(api.AmzTagging)) > 0 {
|
|
p.Data.TagSet, err = parseTaggingHeader(r.Header)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not parse tagging", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
}
|
|
|
|
p.Info.Encryption, err = formEncryptionParams(r)
|
|
if err != nil {
|
|
h.logAndSendError(w, "invalid sse headers", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
p.Header = parseMetadata(r)
|
|
if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 {
|
|
p.Header[api.ContentType] = contentType
|
|
}
|
|
|
|
p.CopiesNumbers, err = h.pickCopiesNumbers(p.Header, bktInfo.LocationConstraint)
|
|
if err != nil {
|
|
h.logAndSendError(w, "invalid copies number", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
if err = h.obj.CreateMultipartUpload(r.Context(), p); err != nil {
|
|
h.logAndSendError(w, "could create multipart upload", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
if p.Info.Encryption.Enabled() {
|
|
addSSECHeaders(w.Header(), r.Header)
|
|
}
|
|
|
|
resp := InitiateMultipartUploadResponse{
|
|
Bucket: reqInfo.BucketName,
|
|
Key: reqInfo.ObjectName,
|
|
UploadID: uploadID.String(),
|
|
}
|
|
|
|
if err = middleware.EncodeToResponse(w, resp); err != nil {
|
|
h.logAndSendError(w, "could not encode InitiateMultipartUploadResponse to response", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
}
|
|
|
|
func formACLHeadersForMultipart(header http.Header) map[string]string {
|
|
result := make(map[string]string)
|
|
|
|
if value := header.Get(api.AmzACL); value != "" {
|
|
result[api.AmzACL] = value
|
|
}
|
|
if value := header.Get(api.AmzGrantRead); value != "" {
|
|
result[api.AmzGrantRead] = value
|
|
}
|
|
if value := header.Get(api.AmzGrantFullControl); value != "" {
|
|
result[api.AmzGrantFullControl] = value
|
|
}
|
|
if value := header.Get(api.AmzGrantWrite); value != "" {
|
|
result[api.AmzGrantWrite] = value
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
|
reqInfo := middleware.GetReqInfo(r.Context())
|
|
|
|
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
|
return
|
|
}
|
|
|
|
var (
|
|
queryValues = r.URL.Query()
|
|
uploadID = queryValues.Get(uploadIDHeaderName)
|
|
partNumStr = queryValues.Get(partNumberHeaderName)
|
|
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("partNumber", partNumStr)}
|
|
)
|
|
|
|
partNumber, err := strconv.Atoi(partNumStr)
|
|
if err != nil || partNumber < layer.UploadMinPartNumber || partNumber > layer.UploadMaxPartNumber {
|
|
h.logAndSendError(w, "invalid part number", reqInfo, errors.GetAPIError(errors.ErrInvalidPartNumber), additional...)
|
|
return
|
|
}
|
|
|
|
body, err := h.getBodyReader(r)
|
|
if err != nil {
|
|
h.logAndSendError(w, "failed to get body reader", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
var size uint64
|
|
if r.ContentLength > 0 {
|
|
size = uint64(r.ContentLength)
|
|
}
|
|
|
|
p := &layer.UploadPartParams{
|
|
Info: &layer.UploadInfoParams{
|
|
UploadID: uploadID,
|
|
Bkt: bktInfo,
|
|
Key: reqInfo.ObjectName,
|
|
},
|
|
PartNumber: partNumber,
|
|
Size: size,
|
|
Reader: body,
|
|
}
|
|
|
|
p.Info.Encryption, err = formEncryptionParams(r)
|
|
if err != nil {
|
|
h.logAndSendError(w, "invalid sse headers", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
hash, err := h.obj.UploadPart(r.Context(), p)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not upload a part", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
if p.Info.Encryption.Enabled() {
|
|
addSSECHeaders(w.Header(), r.Header)
|
|
}
|
|
|
|
w.Header().Set(api.ETag, hash)
|
|
middleware.WriteSuccessResponseHeadersOnly(w)
|
|
}
|
|
|
|
func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
|
var (
|
|
versionID string
|
|
ctx = r.Context()
|
|
reqInfo = middleware.GetReqInfo(ctx)
|
|
queryValues = reqInfo.URL.Query()
|
|
uploadID = queryValues.Get(uploadIDHeaderName)
|
|
partNumStr = queryValues.Get(partNumberHeaderName)
|
|
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("partNumber", partNumStr)}
|
|
)
|
|
|
|
partNumber, err := strconv.Atoi(partNumStr)
|
|
if err != nil || partNumber < layer.UploadMinPartNumber || partNumber > layer.UploadMaxPartNumber {
|
|
h.logAndSendError(w, "invalid part number", reqInfo, errors.GetAPIError(errors.ErrInvalidPartNumber), additional...)
|
|
return
|
|
}
|
|
|
|
src := r.Header.Get(api.AmzCopySource)
|
|
if u, err := url.Parse(src); err == nil {
|
|
versionID = u.Query().Get(api.QueryVersionID)
|
|
src = u.Path
|
|
}
|
|
srcBucket, srcObject, err := path2BucketObject(src)
|
|
if err != nil {
|
|
h.logAndSendError(w, "invalid source copy", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
srcRange, err := parseRange(r.Header.Get(api.AmzCopySourceRange))
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not parse copy range", reqInfo,
|
|
errors.GetAPIError(errors.ErrInvalidCopyPartRange), additional...)
|
|
return
|
|
}
|
|
|
|
srcBktInfo, err := h.getBucketAndCheckOwner(r, srcBucket, api.AmzSourceExpectedBucketOwner)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not get source bucket info", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not get target bucket info", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
headPrm := &layer.HeadObjectParams{
|
|
BktInfo: srcBktInfo,
|
|
Object: srcObject,
|
|
VersionID: versionID,
|
|
}
|
|
|
|
srcInfo, err := h.obj.GetObjectInfo(ctx, headPrm)
|
|
if err != nil {
|
|
if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" {
|
|
h.logAndSendError(w, "could not head source object version", reqInfo,
|
|
errors.GetAPIError(errors.ErrBadRequest), additional...)
|
|
return
|
|
}
|
|
h.logAndSendError(w, "could not head source object", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
args, err := parseCopyObjectArgs(r.Header)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not parse copy object args", reqInfo,
|
|
errors.GetAPIError(errors.ErrInvalidCopyPartRange), additional...)
|
|
return
|
|
}
|
|
|
|
if err = checkPreconditions(srcInfo, args.Conditional); err != nil {
|
|
h.logAndSendError(w, "precondition failed", reqInfo, errors.GetAPIError(errors.ErrPreconditionFailed),
|
|
additional...)
|
|
return
|
|
}
|
|
|
|
p := &layer.UploadCopyParams{
|
|
Versioned: headPrm.Versioned(),
|
|
Info: &layer.UploadInfoParams{
|
|
UploadID: uploadID,
|
|
Bkt: bktInfo,
|
|
Key: reqInfo.ObjectName,
|
|
},
|
|
SrcObjInfo: srcInfo,
|
|
SrcBktInfo: srcBktInfo,
|
|
PartNumber: partNumber,
|
|
Range: srcRange,
|
|
}
|
|
|
|
p.Info.Encryption, err = formEncryptionParams(r)
|
|
if err != nil {
|
|
h.logAndSendError(w, "invalid sse headers", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
if err = p.Info.Encryption.MatchObjectEncryption(layer.FormEncryptionInfo(srcInfo.Headers)); err != nil {
|
|
h.logAndSendError(w, "encryption doesn't match object", reqInfo, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrBadRequest), err), additional...)
|
|
return
|
|
}
|
|
|
|
info, err := h.obj.UploadPartCopy(ctx, p)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not upload part copy", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
response := UploadPartCopyResponse{
|
|
ETag: info.HashSum,
|
|
LastModified: info.Created.UTC().Format(time.RFC3339),
|
|
}
|
|
|
|
if p.Info.Encryption.Enabled() {
|
|
addSSECHeaders(w.Header(), r.Header)
|
|
}
|
|
|
|
if err = middleware.EncodeToResponse(w, response); err != nil {
|
|
h.logAndSendError(w, "something went wrong", reqInfo, err, additional...)
|
|
}
|
|
}
|
|
|
|
func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
|
reqInfo := middleware.GetReqInfo(r.Context())
|
|
|
|
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
|
return
|
|
}
|
|
|
|
var (
|
|
uploadID = r.URL.Query().Get(uploadIDHeaderName)
|
|
uploadInfo = &layer.UploadInfoParams{
|
|
UploadID: uploadID,
|
|
Bkt: bktInfo,
|
|
Key: reqInfo.ObjectName,
|
|
}
|
|
additional = []zap.Field{zap.String("uploadID", uploadID)}
|
|
)
|
|
|
|
reqBody := new(CompleteMultipartUpload)
|
|
if err = h.cfg.XMLDecoder.NewCompleteMultipartDecoder(r.Body).Decode(reqBody); err != nil {
|
|
h.logAndSendError(w, "could not read complete multipart upload xml", reqInfo,
|
|
errors.GetAPIError(errors.ErrMalformedXML), additional...)
|
|
return
|
|
}
|
|
if len(reqBody.Parts) == 0 {
|
|
h.logAndSendError(w, "invalid xml with parts", reqInfo, errors.GetAPIError(errors.ErrMalformedXML), additional...)
|
|
return
|
|
}
|
|
|
|
c := &layer.CompleteMultipartParams{
|
|
Info: uploadInfo,
|
|
Parts: reqBody.Parts,
|
|
}
|
|
|
|
// Next operations might take some time, so we want to keep client's
|
|
// connection alive. To do so, gateway sends periodic white spaces
|
|
// back to the client the same way as Amazon S3 service does.
|
|
stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.CompleteMultipartKeepalive)
|
|
|
|
// Start complete multipart upload which may take some time to fetch object
|
|
// and re-upload it part by part.
|
|
objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo)
|
|
|
|
// Stop periodic writer as complete multipart upload is finished
|
|
// successfully or not.
|
|
headerIsWritten := stopPeriodicResponseWriter()
|
|
|
|
responseWriter := middleware.EncodeToResponse
|
|
errLogger := h.logAndSendError
|
|
// Do not send XML and HTTP headers if periodic writer was invoked at this point.
|
|
if headerIsWritten {
|
|
responseWriter = middleware.EncodeToResponseNoHeader
|
|
errLogger = h.logAndSendErrorNoHeader
|
|
}
|
|
|
|
if err != nil {
|
|
errLogger(w, "complete multipart error", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
response := CompleteMultipartUploadResponse{
|
|
Bucket: objInfo.Bucket,
|
|
ETag: objInfo.HashSum,
|
|
Key: objInfo.Name,
|
|
}
|
|
|
|
// Here we previously set api.AmzVersionID header for versioned bucket.
|
|
// It is not possible after #60, because of periodic white
|
|
// space XML writer to keep connection with the client.
|
|
|
|
if err = responseWriter(w, response); err != nil {
|
|
errLogger(w, "something went wrong", reqInfo, err, additional...)
|
|
}
|
|
}
|
|
|
|
func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo, reqInfo *middleware.ReqInfo) (*data.ObjectInfo, error) {
|
|
ctx := r.Context()
|
|
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not complete multipart upload: %w", err)
|
|
}
|
|
objInfo := extendedObjInfo.ObjectInfo
|
|
|
|
if len(uploadData.TagSet) != 0 {
|
|
tagPrm := &layer.PutObjectTaggingParams{
|
|
ObjectVersion: &layer.ObjectVersion{
|
|
BktInfo: bktInfo,
|
|
ObjectName: objInfo.Name,
|
|
VersionID: objInfo.VersionID(),
|
|
},
|
|
TagSet: uploadData.TagSet,
|
|
NodeVersion: extendedObjInfo.NodeVersion,
|
|
}
|
|
if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
|
|
return nil, fmt.Errorf("could not put tagging file of completed multipart upload: %w", err)
|
|
}
|
|
}
|
|
|
|
if len(uploadData.ACLHeaders) != 0 {
|
|
sessionTokenSetEACL, err := getSessionTokenSetEACL(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't get eacl token: %w", err)
|
|
}
|
|
key, err := h.bearerTokenIssuerKey(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't get gate key: %w", err)
|
|
}
|
|
acl, err := parseACLHeaders(r.Header, key)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not parse acl: %w", err)
|
|
}
|
|
|
|
resInfo := &resourceInfo{
|
|
Bucket: objInfo.Bucket,
|
|
Object: objInfo.Name,
|
|
}
|
|
astObject, err := aclToAst(acl, resInfo)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not translate acl of completed multipart upload to ast: %w", err)
|
|
}
|
|
if _, err = h.updateBucketACL(r, astObject, bktInfo, sessionTokenSetEACL); err != nil {
|
|
return nil, fmt.Errorf("could not update bucket acl while completing multipart upload: %w", err)
|
|
}
|
|
}
|
|
|
|
s := &SendNotificationParams{
|
|
Event: EventObjectCreatedCompleteMultipartUpload,
|
|
NotificationInfo: data.NotificationInfoFromObject(objInfo),
|
|
BktInfo: bktInfo,
|
|
ReqInfo: reqInfo,
|
|
}
|
|
if err = h.sendNotifications(ctx, s); err != nil {
|
|
h.reqLogger(ctx).Error(logs.CouldntSendNotification, zap.Error(err))
|
|
}
|
|
|
|
return objInfo, nil
|
|
}
|
|
|
|
func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
|
|
reqInfo := middleware.GetReqInfo(r.Context())
|
|
|
|
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
|
return
|
|
}
|
|
|
|
var (
|
|
queryValues = reqInfo.URL.Query()
|
|
maxUploadsStr = queryValues.Get(maxUploadsQueryName)
|
|
maxUploads = layer.MaxSizeUploadsList
|
|
)
|
|
|
|
if maxUploadsStr != "" {
|
|
val, err := strconv.Atoi(maxUploadsStr)
|
|
if err != nil || val < 1 || val > 1000 {
|
|
h.logAndSendError(w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads))
|
|
return
|
|
}
|
|
maxUploads = val
|
|
}
|
|
|
|
p := &layer.ListMultipartUploadsParams{
|
|
Bkt: bktInfo,
|
|
Delimiter: queryValues.Get(delimiterQueryName),
|
|
EncodingType: queryValues.Get(encodingTypeQueryName),
|
|
KeyMarker: queryValues.Get(keyMarkerQueryName),
|
|
MaxUploads: maxUploads,
|
|
Prefix: queryValues.Get(prefixQueryName),
|
|
UploadIDMarker: queryValues.Get(uploadIDMarkerQueryName),
|
|
}
|
|
|
|
list, err := h.obj.ListMultipartUploads(r.Context(), p)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not list multipart uploads", reqInfo, err)
|
|
return
|
|
}
|
|
|
|
if err = middleware.EncodeToResponse(w, encodeListMultipartUploadsToResponse(list, p)); err != nil {
|
|
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
|
}
|
|
}
|
|
|
|
func (h *handler) ListPartsHandler(w http.ResponseWriter, r *http.Request) {
|
|
reqInfo := middleware.GetReqInfo(r.Context())
|
|
|
|
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
|
return
|
|
}
|
|
|
|
var (
|
|
partNumberMarker int
|
|
|
|
queryValues = reqInfo.URL.Query()
|
|
uploadID = queryValues.Get(uploadIDHeaderName)
|
|
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
|
|
maxParts = layer.MaxSizePartsList
|
|
)
|
|
|
|
if queryValues.Get("max-parts") != "" {
|
|
val, err := strconv.Atoi(queryValues.Get("max-parts"))
|
|
if err != nil || val < 0 {
|
|
h.logAndSendError(w, "invalid MaxParts", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxParts), additional...)
|
|
return
|
|
}
|
|
if val < layer.MaxSizePartsList {
|
|
maxParts = val
|
|
}
|
|
}
|
|
|
|
if queryValues.Get("part-number-marker") != "" {
|
|
if partNumberMarker, err = strconv.Atoi(queryValues.Get("part-number-marker")); err != nil || partNumberMarker <= 0 {
|
|
h.logAndSendError(w, "invalid PartNumberMarker", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
}
|
|
|
|
p := &layer.ListPartsParams{
|
|
Info: &layer.UploadInfoParams{
|
|
UploadID: uploadID,
|
|
Bkt: bktInfo,
|
|
Key: reqInfo.ObjectName,
|
|
},
|
|
MaxParts: maxParts,
|
|
PartNumberMarker: partNumberMarker,
|
|
}
|
|
|
|
p.Info.Encryption, err = formEncryptionParams(r)
|
|
if err != nil {
|
|
h.logAndSendError(w, "invalid sse headers", reqInfo, err)
|
|
return
|
|
}
|
|
|
|
list, err := h.obj.ListParts(r.Context(), p)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not list parts", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
if err = middleware.EncodeToResponse(w, encodeListPartsToResponse(list, p)); err != nil {
|
|
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
|
}
|
|
}
|
|
|
|
func (h *handler) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
|
reqInfo := middleware.GetReqInfo(r.Context())
|
|
|
|
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
|
if err != nil {
|
|
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
|
return
|
|
}
|
|
|
|
uploadID := reqInfo.URL.Query().Get(uploadIDHeaderName)
|
|
additional := []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
|
|
|
|
p := &layer.UploadInfoParams{
|
|
UploadID: uploadID,
|
|
Bkt: bktInfo,
|
|
Key: reqInfo.ObjectName,
|
|
}
|
|
|
|
p.Encryption, err = formEncryptionParams(r)
|
|
if err != nil {
|
|
h.logAndSendError(w, "invalid sse headers", reqInfo, err)
|
|
return
|
|
}
|
|
|
|
if err = h.obj.AbortMultipartUpload(r.Context(), p); err != nil {
|
|
h.logAndSendError(w, "could not abort multipart upload", reqInfo, err, additional...)
|
|
return
|
|
}
|
|
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
func encodeListMultipartUploadsToResponse(info *layer.ListMultipartUploadsInfo, params *layer.ListMultipartUploadsParams) *ListMultipartUploadsResponse {
|
|
res := ListMultipartUploadsResponse{
|
|
Bucket: params.Bkt.Name,
|
|
CommonPrefixes: fillPrefixes(info.Prefixes, params.EncodingType),
|
|
Delimiter: params.Delimiter,
|
|
EncodingType: params.EncodingType,
|
|
IsTruncated: info.IsTruncated,
|
|
KeyMarker: params.KeyMarker,
|
|
MaxUploads: params.MaxUploads,
|
|
NextKeyMarker: info.NextKeyMarker,
|
|
NextUploadIDMarker: info.NextUploadIDMarker,
|
|
Prefix: params.Prefix,
|
|
UploadIDMarker: params.UploadIDMarker,
|
|
}
|
|
|
|
uploads := make([]MultipartUpload, 0, len(info.Uploads))
|
|
for _, u := range info.Uploads {
|
|
m := MultipartUpload{
|
|
Initiated: u.Created.UTC().Format(time.RFC3339),
|
|
Initiator: Initiator{
|
|
ID: u.Owner.String(),
|
|
DisplayName: u.Owner.String(),
|
|
},
|
|
Key: u.Key,
|
|
Owner: Owner{
|
|
ID: u.Owner.String(),
|
|
DisplayName: u.Owner.String(),
|
|
},
|
|
UploadID: u.UploadID,
|
|
}
|
|
uploads = append(uploads, m)
|
|
}
|
|
|
|
res.Uploads = uploads
|
|
|
|
return &res
|
|
}
|
|
|
|
func encodeListPartsToResponse(info *layer.ListPartsInfo, params *layer.ListPartsParams) *ListPartsResponse {
|
|
return &ListPartsResponse{
|
|
XMLName: xml.Name{},
|
|
Bucket: params.Info.Bkt.Name,
|
|
Initiator: Initiator{
|
|
ID: info.Owner.String(),
|
|
DisplayName: info.Owner.String(),
|
|
},
|
|
IsTruncated: info.IsTruncated,
|
|
Key: params.Info.Key,
|
|
MaxParts: params.MaxParts,
|
|
NextPartNumberMarker: info.NextPartNumberMarker,
|
|
Owner: Owner{
|
|
ID: info.Owner.String(),
|
|
DisplayName: info.Owner.String(),
|
|
},
|
|
PartNumberMarker: params.PartNumberMarker,
|
|
UploadID: params.Info.UploadID,
|
|
Parts: info.Parts,
|
|
}
|
|
}
|
|
|
|
// periodicXMLWriter creates go routine to write xml header and whitespaces
|
|
// over time to avoid connection drop from the client. To work properly,
|
|
// pass `http.ResponseWriter` with implemented `http.Flusher` interface.
|
|
// Returns stop function which returns boolean if writer has been used
|
|
// during goroutine execution. To disable writer, pass 0 duration value.
|
|
func periodicXMLWriter(w io.Writer, dur time.Duration) (stop func() bool) {
|
|
if dur == 0 { // 0 duration disables periodic writer
|
|
return func() bool { return false }
|
|
}
|
|
|
|
whitespaceChar := []byte(" ")
|
|
closer := make(chan struct{})
|
|
done := make(chan struct{})
|
|
headerWritten := false
|
|
|
|
go func() {
|
|
defer close(done)
|
|
|
|
tick := time.NewTicker(dur)
|
|
defer tick.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-tick.C:
|
|
if !headerWritten {
|
|
_, err := w.Write([]byte(xml.Header))
|
|
headerWritten = err == nil
|
|
}
|
|
_, err := w.Write(whitespaceChar)
|
|
if err != nil {
|
|
return // is there anything we can do better than ignore error?
|
|
}
|
|
if buffered, ok := w.(http.Flusher); ok {
|
|
buffered.Flush()
|
|
}
|
|
case <-closer:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
stop = func() bool {
|
|
close(closer)
|
|
<-done // wait for goroutine to stop
|
|
return headerWritten
|
|
}
|
|
|
|
return stop
|
|
}
|