forked from TrueCloudLab/frostfs-s3-gw
[#176] multipart: Replace part on re-upload
We want to have exactly one object and tree node for each part number Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
1a09041cd1
commit
52931663e1
3 changed files with 68 additions and 10 deletions
|
@ -10,6 +10,7 @@ This document outlines major changes between releases.
|
||||||
- Use specific s3 errors instead of `InternalError` where possible (#143)
|
- Use specific s3 errors instead of `InternalError` where possible (#143)
|
||||||
- `grpc` schemas in tree configuration (#166)
|
- `grpc` schemas in tree configuration (#166)
|
||||||
- Return appropriate 404 code when object missed in storage but there is in gate cache (#158)
|
- Return appropriate 404 code when object missed in storage but there is in gate cache (#158)
|
||||||
|
- Replace part on re-upload when use multipart upload (#176)
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- Support dump metrics descriptions (#80)
|
- Support dump metrics descriptions (#80)
|
||||||
|
|
|
@ -3,6 +3,8 @@ package handler
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -61,3 +63,64 @@ func TestMultipartUploadInvalidPart(t *testing.T) {
|
||||||
w := completeMultipartUploadBase(hc, bktName, objName, multipartUpload.UploadID, []string{etag1, etag2})
|
w := completeMultipartUploadBase(hc, bktName, objName, multipartUpload.UploadID, []string{etag1, etag2})
|
||||||
assertS3Error(hc.t, w, s3Errors.GetAPIError(s3Errors.ErrEntityTooSmall))
|
assertS3Error(hc.t, w, s3Errors.GetAPIError(s3Errors.ErrEntityTooSmall))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMultipartReUploadPart(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
bktName, objName := "bucket-to-upload-part", "object-multipart"
|
||||||
|
bktInfo := createTestBucket(hc, bktName)
|
||||||
|
partSizeLast := 8 // less than min part size
|
||||||
|
partSizeFirst := 5 * 1024 * 1024
|
||||||
|
|
||||||
|
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||||
|
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSizeLast)
|
||||||
|
etag2, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 2, partSizeFirst)
|
||||||
|
|
||||||
|
list := listParts(hc, bktName, objName, uploadInfo.UploadID)
|
||||||
|
require.Len(t, list.Parts, 2)
|
||||||
|
require.Equal(t, etag1, list.Parts[0].ETag)
|
||||||
|
require.Equal(t, etag2, list.Parts[1].ETag)
|
||||||
|
|
||||||
|
w := completeMultipartUploadBase(hc, bktName, objName, uploadInfo.UploadID, []string{etag1, etag2})
|
||||||
|
assertS3Error(hc.t, w, s3Errors.GetAPIError(s3Errors.ErrEntityTooSmall))
|
||||||
|
|
||||||
|
etag1, data1 := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSizeFirst)
|
||||||
|
etag2, data2 := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 2, partSizeLast)
|
||||||
|
|
||||||
|
list = listParts(hc, bktName, objName, uploadInfo.UploadID)
|
||||||
|
require.Len(t, list.Parts, 2)
|
||||||
|
require.Equal(t, etag1, list.Parts[0].ETag)
|
||||||
|
require.Equal(t, etag2, list.Parts[1].ETag)
|
||||||
|
|
||||||
|
innerUploadInfo, err := hc.tree.GetMultipartUpload(hc.context, bktInfo, objName, uploadInfo.UploadID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
treeParts, err := hc.tree.GetParts(hc.Context(), bktInfo, innerUploadInfo.ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, treeParts, len(list.Parts))
|
||||||
|
|
||||||
|
w = completeMultipartUploadBase(hc, bktName, objName, uploadInfo.UploadID, []string{etag1, etag2})
|
||||||
|
assertStatus(hc.t, w, http.StatusOK)
|
||||||
|
|
||||||
|
data, _ := getObject(hc, bktName, objName)
|
||||||
|
equalDataSlices(t, append(data1, data2...), data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func listParts(hc *handlerContext, bktName, objName string, uploadID string) *ListPartsResponse {
|
||||||
|
return listPartsBase(hc, bktName, objName, false, uploadID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func listPartsBase(hc *handlerContext, bktName, objName string, encrypted bool, uploadID string) *ListPartsResponse {
|
||||||
|
query := make(url.Values)
|
||||||
|
query.Set(uploadIDQuery, uploadID)
|
||||||
|
|
||||||
|
w, r := prepareTestRequestWithQuery(hc, bktName, objName, query, nil)
|
||||||
|
if encrypted {
|
||||||
|
setEncryptHeaders(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
hc.Handler().ListPartsHandler(w, r)
|
||||||
|
listPartsResponse := &ListPartsResponse{}
|
||||||
|
readResponse(hc.t, w, http.StatusOK, listPartsResponse)
|
||||||
|
|
||||||
|
return listPartsResponse
|
||||||
|
}
|
||||||
|
|
|
@ -933,7 +933,6 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
|
||||||
etagKV: info.ETag,
|
etagKV: info.ETag,
|
||||||
}
|
}
|
||||||
|
|
||||||
var foundPartID uint64
|
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
if part.GetNodeID() == multipartNodeID {
|
if part.GetNodeID() == multipartNodeID {
|
||||||
continue
|
continue
|
||||||
|
@ -943,20 +942,15 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if partInfo.Number == info.Number {
|
if partInfo.Number == info.Number {
|
||||||
foundPartID = part.GetNodeID()
|
return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, part.GetNodeID(), multipartNodeID, meta)
|
||||||
oldObjIDToDelete = partInfo.OID
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if foundPartID != multipartNodeID {
|
|
||||||
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
|
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
|
||||||
return oid.ID{}, err
|
return oid.ID{}, err
|
||||||
}
|
}
|
||||||
return oid.ID{}, layer.ErrNoNodeToRemove
|
|
||||||
}
|
|
||||||
|
|
||||||
return oldObjIDToDelete, c.service.MoveNode(ctx, bktInfo, systemTree, foundPartID, multipartNodeID, meta)
|
return oid.ID{}, layer.ErrNoNodeToRemove
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
|
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
|
||||||
|
|
Loading…
Reference in a new issue