forked from TrueCloudLab/frostfs-s3-gw
[#186] Add MultipartUpload support
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
parent
284a560ea6
commit
873622d4d5
9 changed files with 1291 additions and 67 deletions
|
@ -44,6 +44,7 @@ const (
|
|||
ErrInvalidMaxUploads
|
||||
ErrInvalidMaxParts
|
||||
ErrInvalidPartNumberMarker
|
||||
ErrInvalidPartNumber
|
||||
ErrInvalidRequestBody
|
||||
ErrInvalidCopySource
|
||||
ErrInvalidMetadataDirective
|
||||
|
@ -998,7 +999,7 @@ var errorCodes = errorCodeMap{
|
|||
},
|
||||
ErrInvalidCopyPartRangeSource: {
|
||||
ErrCode: ErrInvalidCopyPartRangeSource,
|
||||
Code: "InvalidArgument",
|
||||
Code: "InvalidRange",
|
||||
Description: "Range specified is not valid for source object",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
|
@ -1925,6 +1926,12 @@ var errorCodes = errorCodeMap{
|
|||
Description: "ExposeHeader \"*\" contains wildcard. We currently do not support wildcard for ExposeHeader",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrInvalidPartNumber: {
|
||||
ErrCode: ErrInvalidPartNumber,
|
||||
Code: "InvalidArgument",
|
||||
Description: "Part number must be an integer between 1 and 10000, inclusive",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
// Add your error structure here.
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
|
@ -9,13 +8,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-sdk-go/owner"
|
||||
)
|
||||
|
||||
// ListMultipartUploadsResult contains ListMultipartUploadsResult XML representation.
|
||||
type ListMultipartUploadsResult struct {
|
||||
XMLName xml.Name `xml:"ListMultipartUploadsResult"`
|
||||
Text string `xml:",chardata"`
|
||||
Xmlns string `xml:"xmlns,attr"`
|
||||
}
|
||||
|
||||
const maxObjectList = 1000 // Limit number of objects in a listObjectsResponse/listObjectsVersionsResponse.
|
||||
|
||||
// ListBucketsHandler handles bucket listing requests.
|
||||
|
@ -54,17 +46,3 @@ func (h *handler) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
|
|||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||
}
|
||||
}
|
||||
|
||||
// ListMultipartUploadsHandler implements multipart uploads listing handler.
|
||||
func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
reqInfo = api.GetReqInfo(r.Context())
|
||||
res = new(ListMultipartUploadsResult)
|
||||
)
|
||||
|
||||
res.Xmlns = "http://s3.amazonaws.com/doc/2006-03-01/"
|
||||
|
||||
if err := api.EncodeToResponse(w, res); err != nil {
|
||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||
}
|
||||
}
|
||||
|
|
645
api/handler/multipart_upload.go
Normal file
645
api/handler/multipart_upload.go
Normal file
|
@ -0,0 +1,645 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"encoding/xml"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
InitiateMultipartUploadResponse struct {
|
||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult" json:"-"`
|
||||
Bucket string `xml:"Bucket"`
|
||||
Key string `xml:"Key"`
|
||||
UploadID string `xml:"UploadId"`
|
||||
}
|
||||
|
||||
CompleteMultipartUploadResponse struct {
|
||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"`
|
||||
Bucket string `xml:"Bucket"`
|
||||
Key string `xml:"Key"`
|
||||
ETag string `xml:"ETag"`
|
||||
}
|
||||
|
||||
ListMultipartUploadsResponse struct {
|
||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListMultipartUploadsResult" json:"-"`
|
||||
Bucket string `xml:"Bucket"`
|
||||
CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"`
|
||||
Delimiter string `xml:"Delimiter,omitempty"`
|
||||
EncodingType string `xml:"EncodingType,omitempty"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
KeyMarker string `xml:"KeyMarker"`
|
||||
MaxUploads int `xml:"MaxUploads"`
|
||||
NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
|
||||
NextUploadIDMarker string `xml:"NextUploadIdMarker,omitempty"`
|
||||
Prefix string `xml:"Prefix"`
|
||||
Uploads []MultipartUpload `xml:"Upload"`
|
||||
UploadIDMarker string `xml:"UploadIdMarker,omitempty"`
|
||||
}
|
||||
|
||||
ListPartsResponse struct {
|
||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListPartsResult" json:"-"`
|
||||
Bucket string `xml:"Bucket"`
|
||||
Initiator Initiator `xml:"Initiator"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
Key string `xml:"Key"`
|
||||
MaxParts int `xml:"MaxParts,omitempty"`
|
||||
NextPartNumberMarker int `xml:"NextPartNumberMarker,omitempty"`
|
||||
Owner Owner `xml:"Owner"`
|
||||
Parts []*layer.Part `xml:"Part"`
|
||||
PartNumberMarker int `xml:"PartNumberMarker,omitempty"`
|
||||
StorageClass string `xml:"StorageClass,omitempty"`
|
||||
UploadID string `xml:"UploadId"`
|
||||
}
|
||||
|
||||
MultipartUpload struct {
|
||||
Initiated string `xml:"Initiated"`
|
||||
Initiator Initiator `xml:"Initiator"`
|
||||
Key string `xml:"Key"`
|
||||
Owner Owner `xml:"Owner"`
|
||||
StorageClass string `xml:"StorageClass,omitempty"`
|
||||
UploadID string `xml:"UploadId"`
|
||||
}
|
||||
|
||||
Initiator struct {
|
||||
ID string `xml:"ID"`
|
||||
DisplayName string `xml:"DisplayName"`
|
||||
}
|
||||
|
||||
CompleteMultipartUpload struct {
|
||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUpload"`
|
||||
Parts []*layer.CompletedPart `xml:"Part"`
|
||||
}
|
||||
|
||||
UploadPartCopyResponse struct {
|
||||
ETag string `xml:"ETag"`
|
||||
LastModified string `xml:"LastModified"`
|
||||
}
|
||||
|
||||
UploadData struct {
|
||||
TagSet map[string]string
|
||||
ACL *AccessControlPolicy
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
uploadIDHeaderName = "uploadId"
|
||||
partNumberHeaderName = "partNumber"
|
||||
)
|
||||
|
||||
func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
/* initiation of multipart uploads is implemented via creation of "system" upload part with 0 part number
|
||||
(min value of partNumber of a common part is 1) and holding data: metadata, acl, tagging */
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil {
|
||||
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
hasData bool
|
||||
b []byte
|
||||
|
||||
uploadID = uuid.New()
|
||||
data = &UploadData{}
|
||||
additional = []zap.Field{
|
||||
zap.String("uploadID", uploadID.String()),
|
||||
zap.String("Key", reqInfo.ObjectName),
|
||||
}
|
||||
uploadInfo = &layer.UploadInfoParams{
|
||||
UploadID: uploadID.String(),
|
||||
Bkt: bktInfo,
|
||||
Key: reqInfo.ObjectName,
|
||||
}
|
||||
)
|
||||
|
||||
if containsACLHeaders(r) {
|
||||
gateKey, err := h.gateKey(r.Context())
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "couldn't get gate key", reqInfo, err)
|
||||
return
|
||||
}
|
||||
data.ACL, err = parseACLHeaders(r.Header, gateKey)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not parse acl", reqInfo, err)
|
||||
return
|
||||
}
|
||||
hasData = true
|
||||
}
|
||||
|
||||
if len(r.Header.Get(api.AmzTagging)) > 0 {
|
||||
data.TagSet, err = parseTaggingHeader(r.Header)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not parse tagging", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
hasData = true
|
||||
}
|
||||
|
||||
metadata := parseMetadata(r)
|
||||
if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 {
|
||||
metadata[api.ContentType] = contentType
|
||||
}
|
||||
|
||||
p := &layer.UploadPartParams{
|
||||
Info: uploadInfo,
|
||||
PartNumber: 0,
|
||||
Header: metadata,
|
||||
}
|
||||
|
||||
if hasData {
|
||||
b, err = json.Marshal(data)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not marshal json with acl and/or tagging", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
p.Reader = bytes.NewReader(b)
|
||||
}
|
||||
|
||||
info, err := h.obj.UploadPart(r.Context(), p)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not upload a part", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
resp := InitiateMultipartUploadResponse{
|
||||
Bucket: info.Bucket,
|
||||
Key: info.Headers[layer.UploadKeyAttributeName],
|
||||
UploadID: info.Headers[layer.UploadIDAttributeName],
|
||||
}
|
||||
|
||||
if err := api.EncodeToResponse(w, resp); err != nil {
|
||||
h.logAndSendError(w, "could not encode InitiateMultipartUploadResponse to response", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil {
|
||||
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
|
||||
return
|
||||
}
|
||||
var (
|
||||
queryValues = r.URL.Query()
|
||||
uploadID = queryValues.Get(uploadIDHeaderName)
|
||||
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
|
||||
)
|
||||
|
||||
partNumber, err := strconv.Atoi(queryValues.Get(partNumberHeaderName))
|
||||
if err != nil || partNumber < layer.UploadMinPartNumber || partNumber > layer.UploadMaxPartNumber {
|
||||
h.logAndSendError(w, "invalid part number", reqInfo, errors.GetAPIError(errors.ErrInvalidPartNumber))
|
||||
return
|
||||
}
|
||||
|
||||
p := &layer.UploadPartParams{
|
||||
Info: &layer.UploadInfoParams{
|
||||
UploadID: uploadID,
|
||||
Bkt: bktInfo,
|
||||
Key: reqInfo.ObjectName,
|
||||
},
|
||||
PartNumber: partNumber,
|
||||
Size: r.ContentLength,
|
||||
Reader: r.Body,
|
||||
}
|
||||
|
||||
info, err := h.obj.UploadPart(r.Context(), p)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not upload a part", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set(api.ETag, info.HashSum)
|
||||
api.WriteSuccessResponseHeadersOnly(w)
|
||||
}
|
||||
|
||||
func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil {
|
||||
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
queryValues = reqInfo.URL.Query()
|
||||
uploadID = queryValues.Get(uploadIDHeaderName)
|
||||
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
|
||||
)
|
||||
|
||||
partNumber, err := strconv.Atoi(queryValues.Get(partNumberHeaderName))
|
||||
if err != nil || partNumber < layer.UploadMinPartNumber || partNumber > layer.UploadMaxPartNumber {
|
||||
h.logAndSendError(w, "invalid part number", reqInfo, errors.GetAPIError(errors.ErrInvalidPartNumber))
|
||||
return
|
||||
}
|
||||
|
||||
src := r.Header.Get(api.AmzCopySource)
|
||||
srcBucket, srcObject := path2BucketObject(src)
|
||||
|
||||
srcRange, err := parseRange(r.Header.Get(api.AmzCopySourceRange))
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not parse copy range", reqInfo,
|
||||
errors.GetAPIError(errors.ErrInvalidCopyPartRange), additional...)
|
||||
return
|
||||
}
|
||||
|
||||
srcInfo, err := h.obj.GetObjectInfo(r.Context(), &layer.HeadObjectParams{
|
||||
Bucket: srcBucket,
|
||||
Object: srcObject,
|
||||
})
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not head source object", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
args, err := parseCopyObjectArgs(r.Header)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not parse copy object args", reqInfo,
|
||||
errors.GetAPIError(errors.ErrInvalidCopyPartRange), additional...)
|
||||
return
|
||||
}
|
||||
|
||||
if err = checkPreconditions(srcInfo, args.Conditional); err != nil {
|
||||
h.logAndSendError(w, "precondition failed", reqInfo, errors.GetAPIError(errors.ErrPreconditionFailed),
|
||||
additional...)
|
||||
return
|
||||
}
|
||||
|
||||
if err = h.checkBucketOwner(r, srcBucket, r.Header.Get(api.AmzSourceExpectedBucketOwner)); err != nil {
|
||||
h.logAndSendError(w, "source expected owner doesn't match", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
p := &layer.UploadCopyParams{
|
||||
Info: &layer.UploadInfoParams{
|
||||
UploadID: uploadID,
|
||||
Bkt: bktInfo,
|
||||
Key: reqInfo.ObjectName,
|
||||
},
|
||||
SrcObjInfo: srcInfo,
|
||||
PartNumber: partNumber,
|
||||
Range: srcRange,
|
||||
}
|
||||
|
||||
info, err := h.obj.UploadPartCopy(r.Context(), p)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not upload part copy", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
response := UploadPartCopyResponse{
|
||||
ETag: info.HashSum,
|
||||
LastModified: info.Created.Format(time.RFC3339),
|
||||
}
|
||||
|
||||
if err = api.EncodeToResponse(w, response); err != nil {
|
||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil {
|
||||
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
uploadID = r.URL.Query().Get(uploadIDHeaderName)
|
||||
uploadInfo = &layer.UploadInfoParams{
|
||||
UploadID: uploadID,
|
||||
Bkt: bktInfo,
|
||||
Key: reqInfo.ObjectName,
|
||||
}
|
||||
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
|
||||
)
|
||||
|
||||
reqBody := new(CompleteMultipartUpload)
|
||||
if err := xml.NewDecoder(r.Body).Decode(reqBody); err != nil {
|
||||
h.logAndSendError(w, "could not read complete multipart upload xml", reqInfo,
|
||||
errors.GetAPIError(errors.ErrMalformedXML), additional...)
|
||||
return
|
||||
}
|
||||
|
||||
initPart, err := h.obj.GetUploadInitInfo(r.Context(), uploadInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get multipart upload info", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
c := &layer.CompleteMultipartParams{
|
||||
Info: uploadInfo,
|
||||
Parts: reqBody.Parts,
|
||||
}
|
||||
objInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not complete multipart upload", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
if initPart.Size > 0 {
|
||||
initPartPayload := bytes.NewBuffer(make([]byte, 0, initPart.Size))
|
||||
p := &layer.GetObjectParams{
|
||||
ObjectInfo: initPart,
|
||||
Writer: initPartPayload,
|
||||
}
|
||||
if err = h.obj.GetObject(r.Context(), p); err != nil {
|
||||
h.logAndSendError(w, "could not get multipart upload acl and/or tagging", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
uploadData := &UploadData{}
|
||||
if err = json.Unmarshal(initPartPayload.Bytes(), uploadData); err != nil {
|
||||
h.logAndSendError(w, "could not unmarshal multipart upload acl and/or tagging", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
if len(uploadData.TagSet) != 0 {
|
||||
t := &layer.PutTaggingParams{
|
||||
ObjectInfo: objInfo,
|
||||
TagSet: uploadData.TagSet,
|
||||
}
|
||||
if err = h.obj.PutObjectTagging(r.Context(), t); err != nil {
|
||||
h.logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if uploadData.ACL != nil {
|
||||
resInfo := &resourceInfo{
|
||||
Bucket: objInfo.Bucket,
|
||||
Object: objInfo.Name,
|
||||
}
|
||||
astObject, err := aclToAst(uploadData.ACL, resInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not translate acl of completed multipart upload to ast", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
if err = h.updateBucketACL(r, astObject, reqInfo.BucketName); err != nil {
|
||||
h.logAndSendError(w, "could not update bucket acl while completing multipart upload", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, err = h.obj.DeleteObjects(r.Context(), bktInfo.Name, []*layer.VersionedObject{{Name: initPart.Name}})
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not delete init file of multipart upload", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
response := CompleteMultipartUploadResponse{
|
||||
Bucket: objInfo.Bucket,
|
||||
ETag: objInfo.HashSum,
|
||||
Key: objInfo.Name,
|
||||
}
|
||||
|
||||
if err = api.EncodeToResponse(w, response); err != nil {
|
||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil {
|
||||
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
queryValues = reqInfo.URL.Query()
|
||||
delimiter = queryValues.Get("delimiter")
|
||||
prefix = queryValues.Get("prefix")
|
||||
maxUploads = layer.MaxSizeUploadsList
|
||||
)
|
||||
|
||||
if queryValues.Get("max-uploads") != "" {
|
||||
val, err := strconv.Atoi(queryValues.Get("max-uploads"))
|
||||
if err != nil || val < 0 {
|
||||
h.logAndSendError(w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads))
|
||||
return
|
||||
}
|
||||
if val < maxUploads {
|
||||
maxUploads = val
|
||||
}
|
||||
}
|
||||
|
||||
p := &layer.ListMultipartUploadsParams{
|
||||
Bkt: bktInfo,
|
||||
Delimiter: delimiter,
|
||||
EncodingType: queryValues.Get("encoding-type"),
|
||||
KeyMarker: queryValues.Get("key-marker"),
|
||||
MaxUploads: maxUploads,
|
||||
Prefix: prefix,
|
||||
UploadIDMarker: queryValues.Get("upload-id-marker"),
|
||||
}
|
||||
|
||||
list, err := h.obj.ListMultipartUploads(r.Context(), p)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not list multipart uploads", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err = api.EncodeToResponse(w, encodeListMultipartUploadsToResponse(list, p)); err != nil {
|
||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) ListPartsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil {
|
||||
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
partNumberMarker int
|
||||
|
||||
queryValues = reqInfo.URL.Query()
|
||||
uploadID = queryValues.Get(uploadIDHeaderName)
|
||||
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
|
||||
maxParts = layer.MaxSizePartsList
|
||||
)
|
||||
|
||||
if queryValues.Get("max-parts") != "" {
|
||||
val, err := strconv.Atoi(queryValues.Get("max-parts"))
|
||||
if err != nil || val < 0 {
|
||||
h.logAndSendError(w, "invalid MaxParts", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxParts), additional...)
|
||||
return
|
||||
}
|
||||
if val < layer.MaxSizePartsList {
|
||||
maxParts = val
|
||||
}
|
||||
}
|
||||
|
||||
if queryValues.Get("part-number-marker") != "" {
|
||||
if partNumberMarker, err = strconv.Atoi(queryValues.Get("part-number-marker")); err != nil || partNumberMarker <= 0 {
|
||||
h.logAndSendError(w, "invalid PartNumberMarker", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
p := &layer.ListPartsParams{
|
||||
Info: &layer.UploadInfoParams{
|
||||
UploadID: uploadID,
|
||||
Bkt: bktInfo,
|
||||
Key: reqInfo.ObjectName,
|
||||
},
|
||||
MaxParts: maxParts,
|
||||
PartNumberMarker: partNumberMarker,
|
||||
}
|
||||
|
||||
list, err := h.obj.ListParts(r.Context(), p)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not list parts", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
if err = api.EncodeToResponse(w, encodeListPartsToResponse(list, p)); err != nil {
|
||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil {
|
||||
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
queryValues = reqInfo.URL.Query()
|
||||
uploadID = queryValues.Get(uploadIDHeaderName)
|
||||
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
|
||||
|
||||
p = &layer.UploadInfoParams{
|
||||
UploadID: uploadID,
|
||||
Bkt: bktInfo,
|
||||
Key: reqInfo.ObjectName,
|
||||
}
|
||||
)
|
||||
|
||||
if err := h.obj.AbortMultipartUpload(r.Context(), p); err != nil {
|
||||
h.logAndSendError(w, "could not abort multipart upload", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func encodeListMultipartUploadsToResponse(info *layer.ListMultipartUploadsInfo, params *layer.ListMultipartUploadsParams) *ListMultipartUploadsResponse {
|
||||
res := ListMultipartUploadsResponse{
|
||||
Bucket: params.Bkt.Name,
|
||||
CommonPrefixes: fillPrefixes(info.Prefixes, params.EncodingType),
|
||||
Delimiter: params.Delimiter,
|
||||
EncodingType: params.EncodingType,
|
||||
IsTruncated: info.IsTruncated,
|
||||
KeyMarker: params.KeyMarker,
|
||||
MaxUploads: params.MaxUploads,
|
||||
NextKeyMarker: info.NextKeyMarker,
|
||||
NextUploadIDMarker: info.NextUploadIDMarker,
|
||||
Prefix: params.Prefix,
|
||||
UploadIDMarker: params.UploadIDMarker,
|
||||
}
|
||||
|
||||
uploads := make([]MultipartUpload, 0, len(info.Uploads))
|
||||
for _, u := range info.Uploads {
|
||||
m := MultipartUpload{
|
||||
Initiated: u.Created.Format(time.RFC3339),
|
||||
Initiator: Initiator{
|
||||
ID: u.Owner.String(),
|
||||
DisplayName: u.Owner.String(),
|
||||
},
|
||||
Key: u.Key,
|
||||
Owner: Owner{
|
||||
ID: u.Owner.String(),
|
||||
DisplayName: u.Owner.String(),
|
||||
},
|
||||
UploadID: u.UploadID,
|
||||
}
|
||||
uploads = append(uploads, m)
|
||||
}
|
||||
|
||||
res.Uploads = uploads
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
func encodeListPartsToResponse(info *layer.ListPartsInfo, params *layer.ListPartsParams) *ListPartsResponse {
|
||||
return &ListPartsResponse{
|
||||
XMLName: xml.Name{},
|
||||
Bucket: params.Info.Bkt.Name,
|
||||
Initiator: Initiator{
|
||||
ID: info.Owner.String(),
|
||||
DisplayName: info.Owner.String(),
|
||||
},
|
||||
IsTruncated: info.IsTruncated,
|
||||
Key: params.Info.Key,
|
||||
MaxParts: params.MaxParts,
|
||||
NextPartNumberMarker: info.NextPartNumberMarker,
|
||||
Owner: Owner{
|
||||
ID: info.Owner.String(),
|
||||
DisplayName: info.Owner.String(),
|
||||
},
|
||||
PartNumberMarker: params.PartNumberMarker,
|
||||
UploadID: params.Info.UploadID,
|
||||
Parts: info.Parts,
|
||||
}
|
||||
}
|
|
@ -7,30 +7,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
)
|
||||
|
||||
func (h *handler) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not implemented", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
|
||||
}
|
||||
|
||||
func (h *handler) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not implemented", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
|
||||
}
|
||||
|
||||
func (h *handler) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not implemented", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
|
||||
}
|
||||
|
||||
func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not implemented", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
|
||||
}
|
||||
|
||||
func (h *handler) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not implemented", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
|
||||
}
|
||||
|
||||
func (h *handler) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not implemented", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
|
||||
}
|
||||
|
||||
func (h *handler) SelectObjectContentHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h.logAndSendError(w, "not implemented", api.GetReqInfo(r.Context()), errors.GetAPIError(errors.ErrNotImplemented))
|
||||
}
|
||||
|
|
|
@ -2,8 +2,12 @@ package handler
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -38,3 +42,39 @@ func (h *handler) checkBucketOwner(r *http.Request, bucket string, header ...str
|
|||
|
||||
return checkOwner(bktInfo, expected)
|
||||
}
|
||||
|
||||
func parseRange(s string) (*layer.RangeParams, error) {
|
||||
if s == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
prefix := "bytes="
|
||||
|
||||
if !strings.HasPrefix(s, prefix) {
|
||||
return nil, errors.GetAPIError(errors.ErrInvalidRange)
|
||||
}
|
||||
|
||||
s = strings.TrimPrefix(s, prefix)
|
||||
|
||||
valuesStr := strings.Split(s, "-")
|
||||
if len(valuesStr) != 2 {
|
||||
return nil, errors.GetAPIError(errors.ErrInvalidRange)
|
||||
}
|
||||
|
||||
values := make([]uint64, 0, len(valuesStr))
|
||||
for _, v := range valuesStr {
|
||||
num, err := strconv.ParseUint(v, 10, 64)
|
||||
if err != nil {
|
||||
return nil, errors.GetAPIError(errors.ErrInvalidRange)
|
||||
}
|
||||
values = append(values, num)
|
||||
}
|
||||
if values[0] > values[1] {
|
||||
return nil, errors.GetAPIError(errors.ErrInvalidRange)
|
||||
}
|
||||
|
||||
return &layer.RangeParams{
|
||||
Start: values[0],
|
||||
End: values[1],
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -8,6 +8,8 @@ const (
|
|||
AmzTaggingCount = "X-Amz-Tagging-Count"
|
||||
AmzTagging = "X-Amz-Tagging"
|
||||
AmzDeleteMarker = "X-Amz-Delete-Marker"
|
||||
AmzCopySource = "X-Amz-Copy-Source"
|
||||
AmzCopySourceRange = "X-Amz-Copy-Source-Range"
|
||||
|
||||
LastModified = "Last-Modified"
|
||||
Date = "Date"
|
||||
|
|
|
@ -125,6 +125,7 @@ type (
|
|||
DstObject string
|
||||
SrcSize int64
|
||||
Header map[string]string
|
||||
Range *RangeParams
|
||||
}
|
||||
// CreateBucketParams stores bucket create request parameters.
|
||||
CreateBucketParams struct {
|
||||
|
@ -221,6 +222,14 @@ type (
|
|||
DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) ([]*VersionedObject, error)
|
||||
DeleteObjectTagging(ctx context.Context, p *data.ObjectInfo) error
|
||||
DeleteBucketTagging(ctx context.Context, bucket string) error
|
||||
|
||||
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error)
|
||||
UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error)
|
||||
UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error)
|
||||
ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error)
|
||||
AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error
|
||||
ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error)
|
||||
GetUploadInitInfo(ctx context.Context, p *UploadInfoParams) (*data.ObjectInfo, error)
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -528,6 +537,7 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Obje
|
|||
err := n.GetObject(ctx, &GetObjectParams{
|
||||
ObjectInfo: p.SrcObject,
|
||||
Writer: pw,
|
||||
Range: p.Range,
|
||||
})
|
||||
|
||||
if err = pw.CloseWithError(err); err != nil {
|
||||
|
|
566
api/layer/multipart_upload.go
Normal file
566
api/layer/multipart_upload.go
Normal file
|
@ -0,0 +1,566 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/owner"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
UploadIDAttributeName = "S3-Upload-Id"
|
||||
UploadPartNumberAttributeName = "S3-Upload-Part-Number"
|
||||
UploadKeyAttributeName = "S3-Upload-Key"
|
||||
UploadPartKeyPrefix = ".upload-"
|
||||
|
||||
MaxSizeUploadsList = 1000
|
||||
MaxSizePartsList = 1000
|
||||
UploadMinPartNumber = 1
|
||||
UploadMaxPartNumber = 10000
|
||||
uploadMinSize = 5 * 1048576 // 5MB
|
||||
uploadMaxSize = 5 * 1073741824 // 5GB
|
||||
)
|
||||
|
||||
type (
|
||||
UploadInfoParams struct {
|
||||
UploadID string
|
||||
Bkt *data.BucketInfo
|
||||
Key string
|
||||
}
|
||||
|
||||
UploadPartParams struct {
|
||||
Info *UploadInfoParams
|
||||
PartNumber int
|
||||
Size int64
|
||||
Reader io.Reader
|
||||
Header map[string]string
|
||||
}
|
||||
|
||||
UploadCopyParams struct {
|
||||
Info *UploadInfoParams
|
||||
SrcObjInfo *data.ObjectInfo
|
||||
PartNumber int
|
||||
Range *RangeParams
|
||||
}
|
||||
|
||||
CompleteMultipartParams struct {
|
||||
Info *UploadInfoParams
|
||||
Parts []*CompletedPart
|
||||
}
|
||||
|
||||
CompletedPart struct {
|
||||
ETag string
|
||||
PartNumber int
|
||||
}
|
||||
|
||||
Part struct {
|
||||
ETag string
|
||||
LastModified string
|
||||
PartNumber int
|
||||
Size int64
|
||||
}
|
||||
|
||||
ListMultipartUploadsParams struct {
|
||||
Bkt *data.BucketInfo
|
||||
Delimiter string
|
||||
EncodingType string
|
||||
KeyMarker string
|
||||
MaxUploads int
|
||||
Prefix string
|
||||
UploadIDMarker string
|
||||
}
|
||||
|
||||
ListPartsParams struct {
|
||||
Info *UploadInfoParams
|
||||
MaxParts int
|
||||
PartNumberMarker int
|
||||
}
|
||||
|
||||
ListPartsInfo struct {
|
||||
Parts []*Part
|
||||
Owner *owner.ID
|
||||
NextPartNumberMarker int
|
||||
IsTruncated bool
|
||||
}
|
||||
|
||||
ListMultipartUploadsInfo struct {
|
||||
Prefixes []string
|
||||
Uploads []*UploadInfo
|
||||
IsTruncated bool
|
||||
NextKeyMarker string
|
||||
NextUploadIDMarker string
|
||||
}
|
||||
UploadInfo struct {
|
||||
IsDir bool
|
||||
Key string
|
||||
UploadID string
|
||||
Owner *owner.ID
|
||||
Created time.Time
|
||||
}
|
||||
)
|
||||
|
||||
func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error) {
|
||||
if p.PartNumber != 0 {
|
||||
if _, err := n.GetUploadInitInfo(ctx, p.Info); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if p.Size > uploadMaxSize {
|
||||
return nil, errors.GetAPIError(errors.ErrEntityTooLarge)
|
||||
}
|
||||
|
||||
if p.Header == nil {
|
||||
p.Header = make(map[string]string)
|
||||
}
|
||||
|
||||
appendUploadHeaders(p.Header, p.Info.UploadID, p.Info.Key, p.PartNumber)
|
||||
|
||||
params := &PutObjectParams{
|
||||
Bucket: p.Info.Bkt.Name,
|
||||
Object: createUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber),
|
||||
Size: p.Size,
|
||||
Reader: p.Reader,
|
||||
Header: p.Header,
|
||||
}
|
||||
|
||||
return n.objectPut(ctx, p.Info.Bkt, params)
|
||||
}
|
||||
|
||||
func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) {
|
||||
if _, err := n.GetUploadInitInfo(ctx, p.Info); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if p.Range != nil {
|
||||
if p.Range.End-p.Range.Start > uploadMaxSize {
|
||||
return nil, errors.GetAPIError(errors.ErrEntityTooLarge)
|
||||
}
|
||||
if p.Range.End > uint64(p.SrcObjInfo.Size) {
|
||||
return nil, errors.GetAPIError(errors.ErrInvalidCopyPartRangeSource)
|
||||
}
|
||||
} else {
|
||||
if p.SrcObjInfo.Size > uploadMaxSize {
|
||||
return nil, errors.GetAPIError(errors.ErrEntityTooLarge)
|
||||
}
|
||||
}
|
||||
|
||||
metadata := make(map[string]string)
|
||||
appendUploadHeaders(metadata, p.Info.UploadID, p.Info.Key, p.PartNumber)
|
||||
|
||||
c := &CopyObjectParams{
|
||||
SrcObject: p.SrcObjInfo,
|
||||
DstBucket: p.Info.Bkt.Name,
|
||||
DstObject: createUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber),
|
||||
SrcSize: p.SrcObjInfo.Size,
|
||||
Header: metadata,
|
||||
Range: p.Range,
|
||||
}
|
||||
|
||||
return n.CopyObject(ctx, c)
|
||||
}
|
||||
|
||||
func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error) {
|
||||
var obj *data.ObjectInfo
|
||||
|
||||
for i := 1; i < len(p.Parts); i++ {
|
||||
if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber {
|
||||
return nil, errors.GetAPIError(errors.ErrInvalidPartOrder)
|
||||
}
|
||||
}
|
||||
|
||||
objects, err := n.getUploadParts(ctx, p.Info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(objects) == 1 {
|
||||
obj, err := n.headLastVersionIfNotDeleted(ctx, p.Info.Bkt, p.Info.Key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if obj != nil && obj.Headers[UploadIDAttributeName] != "" {
|
||||
return obj, nil
|
||||
}
|
||||
return nil, errors.GetAPIError(errors.ErrInvalidPart)
|
||||
}
|
||||
|
||||
if _, ok := objects[0]; !ok {
|
||||
n.log.Error("could not get init multipart upload",
|
||||
zap.Stringer("bucket id", p.Info.Bkt.CID),
|
||||
zap.String("uploadID", p.Info.UploadID),
|
||||
zap.String("uploadKey", p.Info.Key),
|
||||
)
|
||||
// we return InternalError because if we are here it means we've checked InitPart before in handler and
|
||||
// received successful result, it's strange we didn't get the InitPart again
|
||||
return nil, errors.GetAPIError(errors.ErrInternalError)
|
||||
}
|
||||
|
||||
if len(objects) < len(p.Parts) {
|
||||
return nil, errors.GetAPIError(errors.ErrInvalidPart)
|
||||
}
|
||||
|
||||
parts := make([]*data.ObjectInfo, 0, len(p.Parts))
|
||||
|
||||
for i, part := range p.Parts {
|
||||
info := objects[part.PartNumber]
|
||||
if info == nil || part.ETag != info.HashSum {
|
||||
return nil, errors.GetAPIError(errors.ErrInvalidPart)
|
||||
}
|
||||
// for the last part we have no minimum size limit
|
||||
if i != len(p.Parts)-1 && info.Size < uploadMinSize {
|
||||
return nil, errors.GetAPIError(errors.ErrEntityTooSmall)
|
||||
}
|
||||
parts = append(parts, info)
|
||||
}
|
||||
|
||||
initMetadata := objects[0].Headers
|
||||
if len(objects[0].ContentType) != 0 {
|
||||
initMetadata[api.ContentType] = objects[0].ContentType
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
done := make(chan bool)
|
||||
uploadCompleted := false
|
||||
|
||||
/* We will keep "S3-Upload-Id" attribute in completed object to determine is it "common" object or completed object.
|
||||
We will need to differ these objects if something goes wrong during completing multipart upload.
|
||||
I.e. we had completed the object but didn't put tagging/acl for some reason */
|
||||
delete(initMetadata, UploadPartNumberAttributeName)
|
||||
delete(initMetadata, UploadKeyAttributeName)
|
||||
delete(initMetadata, attrVersionsIgnore)
|
||||
|
||||
go func(done chan bool) {
|
||||
obj, err = n.objectPut(ctx, p.Info.Bkt, &PutObjectParams{
|
||||
Bucket: p.Info.Bkt.Name,
|
||||
Object: p.Info.Key,
|
||||
Reader: pr,
|
||||
Header: initMetadata,
|
||||
})
|
||||
if err != nil {
|
||||
n.log.Error("could not put a completed object (multipart upload)",
|
||||
zap.String("uploadID", p.Info.UploadID),
|
||||
zap.String("uploadKey", p.Info.Key),
|
||||
zap.Error(err))
|
||||
done <- true
|
||||
return
|
||||
}
|
||||
uploadCompleted = true
|
||||
done <- true
|
||||
}(done)
|
||||
for _, part := range parts {
|
||||
_, err := n.objectGetWithPayloadWriter(ctx, &getParams{
|
||||
Writer: pw,
|
||||
cid: p.Info.Bkt.CID,
|
||||
oid: part.ID,
|
||||
})
|
||||
if err != nil {
|
||||
_ = pw.Close()
|
||||
n.log.Error("could not download a part of multipart upload",
|
||||
zap.String("uploadID", p.Info.UploadID),
|
||||
zap.String("part number", part.Headers[UploadPartNumberAttributeName]),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
_ = pw.Close()
|
||||
<-done
|
||||
|
||||
if !uploadCompleted {
|
||||
return nil, errors.GetAPIError(errors.ErrInternalError)
|
||||
}
|
||||
|
||||
for partNum, objInfo := range objects {
|
||||
if partNum == 0 {
|
||||
continue
|
||||
}
|
||||
if err = n.objectDelete(ctx, p.Info.Bkt.CID, objInfo.ID); err != nil {
|
||||
n.log.Warn("could not delete upload part",
|
||||
zap.Stringer("object id", objInfo.ID),
|
||||
zap.Stringer("bucket id", p.Info.Bkt.CID),
|
||||
zap.Error(err))
|
||||
return nil, errors.GetAPIError(errors.ErrInternalError)
|
||||
}
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
|
||||
var result ListMultipartUploadsInfo
|
||||
if p.MaxUploads == 0 {
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
f := &findParams{
|
||||
filters: []filter{{attr: UploadPartNumberAttributeName, val: "0"}},
|
||||
cid: p.Bkt.CID,
|
||||
}
|
||||
|
||||
ids, err := n.objectSearch(ctx, f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
uploads := make([]*UploadInfo, 0, len(ids))
|
||||
uniqDirs := make(map[string]struct{})
|
||||
|
||||
for _, id := range ids {
|
||||
meta, err := n.objectHead(ctx, p.Bkt.CID, id)
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't head object",
|
||||
zap.Stringer("object id", id),
|
||||
zap.Stringer("bucket id", p.Bkt.CID),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
info := uploadInfoFromMeta(meta, p.Prefix, p.Delimiter)
|
||||
if info != nil {
|
||||
if info.IsDir {
|
||||
if _, ok := uniqDirs[info.Key]; ok {
|
||||
continue
|
||||
}
|
||||
uniqDirs[info.Key] = struct{}{}
|
||||
}
|
||||
uploads = append(uploads, info)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(uploads, func(i, j int) bool {
|
||||
if uploads[i].Key == uploads[j].Key {
|
||||
return uploads[i].UploadID < uploads[j].UploadID
|
||||
}
|
||||
return uploads[i].Key < uploads[j].Key
|
||||
})
|
||||
|
||||
if p.KeyMarker != "" {
|
||||
if p.UploadIDMarker != "" {
|
||||
uploads = trimAfterUploadIDAndKey(p.KeyMarker, p.UploadIDMarker, uploads)
|
||||
} else {
|
||||
uploads = trimAfterUploadKey(p.KeyMarker, uploads)
|
||||
}
|
||||
}
|
||||
|
||||
if len(uploads) > p.MaxUploads {
|
||||
result.IsTruncated = true
|
||||
uploads = uploads[:p.MaxUploads]
|
||||
result.NextUploadIDMarker = uploads[len(uploads)-1].UploadID
|
||||
result.NextKeyMarker = uploads[len(uploads)-1].Key
|
||||
}
|
||||
|
||||
for _, ov := range uploads {
|
||||
if ov.IsDir {
|
||||
result.Prefixes = append(result.Prefixes, ov.Key)
|
||||
} else {
|
||||
result.Uploads = append(result.Uploads, ov)
|
||||
}
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
|
||||
objects, err := n.getUploadParts(ctx, p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, info := range objects {
|
||||
err := n.objectDelete(ctx, info.CID, info.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
||||
var res ListPartsInfo
|
||||
objs, err := n.getUploadParts(ctx, p.Info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res.Owner = objs[0].Owner
|
||||
|
||||
parts := make([]*Part, 0, len(objs))
|
||||
|
||||
for num, objInfo := range objs {
|
||||
if num == 0 {
|
||||
continue
|
||||
}
|
||||
parts = append(parts, &Part{
|
||||
ETag: objInfo.HashSum,
|
||||
LastModified: objInfo.Created.Format(time.RFC3339),
|
||||
PartNumber: num,
|
||||
Size: objInfo.Size,
|
||||
})
|
||||
}
|
||||
|
||||
sort.Slice(parts, func(i, j int) bool {
|
||||
return parts[i].PartNumber < parts[j].PartNumber
|
||||
})
|
||||
|
||||
if p.PartNumberMarker != 0 {
|
||||
for i, part := range parts {
|
||||
if part.PartNumber > p.PartNumberMarker {
|
||||
parts = parts[i:]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(parts) > p.MaxParts {
|
||||
res.IsTruncated = true
|
||||
res.NextPartNumberMarker = parts[p.MaxParts-1].PartNumber
|
||||
parts = parts[:p.MaxParts]
|
||||
}
|
||||
|
||||
res.Parts = parts
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
func (n *layer) GetUploadInitInfo(ctx context.Context, p *UploadInfoParams) (*data.ObjectInfo, error) {
|
||||
ids, err := n.objectSearchByName(ctx, p.Bkt.CID, createUploadPartName(p.UploadID, p.Key, 0))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
return nil, errors.GetAPIError(errors.ErrNoSuchUpload)
|
||||
}
|
||||
if len(ids) > 1 {
|
||||
return nil, errors.GetAPIError(errors.ErrInternalError)
|
||||
}
|
||||
|
||||
meta, err := n.objectHead(ctx, p.Bkt.CID, ids[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return objInfoFromMeta(p.Bkt, meta), nil
|
||||
}
|
||||
|
||||
func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (map[int]*data.ObjectInfo, error) {
|
||||
f := &findParams{
|
||||
cid: p.Bkt.CID,
|
||||
prefix: UploadPartKeyPrefix + p.UploadID + "-" + p.Key,
|
||||
}
|
||||
|
||||
ids, err := n.objectSearch(ctx, f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
return nil, errors.GetAPIError(errors.ErrNoSuchUpload)
|
||||
}
|
||||
|
||||
res := make(map[int]*data.ObjectInfo)
|
||||
|
||||
for _, id := range ids {
|
||||
meta, err := n.objectHead(ctx, p.Bkt.CID, id)
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't head a part of upload",
|
||||
zap.Stringer("object id", id),
|
||||
zap.Stringer("bucket id", p.Bkt.CID),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
info := objInfoFromMeta(p.Bkt, meta)
|
||||
numStr := info.Headers[UploadPartNumberAttributeName]
|
||||
num, err := strconv.Atoi(numStr)
|
||||
if err != nil {
|
||||
return nil, errors.GetAPIError(errors.ErrInternalError)
|
||||
}
|
||||
res[num] = info
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func createUploadPartName(uploadID, key string, partNumber int) string {
|
||||
return UploadPartKeyPrefix + uploadID + "-" + key + "-" + strconv.Itoa(partNumber)
|
||||
}
|
||||
|
||||
func trimAfterUploadIDAndKey(key, id string, uploads []*UploadInfo) []*UploadInfo {
|
||||
var res []*UploadInfo
|
||||
if len(uploads) != 0 && uploads[len(uploads)-1].Key < key {
|
||||
return res
|
||||
}
|
||||
|
||||
for _, obj := range uploads {
|
||||
if obj.Key >= key && obj.UploadID > id {
|
||||
res = append(res, obj)
|
||||
}
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo {
|
||||
var result []*UploadInfo
|
||||
if len(objects) != 0 && objects[len(objects)-1].Key <= key {
|
||||
return result
|
||||
}
|
||||
for i, obj := range objects {
|
||||
if obj.Key > key {
|
||||
result = objects[i:]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func uploadInfoFromMeta(meta *object.Object, prefix, delimiter string) *UploadInfo {
|
||||
var (
|
||||
isDir bool
|
||||
creation time.Time
|
||||
userHeaders = userHeaders(meta.Attributes())
|
||||
key = userHeaders[UploadKeyAttributeName]
|
||||
)
|
||||
|
||||
if !strings.HasPrefix(key, prefix) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if val, ok := userHeaders[object.AttributeTimestamp]; ok {
|
||||
if dt, err := strconv.ParseInt(val, 10, 64); err == nil {
|
||||
creation = time.Unix(dt, 0)
|
||||
}
|
||||
}
|
||||
|
||||
if len(delimiter) > 0 {
|
||||
tail := strings.TrimPrefix(key, prefix)
|
||||
index := strings.Index(tail, delimiter)
|
||||
if index >= 0 {
|
||||
isDir = true
|
||||
key = prefix + tail[:index+1]
|
||||
}
|
||||
}
|
||||
|
||||
return &UploadInfo{
|
||||
IsDir: isDir,
|
||||
Key: key,
|
||||
UploadID: userHeaders[UploadIDAttributeName],
|
||||
Owner: meta.OwnerID(),
|
||||
Created: creation,
|
||||
}
|
||||
}
|
||||
|
||||
func appendUploadHeaders(metadata map[string]string, uploadID, key string, partNumber int) {
|
||||
metadata[UploadIDAttributeName] = uploadID
|
||||
metadata[UploadPartNumberAttributeName] = strconv.Itoa(partNumber)
|
||||
metadata[UploadKeyAttributeName] = key
|
||||
metadata[attrVersionsIgnore] = "true"
|
||||
}
|
|
@ -17,12 +17,6 @@ type (
|
|||
// Handler is an S3 API handler interface.
|
||||
Handler interface {
|
||||
HeadObjectHandler(http.ResponseWriter, *http.Request)
|
||||
CopyObjectPartHandler(http.ResponseWriter, *http.Request)
|
||||
PutObjectPartHandler(http.ResponseWriter, *http.Request)
|
||||
ListObjectPartsHandler(http.ResponseWriter, *http.Request)
|
||||
CompleteMultipartUploadHandler(http.ResponseWriter, *http.Request)
|
||||
NewMultipartUploadHandler(http.ResponseWriter, *http.Request)
|
||||
AbortMultipartUploadHandler(http.ResponseWriter, *http.Request)
|
||||
GetObjectACLHandler(http.ResponseWriter, *http.Request)
|
||||
PutObjectACLHandler(http.ResponseWriter, *http.Request)
|
||||
GetObjectTaggingHandler(http.ResponseWriter, *http.Request)
|
||||
|
@ -58,7 +52,6 @@ type (
|
|||
GetBucketVersioningHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketNotificationHandler(http.ResponseWriter, *http.Request)
|
||||
ListenBucketNotificationHandler(http.ResponseWriter, *http.Request)
|
||||
ListMultipartUploadsHandler(http.ResponseWriter, *http.Request)
|
||||
ListObjectsV2MHandler(http.ResponseWriter, *http.Request)
|
||||
ListObjectsV2Handler(http.ResponseWriter, *http.Request)
|
||||
ListBucketObjectVersionsHandler(http.ResponseWriter, *http.Request)
|
||||
|
@ -81,6 +74,13 @@ type (
|
|||
ListBucketsHandler(http.ResponseWriter, *http.Request)
|
||||
Preflight(w http.ResponseWriter, r *http.Request)
|
||||
AppendCORSHeaders(w http.ResponseWriter, r *http.Request)
|
||||
CreateMultipartUploadHandler(http.ResponseWriter, *http.Request)
|
||||
UploadPartHandler(http.ResponseWriter, *http.Request)
|
||||
UploadPartCopy(w http.ResponseWriter, r *http.Request)
|
||||
CompleteMultipartUploadHandler(http.ResponseWriter, *http.Request)
|
||||
AbortMultipartUploadHandler(http.ResponseWriter, *http.Request)
|
||||
ListPartsHandler(w http.ResponseWriter, r *http.Request)
|
||||
ListMultipartUploadsHandler(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
|
||||
// mimeType represents various MIME type used API responses.
|
||||
|
@ -218,28 +218,32 @@ func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center aut
|
|||
bucket.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("headobject", h.HeadObjectHandler))).Name("HeadObject")
|
||||
// CopyObjectPart
|
||||
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(hdrAmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(m.Handle(metrics.APIStats("copyobjectpart", h.CopyObjectPartHandler))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").
|
||||
Name("CopyObjectPart")
|
||||
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(hdrAmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(m.Handle(metrics.APIStats("uploadpartcopy", h.UploadPartCopy))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").
|
||||
Name("UploadPartCopy")
|
||||
// PutObjectPart
|
||||
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putobjectpart", h.PutObjectPartHandler))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").
|
||||
Name("PutObjectObject")
|
||||
// ListObjectParts
|
||||
m.Handle(metrics.APIStats("uploadpart", h.UploadPartHandler))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").
|
||||
Name("UploadPart")
|
||||
// ListParts
|
||||
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listobjectparts", h.ListObjectPartsHandler))).Queries("uploadId", "{uploadId:.*}").
|
||||
m.Handle(metrics.APIStats("listobjectparts", h.ListPartsHandler))).Queries("uploadId", "{uploadId:.*}").
|
||||
Name("ListObjectParts")
|
||||
// CompleteMultipartUpload
|
||||
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("completemutipartupload", h.CompleteMultipartUploadHandler))).Queries("uploadId", "{uploadId:.*}").
|
||||
Name("CompleteMultipartUpload")
|
||||
// NewMultipartUpload
|
||||
// CreateMultipartUpload
|
||||
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("newmultipartupload", h.NewMultipartUploadHandler))).Queries("uploads", "").
|
||||
Name("NewMultipartUpload")
|
||||
m.Handle(metrics.APIStats("createmultipartupload", h.CreateMultipartUploadHandler))).Queries("uploads", "").
|
||||
Name("CreateMultipartUpload")
|
||||
// AbortMultipartUpload
|
||||
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("abortmultipartupload", h.AbortMultipartUploadHandler))).Queries("uploadId", "{uploadId:.*}").
|
||||
Name("AbortMultipartUpload")
|
||||
// ListMultipartUploads
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listmultipartuploads", h.ListMultipartUploadsHandler))).Queries("uploads", "").
|
||||
Name("ListMultipartUploads")
|
||||
// GetObjectACL - this is a dummy call.
|
||||
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getobjectacl", h.GetObjectACLHandler))).Queries("acl", "").
|
||||
|
@ -384,10 +388,6 @@ func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center aut
|
|||
// ListenBucketNotification
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(metrics.APIStats("listenbucketnotification", h.ListenBucketNotificationHandler)).Queries("events", "{events:.*}").
|
||||
Name("ListenBucketNotification")
|
||||
// ListMultipartUploads
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listmultipartuploads", h.ListMultipartUploadsHandler))).Queries("uploads", "").
|
||||
Name("ListMultipartUploads")
|
||||
// ListObjectsV2M
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listobjectsv2M", h.ListObjectsV2MHandler))).Queries("list-type", "2", "metadata", "true").
|
||||
|
|
Loading…
Reference in a new issue