From 52931663e12b2414485e33b58f56936e6f1bbb8f Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Fri, 21 Jul 2023 16:16:55 +0300 Subject: [PATCH] [#176] multipart: Replace part on re-upload We want to have exactly one object and tree node for each part number Signed-off-by: Denis Kirillov --- CHANGELOG.md | 1 + api/handler/multipart_upload_test.go | 63 ++++++++++++++++++++++++++++ pkg/service/tree/tree.go | 14 ++----- 3 files changed, 68 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8482c9d..37d2e9b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ This document outlines major changes between releases. - Use specific s3 errors instead of `InternalError` where possible (#143) - `grpc` schemas in tree configuration (#166) - Return appropriate 404 code when object missed in storage but there is in gate cache (#158) +- Replace part on re-upload when use multipart upload (#176) ### Added - Support dump metrics descriptions (#80) diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go index a43c468..f8e0a2a 100644 --- a/api/handler/multipart_upload_test.go +++ b/api/handler/multipart_upload_test.go @@ -3,6 +3,8 @@ package handler import ( "bytes" "encoding/xml" + "net/http" + "net/url" "testing" "time" @@ -61,3 +63,64 @@ func TestMultipartUploadInvalidPart(t *testing.T) { w := completeMultipartUploadBase(hc, bktName, objName, multipartUpload.UploadID, []string{etag1, etag2}) assertS3Error(hc.t, w, s3Errors.GetAPIError(s3Errors.ErrEntityTooSmall)) } + +func TestMultipartReUploadPart(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName, objName := "bucket-to-upload-part", "object-multipart" + bktInfo := createTestBucket(hc, bktName) + partSizeLast := 8 // less than min part size + partSizeFirst := 5 * 1024 * 1024 + + uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{}) + etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSizeLast) + etag2, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 2, partSizeFirst) + + list := listParts(hc, bktName, objName, uploadInfo.UploadID) + require.Len(t, list.Parts, 2) + require.Equal(t, etag1, list.Parts[0].ETag) + require.Equal(t, etag2, list.Parts[1].ETag) + + w := completeMultipartUploadBase(hc, bktName, objName, uploadInfo.UploadID, []string{etag1, etag2}) + assertS3Error(hc.t, w, s3Errors.GetAPIError(s3Errors.ErrEntityTooSmall)) + + etag1, data1 := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSizeFirst) + etag2, data2 := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 2, partSizeLast) + + list = listParts(hc, bktName, objName, uploadInfo.UploadID) + require.Len(t, list.Parts, 2) + require.Equal(t, etag1, list.Parts[0].ETag) + require.Equal(t, etag2, list.Parts[1].ETag) + + innerUploadInfo, err := hc.tree.GetMultipartUpload(hc.context, bktInfo, objName, uploadInfo.UploadID) + require.NoError(t, err) + treeParts, err := hc.tree.GetParts(hc.Context(), bktInfo, innerUploadInfo.ID) + require.NoError(t, err) + require.Len(t, treeParts, len(list.Parts)) + + w = completeMultipartUploadBase(hc, bktName, objName, uploadInfo.UploadID, []string{etag1, etag2}) + assertStatus(hc.t, w, http.StatusOK) + + data, _ := getObject(hc, bktName, objName) + equalDataSlices(t, append(data1, data2...), data) +} + +func listParts(hc *handlerContext, bktName, objName string, uploadID string) *ListPartsResponse { + return listPartsBase(hc, bktName, objName, false, uploadID) +} + +func listPartsBase(hc *handlerContext, bktName, objName string, encrypted bool, uploadID string) *ListPartsResponse { + query := make(url.Values) + query.Set(uploadIDQuery, uploadID) + + w, r := prepareTestRequestWithQuery(hc, bktName, objName, query, nil) + if encrypted { + setEncryptHeaders(r) + } + + hc.Handler().ListPartsHandler(w, r) + listPartsResponse := &ListPartsResponse{} + readResponse(hc.t, w, http.StatusOK, listPartsResponse) + + return listPartsResponse +} diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 8617bdd..722e79e 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -933,7 +933,6 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN etagKV: info.ETag, } - var foundPartID uint64 for _, part := range parts { if part.GetNodeID() == multipartNodeID { continue @@ -943,20 +942,15 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN continue } if partInfo.Number == info.Number { - foundPartID = part.GetNodeID() - oldObjIDToDelete = partInfo.OID - break + return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, part.GetNodeID(), multipartNodeID, meta) } } - if foundPartID != multipartNodeID { - if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil { - return oid.ID{}, err - } - return oid.ID{}, layer.ErrNoNodeToRemove + if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil { + return oid.ID{}, err } - return oldObjIDToDelete, c.service.MoveNode(ctx, bktInfo, systemTree, foundPartID, multipartNodeID, meta) + return oid.ID{}, layer.ErrNoNodeToRemove } func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {