forked from TrueCloudLab/frostfs-s3-gw

[#185] Use correct object size when object is combined or encrypted

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>

parent 631d9d83b6
commit 6617adc22b

8 changed files with 120 additions and 39 deletions
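Summary of the change, as read from the hunks below: a new exported helper layer.GetObjectSize derives an object's logical size from its stored attributes (MultipartObjectSize for combined objects, AttributeDecryptedSize for encrypted ones) and replaces the handler-local getObjectSize, so CopyObject, GetObject, object listings and UploadPartCopy all report and validate the real size. A minimal standalone sketch of that attribute-driven lookup follows; the attribute names and values here are illustrative only, the actual implementation is the layer hunk at the end of this diff.

package main

import (
	"fmt"
	"strconv"
)

// Illustrative attribute keys; the gateway uses the exported constants
// layer.MultipartObjectSize and layer.AttributeDecryptedSize instead.
const (
	multipartObjectSize    = "S3-Multipart-Object-Size"
	attributeDecryptedSize = "S3-Decrypted-Size"
)

// sizeFromAttributes mirrors the idea behind layer.GetObjectSize: prefer the
// logical size recorded in the object attributes over the raw payload size.
func sizeFromAttributes(rawSize uint64, headers map[string]string) (uint64, error) {
	if v := headers[attributeDecryptedSize]; v != "" {
		return strconv.ParseUint(v, 10, 64)
	}
	if v := headers[multipartObjectSize]; v != "" {
		return strconv.ParseUint(v, 10, 64)
	}
	return rawSize, nil
}

func main() {
	// A combined (multipart) object: the stored payload only links the parts,
	// the attribute carries the full logical size (values are made up).
	size, err := sizeFromAttributes(396, map[string]string{multipartObjectSize: "10485760"})
	fmt.Println(size, err) // 10485760 <nil>
}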
@@ -118,7 +118,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	if srcSize, err := getObjectSize(extendedSrcObjInfo, encryptionParams); err != nil {
+	if srcSize, err := layer.GetObjectSize(srcObjInfo); err != nil {
 		h.logAndSendError(w, "failed to get source object size", reqInfo, err)
 		return
 	} else if srcSize > layer.UploadMaxSize { //https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
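For context, the srcSize guard above enforces the CopyObject size limit from the linked AWS documentation against the resolved logical size. A small self-contained sketch of that guard, assuming layer.UploadMaxSize is the documented 5 GiB limit (the exact constant is not shown in this diff):

package main

import (
	"errors"
	"fmt"
)

// Assumed value of layer.UploadMaxSize: the 5 GiB CopyObject limit from the
// AWS documentation referenced in the hunk above.
const uploadMaxSize = 5 << 30

var errTooLargeForCopy = errors.New("source object is too large for CopyObject, use UploadPartCopy")

// checkCopySourceSize sketches the guard: the limit must be checked against
// the logical (multipart/decrypted) size, not the stored payload size.
func checkCopySourceSize(logicalSize uint64) error {
	if logicalSize > uploadMaxSize {
		return errTooLargeForCopy
	}
	return nil
}

func main() {
	fmt.Println(checkCopySourceSize(10 << 20)) // <nil>: a 10 MiB object may be copied
	fmt.Println(checkCopySourceSize(6 << 30))  // error: a 6 GiB multipart object may not
}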
@@ -333,7 +333,7 @@ func TestDeleteObjectFromListCache(t *testing.T) {
 	bktName, objName := "bucket-for-removal", "object-to-delete"
 	bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
 
-	versions := listObjectsV1(t, tc, bktName, "", "", "", -1)
+	versions := listObjectsV1(tc, bktName, "", "", "", -1)
 	require.Len(t, versions.Contents, 1)
 
 	checkFound(t, tc, bktName, objName, objInfo.VersionID())
@@ -341,7 +341,7 @@ func TestDeleteObjectFromListCache(t *testing.T) {
 	checkNotFound(t, tc, bktName, objName, objInfo.VersionID())
 
 	// check cache is clean after object removal
-	versions = listObjectsV1(t, tc, bktName, "", "", "", -1)
+	versions = listObjectsV1(tc, bktName, "", "", "", -1)
 	require.Len(t, versions.Contents, 0)
 
 	require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo))
@@ -12,7 +12,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
-	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
 	"go.uber.org/zap"
 )
@@ -168,7 +167,7 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	fullSize, err := getObjectSize(extendedInfo, encryptionParams)
+	fullSize, err := layer.GetObjectSize(info)
 	if err != nil {
 		h.logAndSendError(w, "invalid size header", reqInfo, errors.GetAPIError(errors.ErrBadRequest))
 		return
@@ -233,23 +232,6 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
-func getObjectSize(extendedInfo *data.ExtendedObjectInfo, encryptionParams encryption.Params) (uint64, error) {
-	var err error
-	fullSize := extendedInfo.ObjectInfo.Size
-
-	if encryptionParams.Enabled() {
-		if fullSize, err = strconv.ParseUint(extendedInfo.ObjectInfo.Headers[layer.AttributeDecryptedSize], 10, 64); err != nil {
-			return 0, fmt.Errorf("invalid decrypted size header: %w", err)
-		}
-	} else if extendedInfo.NodeVersion.IsCombined {
-		if fullSize, err = strconv.ParseUint(extendedInfo.ObjectInfo.Headers[layer.MultipartObjectSize], 10, 64); err != nil {
-			return 0, fmt.Errorf("invalid multipart size header: %w", err)
-		}
-	}
-
-	return fullSize, nil
-}
-
 func checkPreconditions(info *data.ObjectInfo, args *conditionalArgs) error {
 	if len(args.IfMatch) > 0 && args.IfMatch != info.HashSum {
 		return fmt.Errorf("%w: etag mismatched: '%s', '%s'", errors.GetAPIError(errors.ErrPreconditionFailed), args.IfMatch, info.HashSum)
@@ -3,13 +3,16 @@ package handler
 import (
 	"bytes"
 	"encoding/xml"
+	"fmt"
 	"net/http"
 	"net/url"
 	"strconv"
 	"testing"
 	"time"
 
+	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
 	s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
+	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
 	"github.com/stretchr/testify/require"
 )
 
@@ -172,6 +175,75 @@ func TestListMultipartUploads(t *testing.T) {
 	})
 }
 
+func TestMultipartUploadSize(t *testing.T) {
+	hc := prepareHandlerContext(t)
+
+	bktName, objName := "bucket-for-test-multipart-size", "object-multipart"
+	createTestBucket(hc, bktName)
+
+	partSize := layer.UploadMinSize
+	objLen := 2 * partSize
+	headers := map[string]string{}
+
+	data := multipartUpload(hc, bktName, objName, headers, objLen, partSize)
+	require.Equal(t, objLen, len(data))
+
+	t.Run("check correct size in list v1", func(t *testing.T) {
+		listV1 := listObjectsV1(hc, bktName, "", "", "", -1)
+		require.Len(t, listV1.Contents, 1)
+		require.Equal(t, objLen, int(listV1.Contents[0].Size))
+		require.Equal(t, objName, listV1.Contents[0].Key)
+	})
+
+	t.Run("check correct size in list v2", func(t *testing.T) {
+		listV2 := listObjectsV2(hc, bktName, "", "", "", "", -1)
+		require.Len(t, listV2.Contents, 1)
+		require.Equal(t, objLen, int(listV2.Contents[0].Size))
+		require.Equal(t, objName, listV2.Contents[0].Key)
+	})
+
+	t.Run("check correct get", func(t *testing.T) {
+		_, hdr := getObject(hc, bktName, objName)
+		require.Equal(t, strconv.Itoa(objLen), hdr.Get(api.ContentLength))
+
+		part := getObjectRange(t, hc, bktName, objName, partSize, objLen-1)
+		equalDataSlices(t, data[partSize:], part)
+	})
+
+	t.Run("check correct size when part copy", func(t *testing.T) {
+		objName2 := "obj2"
+		uploadInfo := createMultipartUpload(hc, bktName, objName2, headers)
+		sourceCopy := bktName + "/" + objName
+		uploadPartCopy(hc, bktName, objName2, uploadInfo.UploadID, 1, sourceCopy, 0, 0)
+		uploadPartCopy(hc, bktName, objName2, uploadInfo.UploadID, 2, sourceCopy, 0, partSize)
+	})
+}
+
+func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
+	return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
+}
+
+func uploadPartCopyBase(hc *handlerContext, bktName, objName string, encrypted bool, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
+	query := make(url.Values)
+	query.Set(uploadIDQuery, uploadID)
+	query.Set(partNumberQuery, strconv.Itoa(num))
+
+	w, r := prepareTestRequestWithQuery(hc, bktName, objName, query, nil)
+	if encrypted {
+		setEncryptHeaders(r)
+	}
+	r.Header.Set(api.AmzCopySource, srcObj)
+	if start+end > 0 {
+		r.Header.Set(api.AmzCopySourceRange, fmt.Sprintf("bytes=%d-%d", start, end))
+	}
+
+	hc.Handler().UploadPartCopy(w, r)
+	uploadPartCopyResponse := &UploadPartCopyResponse{}
+	readResponse(hc.t, w, http.StatusOK, uploadPartCopyResponse)
+
+	return uploadPartCopyResponse
+}
+
 func listAllMultipartUploads(hc *handlerContext, bktName string) *ListMultipartUploadsResponse {
 	return listMultipartUploadsBase(hc, bktName, "", "", "", "", -1)
 }
@@ -198,6 +198,10 @@ func fillContents(src []*data.ObjectInfo, encode string, fetchOwner bool) []Obje
 			ETag: obj.HashSum,
 		}
 
+		if size, err := layer.GetObjectSize(obj); err == nil {
+			res.Size = size
+		}
+
 		if fetchOwner {
 			res.Owner = &Owner{
 				ID: obj.Owner.String(),
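Note the difference in error handling: the listing keeps the raw size when the attribute cannot be parsed, while GetObject and CopyObject above turn the same failure into a request error. A standalone sketch of the listing fallback, with hypothetical names:

package main

import "fmt"

// resolveListedSize mirrors the pattern added to fillContents: if the logical
// size cannot be resolved, keep the raw size so one malformed attribute does
// not fail the whole listing.
func resolveListedSize(rawSize uint64, getSize func() (uint64, error)) uint64 {
	if size, err := getSize(); err == nil {
		return size
	}
	return rawSize
}

func main() {
	ok := func() (uint64, error) { return 10485760, nil }
	bad := func() (uint64, error) { return 0, fmt.Errorf("invalid multipart size header") }

	fmt.Println(resolveListedSize(396, ok))  // 10485760
	fmt.Println(resolveListedSize(396, bad)) // 396
}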
@@ -67,11 +67,11 @@ func TestS3CompatibilityBucketListV2BothContinuationTokenStartAfter(t *testing.T
 		createTestObject(tc, bktInfo, objName)
 	}
 
-	listV2Response1 := listObjectsV2(t, tc, bktName, "", "", "bar", "", 1)
+	listV2Response1 := listObjectsV2(tc, bktName, "", "", "bar", "", 1)
 	nextContinuationToken := listV2Response1.NextContinuationToken
 	require.Equal(t, "baz", listV2Response1.Contents[0].Key)
 
-	listV2Response2 := listObjectsV2(t, tc, bktName, "", "", "bar", nextContinuationToken, -1)
+	listV2Response2 := listObjectsV2(tc, bktName, "", "", "bar", nextContinuationToken, -1)
 
 	require.Equal(t, nextContinuationToken, listV2Response2.ContinuationToken)
 	require.Equal(t, "bar", listV2Response2.StartAfter)
@@ -92,7 +92,7 @@ func TestS3BucketListDelimiterBasic(t *testing.T) {
 		createTestObject(tc, bktInfo, objName)
 	}
 
-	listV1Response := listObjectsV1(t, tc, bktName, "", "/", "", -1)
+	listV1Response := listObjectsV1(tc, bktName, "", "/", "", -1)
 	require.Equal(t, "/", listV1Response.Delimiter)
 	require.Equal(t, "asdf", listV1Response.Contents[0].Key)
 	require.Len(t, listV1Response.CommonPrefixes, 2)
@@ -111,7 +111,7 @@ func TestS3BucketListV2DelimiterPercentage(t *testing.T) {
 		createTestObject(tc, bktInfo, objName)
 	}
 
-	listV2Response := listObjectsV2(t, tc, bktName, "", "%", "", "", -1)
+	listV2Response := listObjectsV2(tc, bktName, "", "%", "", "", -1)
 	require.Equal(t, "%", listV2Response.Delimiter)
 	require.Len(t, listV2Response.Contents, 1)
 	require.Equal(t, "foo", listV2Response.Contents[0].Key)
@@ -149,7 +149,7 @@ func TestS3BucketListV2DelimiterPrefix(t *testing.T) {
 	validateListV2(t, tc, bktName, prefix, delim, "", 2, false, true, []string{"boo/bar"}, []string{"boo/baz/"})
 }
 
-func listObjectsV2(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
+func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
 	query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
 	if len(startAfter) != 0 {
 		query.Add("start-after", startAfter)
@@ -158,17 +158,17 @@ func listObjectsV2(t *testing.T, tc *handlerContext, bktName, prefix, delimiter,
 		query.Add("continuation-token", continuationToken)
 	}
 
-	w, r := prepareTestFullRequest(tc, bktName, "", query, nil)
-	tc.Handler().ListObjectsV2Handler(w, r)
-	assertStatus(t, w, http.StatusOK)
+	w, r := prepareTestFullRequest(hc, bktName, "", query, nil)
+	hc.Handler().ListObjectsV2Handler(w, r)
+	assertStatus(hc.t, w, http.StatusOK)
 	res := &ListObjectsV2Response{}
-	parseTestResponse(t, w, res)
+	parseTestResponse(hc.t, w, res)
 	return res
 }
 
 func validateListV2(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, continuationToken string, maxKeys int,
 	isTruncated, last bool, checkObjects, checkPrefixes []string) string {
-	response := listObjectsV2(t, tc, bktName, prefix, delimiter, "", continuationToken, maxKeys)
+	response := listObjectsV2(tc, bktName, prefix, delimiter, "", continuationToken, maxKeys)
 
 	require.Equal(t, isTruncated, response.IsTruncated)
 	require.Equal(t, last, len(response.NextContinuationToken) == 0)
@@ -202,16 +202,16 @@ func prepareCommonListObjectsQuery(prefix, delimiter string, maxKeys int) url.Va
 	return query
 }
 
-func listObjectsV1(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, marker string, maxKeys int) *ListObjectsV1Response {
+func listObjectsV1(hc *handlerContext, bktName, prefix, delimiter, marker string, maxKeys int) *ListObjectsV1Response {
 	query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
 	if len(marker) != 0 {
 		query.Add("marker", marker)
 	}
 
-	w, r := prepareTestFullRequest(tc, bktName, "", query, nil)
-	tc.Handler().ListObjectsV1Handler(w, r)
-	assertStatus(t, w, http.StatusOK)
+	w, r := prepareTestFullRequest(hc, bktName, "", query, nil)
+	hc.Handler().ListObjectsV1Handler(w, r)
+	assertStatus(hc.t, w, http.StatusOK)
 	res := &ListObjectsV1Response{}
-	parseTestResponse(t, w, res)
+	parseTestResponse(hc.t, w, res)
 	return res
 }
@@ -289,10 +289,16 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
 	}
 
 	size := p.SrcObjInfo.Size
+	srcObjectSize := p.SrcObjInfo.Size
+
+	if objSize, err := GetObjectSize(p.SrcObjInfo); err == nil {
+		srcObjectSize = objSize
+	}
+
 	if p.Range != nil {
 		size = p.Range.End - p.Range.Start + 1
-		if p.Range.End > p.SrcObjInfo.Size {
-			return nil, fmt.Errorf("%w: %d-%d/%d", s3errors.GetAPIError(s3errors.ErrInvalidCopyPartRangeSource), p.Range.Start, p.Range.End, p.SrcObjInfo.Size)
+		if p.Range.End > srcObjectSize {
+			return nil, fmt.Errorf("%w: %d-%d/%d", s3errors.GetAPIError(s3errors.ErrInvalidCopyPartRangeSource), p.Range.Start, p.Range.End, srcObjectSize)
 		}
 	}
 	if size > UploadMaxSize {
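This is the hunk the new "check correct size when part copy" subtest exercises: the range end is now validated against the source object's logical size rather than the size of the stored combined payload. A self-contained sketch of that validation, assuming layer.UploadMinSize is the usual 5 MiB minimum part size:

package main

import "fmt"

// validateCopyRange mirrors the check above: the requested end offset must lie
// inside the source object's logical size.
func validateCopyRange(start, end, srcObjectSize uint64) error {
	if end > srcObjectSize {
		return fmt.Errorf("invalid copy part range source: %d-%d/%d", start, end, srcObjectSize)
	}
	return nil
}

func main() {
	const partSize = 5 * 1024 * 1024 // assumed value of layer.UploadMinSize
	const objLen = 2 * partSize

	// The subtest copies bytes=0-partSize from a combined object. Against the
	// logical size this passes; against the small stored payload of the
	// combined object (396 is an arbitrary illustrative value) it would have
	// been rejected before this change.
	fmt.Println(validateCopyRange(0, partSize, objLen)) // <nil>
	fmt.Println(validateCopyRange(0, partSize, 396))    // error
}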
@@ -97,6 +97,23 @@ func objectInfoFromMeta(bkt *data.BucketInfo, meta *object.Object) *data.ObjectI
 	}
 }
 
+func GetObjectSize(objInfo *data.ObjectInfo) (uint64, error) {
+	var err error
+	fullSize := objInfo.Size
+
+	if objInfo.Headers[AttributeDecryptedSize] != "" {
+		if fullSize, err = strconv.ParseUint(objInfo.Headers[AttributeDecryptedSize], 10, 64); err != nil {
+			return 0, fmt.Errorf("invalid decrypted size header: %w", err)
+		}
+	} else if objInfo.Headers[MultipartObjectSize] != "" {
+		if fullSize, err = strconv.ParseUint(objInfo.Headers[MultipartObjectSize], 10, 64); err != nil {
+			return 0, fmt.Errorf("invalid multipart size header: %w", err)
+		}
+	}
+
+	return fullSize, nil
+}
+
 func FormEncryptionInfo(headers map[string]string) encryption.ObjectEncryption {
 	algorithm := headers[AttributeEncryptionAlgorithm]
 	return encryption.ObjectEncryption{
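A possible usage sketch for the new helper from outside the layer package, assuming only the exported names that appear elsewhere in this diff (data.ObjectInfo.Size, data.ObjectInfo.Headers, layer.MultipartObjectSize); the sizes are illustrative:

package main

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
)

func main() {
	objInfo := &data.ObjectInfo{
		Size: 396, // raw size of the stored combined payload (illustrative)
		Headers: map[string]string{
			layer.MultipartObjectSize: "10485760", // logical size recorded for the multipart object (illustrative)
		},
	}

	size, err := layer.GetObjectSize(objInfo)
	fmt.Println(size, err) // 10485760 <nil>, assuming the header parses
}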