forked from TrueCloudLab/frostfs-s3-gw
[#63] multipart: Fix copying
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
ad81b599dd
commit
bcf5a85aab
5 changed files with 97 additions and 35 deletions
|
@ -180,6 +180,7 @@ const (
|
||||||
ErrGatewayTimeout
|
ErrGatewayTimeout
|
||||||
ErrOperationMaxedOut
|
ErrOperationMaxedOut
|
||||||
ErrInvalidRequest
|
ErrInvalidRequest
|
||||||
|
ErrInvalidRequestLargeCopy
|
||||||
ErrInvalidStorageClass
|
ErrInvalidStorageClass
|
||||||
|
|
||||||
ErrMalformedJSON
|
ErrMalformedJSON
|
||||||
|
@ -1161,6 +1162,12 @@ var errorCodes = errorCodeMap{
|
||||||
Description: "Invalid Request",
|
Description: "Invalid Request",
|
||||||
HTTPStatusCode: http.StatusBadRequest,
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
},
|
},
|
||||||
|
ErrInvalidRequestLargeCopy: {
|
||||||
|
ErrCode: ErrInvalidRequestLargeCopy,
|
||||||
|
Code: "InvalidRequest",
|
||||||
|
Description: "CopyObject request made on objects larger than 5GB in size.",
|
||||||
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
|
},
|
||||||
ErrIncorrectContinuationToken: {
|
ErrIncorrectContinuationToken: {
|
||||||
ErrCode: ErrIncorrectContinuationToken,
|
ErrCode: ErrIncorrectContinuationToken,
|
||||||
Code: "InvalidArgument",
|
Code: "InvalidArgument",
|
||||||
|
|
|
@ -106,6 +106,25 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
srcObjInfo := extendedSrcObjInfo.ObjectInfo
|
srcObjInfo := extendedSrcObjInfo.ObjectInfo
|
||||||
|
|
||||||
|
encryptionParams, err := formEncryptionParams(r)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "invalid sse headers", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = encryptionParams.MatchObjectEncryption(layer.FormEncryptionInfo(srcObjInfo.Headers)); err != nil {
|
||||||
|
h.logAndSendError(w, "encryption doesn't match object", reqInfo, errors.GetAPIError(errors.ErrBadRequest), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if srcSize, err := getObjectSize(extendedSrcObjInfo, encryptionParams); err != nil {
|
||||||
|
h.logAndSendError(w, "failed to get source object size", reqInfo, err)
|
||||||
|
return
|
||||||
|
} else if srcSize > layer.UploadMaxSize { //https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
|
||||||
|
h.logAndSendError(w, "too bid object to copy with single copy operation, use multipart upload copy instead", reqInfo, errors.GetAPIError(errors.ErrInvalidRequestLargeCopy))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
args, err := parseCopyObjectArgs(r.Header)
|
args, err := parseCopyObjectArgs(r.Header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logAndSendError(w, "could not parse request params", reqInfo, err)
|
h.logAndSendError(w, "could not parse request params", reqInfo, err)
|
||||||
|
@ -144,17 +163,6 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
encryptionParams, err := formEncryptionParams(r)
|
|
||||||
if err != nil {
|
|
||||||
h.logAndSendError(w, "invalid sse headers", reqInfo, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = encryptionParams.MatchObjectEncryption(layer.FormEncryptionInfo(srcObjInfo.Headers)); err != nil {
|
|
||||||
h.logAndSendError(w, "encryption doesn't match object", reqInfo, errors.GetAPIError(errors.ErrBadRequest), zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = checkPreconditions(srcObjInfo, args.Conditional); err != nil {
|
if err = checkPreconditions(srcObjInfo, args.Conditional); err != nil {
|
||||||
h.logAndSendError(w, "precondition failed", reqInfo, errors.GetAPIError(errors.ErrPreconditionFailed))
|
h.logAndSendError(w, "precondition failed", reqInfo, errors.GetAPIError(errors.ErrPreconditionFailed))
|
||||||
return
|
return
|
||||||
|
@ -164,7 +172,8 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
if len(srcObjInfo.ContentType) > 0 {
|
if len(srcObjInfo.ContentType) > 0 {
|
||||||
srcObjInfo.Headers[api.ContentType] = srcObjInfo.ContentType
|
srcObjInfo.Headers[api.ContentType] = srcObjInfo.ContentType
|
||||||
}
|
}
|
||||||
metadata = srcObjInfo.Headers
|
metadata = makeCopyMap(srcObjInfo.Headers)
|
||||||
|
delete(metadata, layer.MultipartObjectSize) // object payload will be real one rather than list of compound parts
|
||||||
} else if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 {
|
} else if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 {
|
||||||
metadata[api.ContentType] = contentType
|
metadata[api.ContentType] = contentType
|
||||||
}
|
}
|
||||||
|
@ -257,6 +266,14 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeCopyMap(headers map[string]string) map[string]string {
|
||||||
|
res := make(map[string]string, len(headers))
|
||||||
|
for key, val := range headers {
|
||||||
|
res[key] = val
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
func isCopyingToItselfForbidden(reqInfo *middleware.ReqInfo, srcBucket string, srcObject string, settings *data.BucketSettings, args *copyObjectArgs) bool {
|
func isCopyingToItselfForbidden(reqInfo *middleware.ReqInfo, srcBucket string, srcObject string, settings *data.BucketSettings, args *copyObjectArgs) bool {
|
||||||
if reqInfo.BucketName != srcBucket || reqInfo.ObjectName != srcObject {
|
if reqInfo.BucketName != srcBucket || reqInfo.ObjectName != srcObject {
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -29,14 +30,14 @@ func TestCopyWithTaggingDirective(t *testing.T) {
|
||||||
copyMeta := CopyMeta{
|
copyMeta := CopyMeta{
|
||||||
Tags: map[string]string{"key2": "val"},
|
Tags: map[string]string{"key2": "val"},
|
||||||
}
|
}
|
||||||
copyObject(t, tc, bktName, objName, objToCopy, copyMeta, http.StatusOK)
|
copyObject(tc, bktName, objName, objToCopy, copyMeta, http.StatusOK)
|
||||||
tagging := getObjectTagging(t, tc, bktName, objToCopy, emptyVersion)
|
tagging := getObjectTagging(t, tc, bktName, objToCopy, emptyVersion)
|
||||||
require.Len(t, tagging.TagSet, 1)
|
require.Len(t, tagging.TagSet, 1)
|
||||||
require.Equal(t, "key", tagging.TagSet[0].Key)
|
require.Equal(t, "key", tagging.TagSet[0].Key)
|
||||||
require.Equal(t, "val", tagging.TagSet[0].Value)
|
require.Equal(t, "val", tagging.TagSet[0].Value)
|
||||||
|
|
||||||
copyMeta.TaggingDirective = replaceDirective
|
copyMeta.TaggingDirective = replaceDirective
|
||||||
copyObject(t, tc, bktName, objName, objToCopy2, copyMeta, http.StatusOK)
|
copyObject(tc, bktName, objName, objToCopy2, copyMeta, http.StatusOK)
|
||||||
tagging = getObjectTagging(t, tc, bktName, objToCopy2, emptyVersion)
|
tagging = getObjectTagging(t, tc, bktName, objToCopy2, emptyVersion)
|
||||||
require.Len(t, tagging.TagSet, 1)
|
require.Len(t, tagging.TagSet, 1)
|
||||||
require.Equal(t, "key2", tagging.TagSet[0].Key)
|
require.Equal(t, "key2", tagging.TagSet[0].Key)
|
||||||
|
@ -51,20 +52,54 @@ func TestCopyToItself(t *testing.T) {
|
||||||
|
|
||||||
copyMeta := CopyMeta{MetadataDirective: replaceDirective}
|
copyMeta := CopyMeta{MetadataDirective: replaceDirective}
|
||||||
|
|
||||||
copyObject(t, tc, bktName, objName, objName, CopyMeta{}, http.StatusBadRequest)
|
copyObject(tc, bktName, objName, objName, CopyMeta{}, http.StatusBadRequest)
|
||||||
copyObject(t, tc, bktName, objName, objName, copyMeta, http.StatusOK)
|
copyObject(tc, bktName, objName, objName, copyMeta, http.StatusOK)
|
||||||
|
|
||||||
putBucketVersioning(t, tc, bktName, true)
|
putBucketVersioning(t, tc, bktName, true)
|
||||||
copyObject(t, tc, bktName, objName, objName, CopyMeta{}, http.StatusOK)
|
copyObject(tc, bktName, objName, objName, CopyMeta{}, http.StatusOK)
|
||||||
copyObject(t, tc, bktName, objName, objName, copyMeta, http.StatusOK)
|
copyObject(tc, bktName, objName, objName, copyMeta, http.StatusOK)
|
||||||
|
|
||||||
putBucketVersioning(t, tc, bktName, false)
|
putBucketVersioning(t, tc, bktName, false)
|
||||||
copyObject(t, tc, bktName, objName, objName, CopyMeta{}, http.StatusOK)
|
copyObject(tc, bktName, objName, objName, CopyMeta{}, http.StatusOK)
|
||||||
copyObject(t, tc, bktName, objName, objName, copyMeta, http.StatusOK)
|
copyObject(tc, bktName, objName, objName, copyMeta, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
func copyObject(t *testing.T, tc *handlerContext, bktName, fromObject, toObject string, copyMeta CopyMeta, statusCode int) {
|
func TestCopyMultipart(t *testing.T) {
|
||||||
w, r := prepareTestRequest(tc, bktName, toObject, nil)
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
bktName, objName := "bucket-for-copy", "object-for-copy"
|
||||||
|
createTestBucket(hc, bktName)
|
||||||
|
|
||||||
|
partSize := layer.UploadMinSize
|
||||||
|
objLen := 6 * partSize
|
||||||
|
headers := map[string]string{}
|
||||||
|
|
||||||
|
data := multipartUpload(hc, bktName, objName, headers, objLen, partSize)
|
||||||
|
require.Equal(t, objLen, len(data))
|
||||||
|
|
||||||
|
objToCopy := "copy-target"
|
||||||
|
var copyMeta CopyMeta
|
||||||
|
copyObject(hc, bktName, objName, objToCopy, copyMeta, http.StatusOK)
|
||||||
|
|
||||||
|
copiedData, _ := getObject(hc, bktName, objToCopy)
|
||||||
|
equalDataSlices(t, data, copiedData)
|
||||||
|
|
||||||
|
result := getObjectAttributes(hc, bktName, objToCopy, objectParts)
|
||||||
|
require.NotNil(t, result.ObjectParts)
|
||||||
|
|
||||||
|
objToCopy2 := "copy-target2"
|
||||||
|
copyMeta.MetadataDirective = replaceDirective
|
||||||
|
copyObject(hc, bktName, objName, objToCopy2, copyMeta, http.StatusOK)
|
||||||
|
|
||||||
|
result = getObjectAttributes(hc, bktName, objToCopy2, objectParts)
|
||||||
|
require.Nil(t, result.ObjectParts)
|
||||||
|
|
||||||
|
copiedData, _ = getObject(hc, bktName, objToCopy2)
|
||||||
|
equalDataSlices(t, data, copiedData)
|
||||||
|
}
|
||||||
|
|
||||||
|
func copyObject(hc *handlerContext, bktName, fromObject, toObject string, copyMeta CopyMeta, statusCode int) {
|
||||||
|
w, r := prepareTestRequest(hc, bktName, toObject, nil)
|
||||||
r.Header.Set(api.AmzCopySource, bktName+"/"+fromObject)
|
r.Header.Set(api.AmzCopySource, bktName+"/"+fromObject)
|
||||||
|
|
||||||
r.Header.Set(api.AmzMetadataDirective, copyMeta.MetadataDirective)
|
r.Header.Set(api.AmzMetadataDirective, copyMeta.MetadataDirective)
|
||||||
|
@ -79,8 +114,8 @@ func copyObject(t *testing.T, tc *handlerContext, bktName, fromObject, toObject
|
||||||
}
|
}
|
||||||
r.Header.Set(api.AmzTagging, tagsQuery.Encode())
|
r.Header.Set(api.AmzTagging, tagsQuery.Encode())
|
||||||
|
|
||||||
tc.Handler().CopyObjectHandler(w, r)
|
hc.Handler().CopyObjectHandler(w, r)
|
||||||
assertStatus(t, w, statusCode)
|
assertStatus(hc.t, w, statusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func putObjectTagging(t *testing.T, tc *handlerContext, bktName, objName string, tags map[string]string) {
|
func putObjectTagging(t *testing.T, tc *handlerContext, bktName, objName string, tags map[string]string) {
|
||||||
|
|
|
@ -217,11 +217,11 @@ func getObjectRange(t *testing.T, tc *handlerContext, bktName, objName string, s
|
||||||
}
|
}
|
||||||
|
|
||||||
func getObjectAssertS3Error(hc *handlerContext, bktName, objName, version string, code apiErrors.ErrorCode) {
|
func getObjectAssertS3Error(hc *handlerContext, bktName, objName, version string, code apiErrors.ErrorCode) {
|
||||||
w := getObjectBase(hc, bktName, objName, version)
|
w := getObjectBaseResponse(hc, bktName, objName, version)
|
||||||
assertS3Error(hc.t, w, apiErrors.GetAPIError(code))
|
assertS3Error(hc.t, w, apiErrors.GetAPIError(code))
|
||||||
}
|
}
|
||||||
|
|
||||||
func getObjectBase(hc *handlerContext, bktName, objName, version string) *httptest.ResponseRecorder {
|
func getObjectBaseResponse(hc *handlerContext, bktName, objName, version string) *httptest.ResponseRecorder {
|
||||||
query := make(url.Values)
|
query := make(url.Values)
|
||||||
query.Add(api.QueryVersionID, version)
|
query.Add(api.QueryVersionID, version)
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,10 @@ const (
|
||||||
UploadIDAttributeName = "S3-Upload-Id"
|
UploadIDAttributeName = "S3-Upload-Id"
|
||||||
UploadPartNumberAttributeName = "S3-Upload-Part-Number"
|
UploadPartNumberAttributeName = "S3-Upload-Part-Number"
|
||||||
UploadCompletedParts = "S3-Completed-Parts"
|
UploadCompletedParts = "S3-Completed-Parts"
|
||||||
MultipartObjectSize = "S3-Multipart-Object-Size"
|
|
||||||
|
// MultipartObjectSize contains the real object size if object is combined (payload contains list of parts).
|
||||||
|
// This header is used to determine if object is combined.
|
||||||
|
MultipartObjectSize = "S3-Multipart-Object-Size"
|
||||||
|
|
||||||
metaPrefix = "meta-"
|
metaPrefix = "meta-"
|
||||||
aclPrefix = "acl-"
|
aclPrefix = "acl-"
|
||||||
|
@ -35,8 +38,8 @@ const (
|
||||||
MaxSizePartsList = 1000
|
MaxSizePartsList = 1000
|
||||||
UploadMinPartNumber = 1
|
UploadMinPartNumber = 1
|
||||||
UploadMaxPartNumber = 10000
|
UploadMaxPartNumber = 10000
|
||||||
uploadMinSize = 5 * 1048576 // 5MB
|
UploadMinSize = 5 * 1024 * 1024 // 5MB
|
||||||
uploadMaxSize = 5 * 1073741824 // 5GB
|
UploadMaxSize = 1024 * UploadMinSize // 5GB
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
@ -184,8 +187,8 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, er
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Size > uploadMaxSize {
|
if p.Size > UploadMaxSize {
|
||||||
return "", fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), p.Size, uploadMaxSize)
|
return "", fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), p.Size, UploadMaxSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
objInfo, err := n.uploadPart(ctx, multipartInfo, p)
|
objInfo, err := n.uploadPart(ctx, multipartInfo, p)
|
||||||
|
@ -292,8 +295,8 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
|
||||||
return nil, fmt.Errorf("%w: %d-%d/%d", s3errors.GetAPIError(s3errors.ErrInvalidCopyPartRangeSource), p.Range.Start, p.Range.End, p.SrcObjInfo.Size)
|
return nil, fmt.Errorf("%w: %d-%d/%d", s3errors.GetAPIError(s3errors.ErrInvalidCopyPartRangeSource), p.Range.Start, p.Range.End, p.SrcObjInfo.Size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if size > uploadMaxSize {
|
if size > UploadMaxSize {
|
||||||
return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, uploadMaxSize)
|
return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, UploadMaxSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
objPayload, err := n.GetObject(ctx, &GetObjectParams{
|
objPayload, err := n.GetObject(ctx, &GetObjectParams{
|
||||||
|
@ -346,8 +349,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
delete(partsInfo, part.PartNumber)
|
delete(partsInfo, part.PartNumber)
|
||||||
|
|
||||||
// for the last part we have no minimum size limit
|
// for the last part we have no minimum size limit
|
||||||
if i != len(p.Parts)-1 && partInfo.Size < uploadMinSize {
|
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
|
||||||
return nil, nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooSmall), partInfo.Size, uploadMinSize)
|
return nil, nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooSmall), partInfo.Size, UploadMinSize)
|
||||||
}
|
}
|
||||||
parts = append(parts, partInfo)
|
parts = append(parts, partInfo)
|
||||||
multipartObjetSize += partInfo.Size // even if encryption is enabled size is actual (decrypted)
|
multipartObjetSize += partInfo.Size // even if encryption is enabled size is actual (decrypted)
|
||||||
|
|
Loading…
Reference in a new issue