bugfix/185-list_multipat_uploads_wrong_keys #187
13 changed files with 302 additions and 63 deletions
|
@ -14,6 +14,7 @@ This document outlines major changes between releases.
|
|||
- Fix goroutine leak on put object error (#178)
|
||||
- Fix parsing signed headers in presigned urls (#182)
|
||||
- Fix url escaping (#188)
|
||||
- Use correct keys in `list-multipart-uploads` response (#185)
|
||||
|
||||
### Added
|
||||
- Add a metric with addresses of nodes of the same and highest priority that are currently healthy (#51)
|
||||
|
|
|
@ -322,7 +322,7 @@ var errorCodes = errorCodeMap{
|
|||
ErrInvalidMaxUploads: {
|
||||
ErrCode: ErrInvalidMaxUploads,
|
||||
Code: "InvalidArgument",
|
||||
Description: "Argument max-uploads must be an integer between 0 and 2147483647",
|
||||
Description: "Argument max-uploads must be an integer from 1 to 1000",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrInvalidMaxKeys: {
|
||||
|
|
|
@ -118,7 +118,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if srcSize, err := getObjectSize(extendedSrcObjInfo, encryptionParams); err != nil {
|
||||
if srcSize, err := layer.GetObjectSize(srcObjInfo); err != nil {
|
||||
h.logAndSendError(w, "failed to get source object size", reqInfo, err)
|
||||
return
|
||||
} else if srcSize > layer.UploadMaxSize { //https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
|
||||
|
|
|
@ -333,7 +333,7 @@ func TestDeleteObjectFromListCache(t *testing.T) {
|
|||
bktName, objName := "bucket-for-removal", "object-to-delete"
|
||||
bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
|
||||
|
||||
versions := listObjectsV1(t, tc, bktName, "", "", "", -1)
|
||||
versions := listObjectsV1(tc, bktName, "", "", "", -1)
|
||||
require.Len(t, versions.Contents, 1)
|
||||
|
||||
checkFound(t, tc, bktName, objName, objInfo.VersionID())
|
||||
|
@ -341,7 +341,7 @@ func TestDeleteObjectFromListCache(t *testing.T) {
|
|||
checkNotFound(t, tc, bktName, objName, objInfo.VersionID())
|
||||
|
||||
// check cache is clean after object removal
|
||||
versions = listObjectsV1(t, tc, bktName, "", "", "", -1)
|
||||
versions = listObjectsV1(tc, bktName, "", "", "", -1)
|
||||
require.Len(t, versions.Contents, 0)
|
||||
|
||||
require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo))
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -168,7 +167,7 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
fullSize, err := getObjectSize(extendedInfo, encryptionParams)
|
||||
fullSize, err := layer.GetObjectSize(info)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid size header", reqInfo, errors.GetAPIError(errors.ErrBadRequest))
|
||||
return
|
||||
|
@ -233,23 +232,6 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func getObjectSize(extendedInfo *data.ExtendedObjectInfo, encryptionParams encryption.Params) (uint64, error) {
|
||||
var err error
|
||||
fullSize := extendedInfo.ObjectInfo.Size
|
||||
|
||||
if encryptionParams.Enabled() {
|
||||
if fullSize, err = strconv.ParseUint(extendedInfo.ObjectInfo.Headers[layer.AttributeDecryptedSize], 10, 64); err != nil {
|
||||
return 0, fmt.Errorf("invalid decrypted size header: %w", err)
|
||||
}
|
||||
} else if extendedInfo.NodeVersion.IsCombined {
|
||||
if fullSize, err = strconv.ParseUint(extendedInfo.ObjectInfo.Headers[layer.MultipartObjectSize], 10, 64); err != nil {
|
||||
return 0, fmt.Errorf("invalid multipart size header: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return fullSize, nil
|
||||
}
|
||||
|
||||
func checkPreconditions(info *data.ObjectInfo, args *conditionalArgs) error {
|
||||
if len(args.IfMatch) > 0 && args.IfMatch != info.HashSum {
|
||||
return fmt.Errorf("%w: etag mismatched: '%s', '%s'", errors.GetAPIError(errors.ErrPreconditionFailed), args.IfMatch, info.HashSum)
|
||||
|
|
|
@ -93,6 +93,13 @@ type (
|
|||
const (
|
||||
uploadIDHeaderName = "uploadId"
|
||||
partNumberHeaderName = "partNumber"
|
||||
|
||||
prefixQueryName = "prefix"
|
||||
delimiterQueryName = "delimiter"
|
||||
maxUploadsQueryName = "max-uploads"
|
||||
encodingTypeQueryName = "encoding-type"
|
||||
keyMarkerQueryName = "key-marker"
|
||||
uploadIDMarkerQueryName = "upload-id-marker"
|
||||
)
|
||||
|
||||
func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -529,30 +536,27 @@ func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Req
|
|||
|
||||
var (
|
||||
queryValues = reqInfo.URL.Query()
|
||||
delimiter = queryValues.Get("delimiter")
|
||||
prefix = queryValues.Get("prefix")
|
||||
maxUploadsStr = queryValues.Get(maxUploadsQueryName)
|
||||
maxUploads = layer.MaxSizeUploadsList
|
||||
)
|
||||
|
||||
if queryValues.Get("max-uploads") != "" {
|
||||
val, err := strconv.Atoi(queryValues.Get("max-uploads"))
|
||||
if err != nil || val < 0 {
|
||||
if maxUploadsStr != "" {
|
||||
val, err := strconv.Atoi(maxUploadsStr)
|
||||
if err != nil || val < 1 || val > 1000 {
|
||||
h.logAndSendError(w, "invalid maxUploads", reqInfo, errors.GetAPIError(errors.ErrInvalidMaxUploads))
|
||||
return
|
||||
}
|
||||
if val < maxUploads {
|
||||
maxUploads = val
|
||||
}
|
||||
}
|
||||
|
||||
p := &layer.ListMultipartUploadsParams{
|
||||
Bkt: bktInfo,
|
||||
Delimiter: delimiter,
|
||||
EncodingType: queryValues.Get("encoding-type"),
|
||||
KeyMarker: queryValues.Get("key-marker"),
|
||||
Delimiter: queryValues.Get(delimiterQueryName),
|
||||
EncodingType: queryValues.Get(encodingTypeQueryName),
|
||||
KeyMarker: queryValues.Get(keyMarkerQueryName),
|
||||
MaxUploads: maxUploads,
|
||||
Prefix: prefix,
|
||||
UploadIDMarker: queryValues.Get("upload-id-marker"),
|
||||
Prefix: queryValues.Get(prefixQueryName),
|
||||
UploadIDMarker: queryValues.Get(uploadIDMarkerQueryName),
|
||||
}
|
||||
|
||||
list, err := h.obj.ListMultipartUploads(r.Context(), p)
|
||||
|
|
|
@ -3,12 +3,16 @@ package handler
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -105,6 +109,164 @@ func TestMultipartReUploadPart(t *testing.T) {
|
|||
equalDataSlices(t, append(data1, data2...), data)
|
||||
}
|
||||
|
||||
func TestListMultipartUploads(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
|
||||
bktName := "bucket-to-list-uploads"
|
||||
createTestBucket(hc, bktName)
|
||||
|
||||
objName1 := "/my/object/name"
|
||||
uploadInfo1 := createMultipartUpload(hc, bktName, objName1, map[string]string{})
|
||||
objName2 := "/my/object2"
|
||||
uploadInfo2 := createMultipartUpload(hc, bktName, objName2, map[string]string{})
|
||||
objName3 := "/zzz/object/name3"
|
||||
uploadInfo3 := createMultipartUpload(hc, bktName, objName3, map[string]string{})
|
||||
|
||||
t.Run("check upload key", func(t *testing.T) {
|
||||
listUploads := listAllMultipartUploads(hc, bktName)
|
||||
require.Len(t, listUploads.Uploads, 3)
|
||||
for i, upload := range []*InitiateMultipartUploadResponse{uploadInfo1, uploadInfo2, uploadInfo3} {
|
||||
require.Equal(t, upload.UploadID, listUploads.Uploads[i].UploadID)
|
||||
require.Equal(t, upload.Key, listUploads.Uploads[i].Key)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("check max uploads", func(t *testing.T) {
|
||||
listUploads := listMultipartUploadsBase(hc, bktName, "", "", "", "", 2)
|
||||
require.Len(t, listUploads.Uploads, 2)
|
||||
require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
|
||||
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID)
|
||||
})
|
||||
|
||||
t.Run("check prefix", func(t *testing.T) {
|
||||
listUploads := listMultipartUploadsBase(hc, bktName, "/my", "", "", "", -1)
|
||||
require.Len(t, listUploads.Uploads, 2)
|
||||
require.Equal(t, uploadInfo1.UploadID, listUploads.Uploads[0].UploadID)
|
||||
require.Equal(t, uploadInfo2.UploadID, listUploads.Uploads[1].UploadID)
|
||||
})
|
||||
|
||||
t.Run("check markers", func(t *testing.T) {
|
||||
t.Run("check only key-marker", func(t *testing.T) {
|
||||
listUploads := listMultipartUploadsBase(hc, bktName, "", "", "", objName2, -1)
|
||||
require.Len(t, listUploads.Uploads, 1)
|
||||
// If upload-id-marker is not specified, only the keys lexicographically greater than the specified key-marker will be included in the list.
|
||||
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
|
||||
})
|
||||
|
||||
t.Run("check only upload-id-marker", func(t *testing.T) {
|
||||
uploadIDMarker := uploadInfo1.UploadID
|
||||
if uploadIDMarker > uploadInfo2.UploadID {
|
||||
uploadIDMarker = uploadInfo2.UploadID
|
||||
}
|
||||
listUploads := listMultipartUploadsBase(hc, bktName, "", "", uploadIDMarker, "", -1)
|
||||
// If key-marker is not specified, the upload-id-marker parameter is ignored.
|
||||
require.Len(t, listUploads.Uploads, 3)
|
||||
})
|
||||
|
||||
t.Run("check key-marker along with upload-id-marker", func(t *testing.T) {
|
||||
uploadIDMarker := "00000000-0000-0000-0000-000000000000"
|
||||
|
||||
listUploads := listMultipartUploadsBase(hc, bktName, "", "", uploadIDMarker, objName3, -1)
|
||||
require.Len(t, listUploads.Uploads, 1)
|
||||
// If upload-id-marker is specified, any multipart uploads for a key equal to the key-marker might also be included,
|
||||
// provided those multipart uploads have upload IDs lexicographically greater than the specified upload-id-marker.
|
||||
require.Equal(t, uploadInfo3.UploadID, listUploads.Uploads[0].UploadID)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestMultipartUploadSize(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
|
||||
bktName, objName := "bucket-for-test-multipart-size", "object-multipart"
|
||||
createTestBucket(hc, bktName)
|
||||
|
||||
partSize := layer.UploadMinSize
|
||||
objLen := 2 * partSize
|
||||
headers := map[string]string{}
|
||||
|
||||
data := multipartUpload(hc, bktName, objName, headers, objLen, partSize)
|
||||
require.Equal(t, objLen, len(data))
|
||||
|
||||
t.Run("check correct size in list v1", func(t *testing.T) {
|
||||
listV1 := listObjectsV1(hc, bktName, "", "", "", -1)
|
||||
require.Len(t, listV1.Contents, 1)
|
||||
require.Equal(t, objLen, int(listV1.Contents[0].Size))
|
||||
require.Equal(t, objName, listV1.Contents[0].Key)
|
||||
})
|
||||
|
||||
t.Run("check correct size in list v2", func(t *testing.T) {
|
||||
listV2 := listObjectsV2(hc, bktName, "", "", "", "", -1)
|
||||
require.Len(t, listV2.Contents, 1)
|
||||
require.Equal(t, objLen, int(listV2.Contents[0].Size))
|
||||
require.Equal(t, objName, listV2.Contents[0].Key)
|
||||
})
|
||||
|
||||
t.Run("check correct get", func(t *testing.T) {
|
||||
_, hdr := getObject(hc, bktName, objName)
|
||||
require.Equal(t, strconv.Itoa(objLen), hdr.Get(api.ContentLength))
|
||||
|
||||
part := getObjectRange(t, hc, bktName, objName, partSize, objLen-1)
|
||||
equalDataSlices(t, data[partSize:], part)
|
||||
})
|
||||
|
||||
t.Run("check correct size when part copy", func(t *testing.T) {
|
||||
objName2 := "obj2"
|
||||
uploadInfo := createMultipartUpload(hc, bktName, objName2, headers)
|
||||
sourceCopy := bktName + "/" + objName
|
||||
uploadPartCopy(hc, bktName, objName2, uploadInfo.UploadID, 1, sourceCopy, 0, 0)
|
||||
uploadPartCopy(hc, bktName, objName2, uploadInfo.UploadID, 2, sourceCopy, 0, partSize)
|
||||
})
|
||||
}
|
||||
|
||||
func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
|
||||
return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
|
||||
}
|
||||
|
||||
func uploadPartCopyBase(hc *handlerContext, bktName, objName string, encrypted bool, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
|
||||
query := make(url.Values)
|
||||
query.Set(uploadIDQuery, uploadID)
|
||||
query.Set(partNumberQuery, strconv.Itoa(num))
|
||||
|
||||
w, r := prepareTestRequestWithQuery(hc, bktName, objName, query, nil)
|
||||
if encrypted {
|
||||
setEncryptHeaders(r)
|
||||
}
|
||||
r.Header.Set(api.AmzCopySource, srcObj)
|
||||
if start+end > 0 {
|
||||
r.Header.Set(api.AmzCopySourceRange, fmt.Sprintf("bytes=%d-%d", start, end))
|
||||
}
|
||||
|
||||
hc.Handler().UploadPartCopy(w, r)
|
||||
uploadPartCopyResponse := &UploadPartCopyResponse{}
|
||||
readResponse(hc.t, w, http.StatusOK, uploadPartCopyResponse)
|
||||
|
||||
return uploadPartCopyResponse
|
||||
}
|
||||
|
||||
func listAllMultipartUploads(hc *handlerContext, bktName string) *ListMultipartUploadsResponse {
|
||||
return listMultipartUploadsBase(hc, bktName, "", "", "", "", -1)
|
||||
}
|
||||
|
||||
func listMultipartUploadsBase(hc *handlerContext, bktName, prefix, delimiter, uploadIDMarker, keyMarker string, maxUploads int) *ListMultipartUploadsResponse {
|
||||
query := make(url.Values)
|
||||
query.Set(prefixQueryName, prefix)
|
||||
query.Set(delimiterQueryName, delimiter)
|
||||
query.Set(uploadIDMarkerQueryName, uploadIDMarker)
|
||||
query.Set(keyMarkerQueryName, keyMarker)
|
||||
if maxUploads != -1 {
|
||||
query.Set(maxUploadsQueryName, strconv.Itoa(maxUploads))
|
||||
}
|
||||
|
||||
w, r := prepareTestRequestWithQuery(hc, bktName, "", query, nil)
|
||||
|
||||
hc.Handler().ListMultipartUploadsHandler(w, r)
|
||||
listPartsResponse := &ListMultipartUploadsResponse{}
|
||||
readResponse(hc.t, w, http.StatusOK, listPartsResponse)
|
||||
|
||||
return listPartsResponse
|
||||
}
|
||||
|
||||
func listParts(hc *handlerContext, bktName, objName string, uploadID string) *ListPartsResponse {
|
||||
return listPartsBase(hc, bktName, objName, false, uploadID)
|
||||
}
|
||||
|
|
|
@ -198,6 +198,10 @@ func fillContents(src []*data.ObjectInfo, encode string, fetchOwner bool) []Obje
|
|||
ETag: obj.HashSum,
|
||||
}
|
||||
|
||||
if size, err := layer.GetObjectSize(obj); err == nil {
|
||||
res.Size = size
|
||||
}
|
||||
|
||||
if fetchOwner {
|
||||
res.Owner = &Owner{
|
||||
ID: obj.Owner.String(),
|
||||
|
|
|
@ -67,11 +67,11 @@ func TestS3CompatibilityBucketListV2BothContinuationTokenStartAfter(t *testing.T
|
|||
createTestObject(tc, bktInfo, objName)
|
||||
}
|
||||
|
||||
listV2Response1 := listObjectsV2(t, tc, bktName, "", "", "bar", "", 1)
|
||||
listV2Response1 := listObjectsV2(tc, bktName, "", "", "bar", "", 1)
|
||||
nextContinuationToken := listV2Response1.NextContinuationToken
|
||||
require.Equal(t, "baz", listV2Response1.Contents[0].Key)
|
||||
|
||||
listV2Response2 := listObjectsV2(t, tc, bktName, "", "", "bar", nextContinuationToken, -1)
|
||||
listV2Response2 := listObjectsV2(tc, bktName, "", "", "bar", nextContinuationToken, -1)
|
||||
|
||||
require.Equal(t, nextContinuationToken, listV2Response2.ContinuationToken)
|
||||
require.Equal(t, "bar", listV2Response2.StartAfter)
|
||||
|
@ -92,7 +92,7 @@ func TestS3BucketListDelimiterBasic(t *testing.T) {
|
|||
createTestObject(tc, bktInfo, objName)
|
||||
}
|
||||
|
||||
listV1Response := listObjectsV1(t, tc, bktName, "", "/", "", -1)
|
||||
listV1Response := listObjectsV1(tc, bktName, "", "/", "", -1)
|
||||
require.Equal(t, "/", listV1Response.Delimiter)
|
||||
require.Equal(t, "asdf", listV1Response.Contents[0].Key)
|
||||
require.Len(t, listV1Response.CommonPrefixes, 2)
|
||||
|
@ -111,7 +111,7 @@ func TestS3BucketListV2DelimiterPercentage(t *testing.T) {
|
|||
createTestObject(tc, bktInfo, objName)
|
||||
}
|
||||
|
||||
listV2Response := listObjectsV2(t, tc, bktName, "", "%", "", "", -1)
|
||||
listV2Response := listObjectsV2(tc, bktName, "", "%", "", "", -1)
|
||||
require.Equal(t, "%", listV2Response.Delimiter)
|
||||
require.Len(t, listV2Response.Contents, 1)
|
||||
require.Equal(t, "foo", listV2Response.Contents[0].Key)
|
||||
|
@ -149,7 +149,7 @@ func TestS3BucketListV2DelimiterPrefix(t *testing.T) {
|
|||
validateListV2(t, tc, bktName, prefix, delim, "", 2, false, true, []string{"boo/bar"}, []string{"boo/baz/"})
|
||||
}
|
||||
|
||||
func listObjectsV2(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
|
||||
func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
|
||||
query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
|
||||
if len(startAfter) != 0 {
|
||||
query.Add("start-after", startAfter)
|
||||
|
@ -158,17 +158,17 @@ func listObjectsV2(t *testing.T, tc *handlerContext, bktName, prefix, delimiter,
|
|||
query.Add("continuation-token", continuationToken)
|
||||
}
|
||||
|
||||
w, r := prepareTestFullRequest(tc, bktName, "", query, nil)
|
||||
tc.Handler().ListObjectsV2Handler(w, r)
|
||||
assertStatus(t, w, http.StatusOK)
|
||||
w, r := prepareTestFullRequest(hc, bktName, "", query, nil)
|
||||
hc.Handler().ListObjectsV2Handler(w, r)
|
||||
assertStatus(hc.t, w, http.StatusOK)
|
||||
res := &ListObjectsV2Response{}
|
||||
parseTestResponse(t, w, res)
|
||||
parseTestResponse(hc.t, w, res)
|
||||
return res
|
||||
}
|
||||
|
||||
func validateListV2(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, continuationToken string, maxKeys int,
|
||||
isTruncated, last bool, checkObjects, checkPrefixes []string) string {
|
||||
response := listObjectsV2(t, tc, bktName, prefix, delimiter, "", continuationToken, maxKeys)
|
||||
response := listObjectsV2(tc, bktName, prefix, delimiter, "", continuationToken, maxKeys)
|
||||
|
||||
require.Equal(t, isTruncated, response.IsTruncated)
|
||||
require.Equal(t, last, len(response.NextContinuationToken) == 0)
|
||||
|
@ -202,16 +202,16 @@ func prepareCommonListObjectsQuery(prefix, delimiter string, maxKeys int) url.Va
|
|||
return query
|
||||
}
|
||||
|
||||
func listObjectsV1(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, marker string, maxKeys int) *ListObjectsV1Response {
|
||||
func listObjectsV1(hc *handlerContext, bktName, prefix, delimiter, marker string, maxKeys int) *ListObjectsV1Response {
|
||||
query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
|
||||
if len(marker) != 0 {
|
||||
query.Add("marker", marker)
|
||||
}
|
||||
|
||||
w, r := prepareTestFullRequest(tc, bktName, "", query, nil)
|
||||
tc.Handler().ListObjectsV1Handler(w, r)
|
||||
assertStatus(t, w, http.StatusOK)
|
||||
w, r := prepareTestFullRequest(hc, bktName, "", query, nil)
|
||||
hc.Handler().ListObjectsV1Handler(w, r)
|
||||
assertStatus(hc.t, w, http.StatusOK)
|
||||
res := &ListObjectsV1Response{}
|
||||
parseTestResponse(t, w, res)
|
||||
parseTestResponse(hc.t, w, res)
|
||||
return res
|
||||
}
|
||||
|
|
|
@ -289,10 +289,16 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
|
|||
}
|
||||
|
||||
size := p.SrcObjInfo.Size
|
||||
srcObjectSize := p.SrcObjInfo.Size
|
||||
|
||||
if objSize, err := GetObjectSize(p.SrcObjInfo); err == nil {
|
||||
srcObjectSize = objSize
|
||||
}
|
||||
|
||||
if p.Range != nil {
|
||||
size = p.Range.End - p.Range.Start + 1
|
||||
if p.Range.End > p.SrcObjInfo.Size {
|
||||
return nil, fmt.Errorf("%w: %d-%d/%d", s3errors.GetAPIError(s3errors.ErrInvalidCopyPartRangeSource), p.Range.Start, p.Range.End, p.SrcObjInfo.Size)
|
||||
if p.Range.End > srcObjectSize {
|
||||
return nil, fmt.Errorf("%w: %d-%d/%d", s3errors.GetAPIError(s3errors.ErrInvalidCopyPartRangeSource), p.Range.Start, p.Range.End, srcObjectSize)
|
||||
}
|
||||
}
|
||||
if size > UploadMaxSize {
|
||||
|
|
|
@ -97,6 +97,23 @@ func objectInfoFromMeta(bkt *data.BucketInfo, meta *object.Object) *data.ObjectI
|
|||
}
|
||||
}
|
||||
|
||||
func GetObjectSize(objInfo *data.ObjectInfo) (uint64, error) {
|
||||
var err error
|
||||
fullSize := objInfo.Size
|
||||
|
||||
if objInfo.Headers[AttributeDecryptedSize] != "" {
|
||||
if fullSize, err = strconv.ParseUint(objInfo.Headers[AttributeDecryptedSize], 10, 64); err != nil {
|
||||
return 0, fmt.Errorf("invalid decrypted size header: %w", err)
|
||||
}
|
||||
} else if objInfo.Headers[MultipartObjectSize] != "" {
|
||||
if fullSize, err = strconv.ParseUint(objInfo.Headers[MultipartObjectSize], 10, 64); err != nil {
|
||||
return 0, fmt.Errorf("invalid multipart size header: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return fullSize, nil
|
||||
}
|
||||
|
||||
func FormEncryptionInfo(headers map[string]string) encryption.ObjectEncryption {
|
||||
algorithm := headers[AttributeEncryptionAlgorithm]
|
||||
return encryption.ObjectEncryption{
|
||||
|
|
|
@ -263,6 +263,9 @@ type payloadReader struct {
|
|||
|
||||
func (x payloadReader) Read(p []byte) (int, error) {
|
||||
n, err := x.ReadCloser.Read(p)
|
||||
if err != nil && errors.Is(err, io.EOF) {
|
||||
return n, err
|
||||
}
|
||||
return n, handleObjectError("read payload", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -221,6 +221,30 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
|
|||
return version
|
||||
}
|
||||
|
||||
func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
|
||||
uploadID, _ := treeNode.Get(uploadIDKV)
|
||||
if uploadID == "" {
|
||||
return nil, fmt.Errorf("it's not a multipart node")
|
||||
}
|
||||
|
||||
multipartInfo := &data.MultipartInfo{
|
||||
ID: treeNode.ID,
|
||||
Key: filePath,
|
||||
UploadID: uploadID,
|
||||
Meta: treeNode.Meta,
|
||||
}
|
||||
|
||||
ownerID, _ := treeNode.Get(ownerKV)
|
||||
_ = multipartInfo.Owner.DecodeString(ownerID)
|
||||
|
||||
created, _ := treeNode.Get(createdKV)
|
||||
if utcMilli, err := strconv.ParseInt(created, 10, 64); err == nil {
|
||||
multipartInfo.Created = time.UnixMilli(utcMilli)
|
||||
}
|
||||
|
||||
return multipartInfo, nil
|
||||
}
|
||||
|
||||
func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
|
||||
multipartInfo := &data.MultipartInfo{
|
||||
ID: node.GetNodeID(),
|
||||
|
@ -858,14 +882,14 @@ func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketIn
|
|||
}
|
||||
|
||||
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
|
||||
subTreeNodes, _, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
|
||||
subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []*data.MultipartInfo
|
||||
for _, node := range subTreeNodes {
|
||||
multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID())
|
||||
multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -875,19 +899,55 @@ func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.Bu
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) ([]*data.MultipartInfo, error) {
|
||||
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
|
||||
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make([]*data.MultipartInfo, 0, len(subTree))
|
||||
for _, node := range subTree {
|
||||
multipartInfo, err := newMultipartInfo(node)
|
||||
if err != nil { // missed uploadID (it's a part node)
|
||||
var parentPrefix string
|
||||
if parentFilePath != "" { // The root of subTree can also have a parent
|
||||
parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
|
||||
}
|
||||
|
||||
var filepath string
|
||||
namesMap := make(map[uint64]string, len(subTree))
|
||||
multiparts := make(map[string][]*data.MultipartInfo, len(subTree))
|
||||
|
||||
for i, node := range subTree {
|
||||
treeNode, fileName, err := parseTreeNode(node)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
result = append(result, multipartInfo)
|
||||
|
||||
if i != 0 {
|
||||
if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
|
||||
return nil, fmt.Errorf("invalid node order: %w", err)
|
||||
}
|
||||
} else {
|
||||
filepath = parentPrefix + fileName
|
||||
namesMap[treeNode.ID] = filepath
|
||||
}
|
||||
|
||||
multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
key := formLatestNodeKey(node.GetParentID(), fileName)
|
||||
multipartInfos, ok := multiparts[key]
|
||||
if !ok {
|
||||
multipartInfos = []*data.MultipartInfo{multipartInfo}
|
||||
} else {
|
||||
multipartInfos = append(multipartInfos, multipartInfo)
|
||||
}
|
||||
|
||||
multiparts[key] = multipartInfos
|
||||
}
|
||||
|
||||
result := make([]*data.MultipartInfo, 0, len(multiparts))
|
||||
for _, multipartInfo := range multiparts {
|
||||
result = append(result, multipartInfo...)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
|
|
Loading…
Reference in a new issue