[#448] multipart: Support removing duplicated parts
All checks were successful
/ DCO (pull_request) Successful in 2m33s
/ Vulncheck (pull_request) Successful in 2m53s
/ Builds (1.21) (pull_request) Successful in 3m6s
/ Builds (1.22) (pull_request) Successful in 2m59s
/ Lint (pull_request) Successful in 3m10s
/ Tests (1.21) (pull_request) Successful in 2m57s
/ Tests (1.22) (pull_request) Successful in 3m0s
All checks were successful
/ DCO (pull_request) Successful in 2m33s
/ Vulncheck (pull_request) Successful in 2m53s
/ Builds (1.21) (pull_request) Successful in 3m6s
/ Builds (1.22) (pull_request) Successful in 2m59s
/ Lint (pull_request) Successful in 3m10s
/ Tests (1.21) (pull_request) Successful in 2m57s
/ Tests (1.22) (pull_request) Successful in 3m0s
Previously after tree split we can have duplicated parts (several objects and tree node referred to the same part number). Some of them couldn't be deleted after abort or compete action. Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
c34680d157
commit
5bf79a2e63
9 changed files with 333 additions and 68 deletions
|
@ -124,6 +124,11 @@ type PartInfo struct {
|
||||||
Created time.Time `json:"created"`
|
Created time.Time `json:"created"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PartInfoExtended struct {
|
||||||
|
PartInfo
|
||||||
|
Timestamp uint64
|
||||||
|
}
|
||||||
|
|
||||||
// ToHeaderString form short part representation to use in S3-Completed-Parts header.
|
// ToHeaderString form short part representation to use in S3-Completed-Parts header.
|
||||||
func (p *PartInfo) ToHeaderString() string {
|
func (p *PartInfo) ToHeaderString() string {
|
||||||
// ETag value contains SHA256 checksum which is used while getting object parts attributes.
|
// ETag value contains SHA256 checksum which is used while getting object parts attributes.
|
||||||
|
|
|
@ -288,6 +288,21 @@ func completeMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func abortMultipartUpload(hc *handlerContext, bktName, objName, uploadID string) {
|
||||||
|
w := abortMultipartUploadBase(hc, bktName, objName, uploadID)
|
||||||
|
assertStatus(hc.t, w, http.StatusNoContent)
|
||||||
|
}
|
||||||
|
|
||||||
|
func abortMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID string) *httptest.ResponseRecorder {
|
||||||
|
query := make(url.Values)
|
||||||
|
query.Set(uploadIDQuery, uploadID)
|
||||||
|
|
||||||
|
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
|
||||||
|
hc.Handler().AbortMultipartUploadHandler(w, r)
|
||||||
|
|
||||||
|
return w
|
||||||
|
}
|
||||||
|
|
||||||
func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) {
|
func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) {
|
||||||
return uploadPartBase(hc, bktName, objName, true, uploadID, num, size)
|
return uploadPartBase(hc, bktName, objName, true, uploadID, num, size)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,10 @@ import (
|
||||||
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -122,6 +126,108 @@ func TestMultipartReUploadPart(t *testing.T) {
|
||||||
equalDataSlices(t, append(data1, data2...), data)
|
equalDataSlices(t, append(data1, data2...), data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMultipartRemovePartsSplit(t *testing.T) {
|
||||||
|
bktName, objName := "bucket-to-upload-part", "object-multipart"
|
||||||
|
partSize := 8
|
||||||
|
|
||||||
|
t.Run("reupload part", func(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
bktInfo := createTestBucket(hc, bktName)
|
||||||
|
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||||
|
|
||||||
|
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||||
|
|
||||||
|
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objID := oidtest.ID()
|
||||||
|
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
|
||||||
|
"Number": "1",
|
||||||
|
"OID": objID.EncodeToString(),
|
||||||
|
"Owner": usertest.ID().EncodeToString(),
|
||||||
|
"ETag": "etag",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
|
||||||
|
require.Len(t, hc.tp.Objects(), 2)
|
||||||
|
|
||||||
|
list := listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
|
||||||
|
require.Len(t, list.Parts, 1)
|
||||||
|
require.Equal(t, `"etag"`, list.Parts[0].ETag)
|
||||||
|
|
||||||
|
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||||
|
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
|
||||||
|
require.Len(t, list.Parts, 1)
|
||||||
|
require.Equal(t, etag1, list.Parts[0].ETag)
|
||||||
|
|
||||||
|
require.Len(t, hc.tp.Objects(), 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("abort multipart", func(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
bktInfo := createTestBucket(hc, bktName)
|
||||||
|
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||||
|
|
||||||
|
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||||
|
|
||||||
|
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objID := oidtest.ID()
|
||||||
|
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
|
||||||
|
"Number": "1",
|
||||||
|
"OID": objID.EncodeToString(),
|
||||||
|
"Owner": usertest.ID().EncodeToString(),
|
||||||
|
"ETag": "etag",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
|
||||||
|
require.Len(t, hc.tp.Objects(), 2)
|
||||||
|
|
||||||
|
abortMultipartUpload(hc, bktName, objName, uploadInfo.UploadID)
|
||||||
|
require.Empty(t, hc.tp.Objects())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("complete multipart", func(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
bktInfo := createTestBucket(hc, bktName)
|
||||||
|
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||||
|
|
||||||
|
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||||
|
|
||||||
|
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objID := oidtest.ID()
|
||||||
|
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
|
||||||
|
"Number": "1",
|
||||||
|
"OID": objID.EncodeToString(),
|
||||||
|
"Owner": usertest.ID().EncodeToString(),
|
||||||
|
"ETag": "etag",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
|
||||||
|
require.Len(t, hc.tp.Objects(), 2)
|
||||||
|
|
||||||
|
completeMultipartUpload(hc, bktName, objName, uploadInfo.UploadID, []string{etag1})
|
||||||
|
require.Falsef(t, containsOID(hc.tp.Objects(), objID), "frostfs contains '%s' object, but shouldn't", objID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func containsOID(objects []*object.Object, objID oid.ID) bool {
|
||||||
|
for _, o := range objects {
|
||||||
|
oID, _ := o.ID()
|
||||||
|
if oID.Equals(objID) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func TestListMultipartUploads(t *testing.T) {
|
func TestListMultipartUploads(t *testing.T) {
|
||||||
hc := prepareHandlerContext(t)
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
|
|
@ -284,16 +284,18 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
MD5: hex.EncodeToString(md5Hash),
|
MD5: hex.EncodeToString(md5Hash),
|
||||||
}
|
}
|
||||||
|
|
||||||
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
||||||
oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
|
oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
|
||||||
if err != nil && !oldPartIDNotFound {
|
if err != nil && !oldPartIDNotFound {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !oldPartIDNotFound {
|
if !oldPartIDNotFound {
|
||||||
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
|
for _, oldPartID := range oldPartIDs {
|
||||||
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
|
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
|
||||||
zap.String("cid", bktInfo.CID.EncodeToString()),
|
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
|
||||||
zap.String("oid", oldPartID.EncodeToString()))
|
zap.String("cid", bktInfo.CID.EncodeToString()),
|
||||||
|
zap.String("oid", oldPartID.EncodeToString()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -379,16 +381,15 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
|
|
||||||
var multipartObjetSize uint64
|
var multipartObjetSize uint64
|
||||||
var encMultipartObjectSize uint64
|
var encMultipartObjectSize uint64
|
||||||
parts := make([]*data.PartInfo, 0, len(p.Parts))
|
parts := make([]*data.PartInfoExtended, 0, len(p.Parts))
|
||||||
|
|
||||||
var completedPartsHeader strings.Builder
|
var completedPartsHeader strings.Builder
|
||||||
md5Hash := md5.New()
|
md5Hash := md5.New()
|
||||||
for i, part := range p.Parts {
|
for i, part := range p.Parts {
|
||||||
partInfo := partsInfo[part.PartNumber]
|
partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled())
|
||||||
if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) {
|
if partInfo == nil {
|
||||||
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
|
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
|
||||||
}
|
}
|
||||||
delete(partsInfo, part.PartNumber)
|
|
||||||
|
|
||||||
// for the last part we have no minimum size limit
|
// for the last part we have no minimum size limit
|
||||||
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
|
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
|
||||||
|
@ -469,14 +470,16 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(p.Info.Bkt.CID)
|
addr.SetContainer(p.Info.Bkt.CID)
|
||||||
for _, partInfo := range partsInfo {
|
for _, prts := range partsInfo {
|
||||||
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
|
for _, partInfo := range prts {
|
||||||
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
|
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
|
||||||
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
|
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
|
||||||
zap.Error(err))
|
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
addr.SetObject(partInfo.OID)
|
||||||
|
n.cache.DeleteObject(addr)
|
||||||
}
|
}
|
||||||
addr.SetObject(partInfo.OID)
|
|
||||||
n.cache.DeleteObject(addr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
|
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
|
||||||
|
@ -548,10 +551,12 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, info := range parts {
|
for _, infos := range parts {
|
||||||
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
|
for _, info := range infos {
|
||||||
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
|
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
|
||||||
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
|
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
|
||||||
|
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -575,7 +580,12 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
|
||||||
|
|
||||||
parts := make([]*Part, 0, len(partsInfo))
|
parts := make([]*Part, 0, len(partsInfo))
|
||||||
|
|
||||||
for _, partInfo := range partsInfo {
|
for _, infos := range partsInfo {
|
||||||
|
sort.Slice(infos, func(i, j int) bool {
|
||||||
|
return infos[i].Timestamp < infos[j].Timestamp
|
||||||
|
})
|
||||||
|
|
||||||
|
partInfo := infos[len(infos)-1]
|
||||||
parts = append(parts, &Part{
|
parts = append(parts, &Part{
|
||||||
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
|
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
|
||||||
LastModified: partInfo.Created.UTC().Format(time.RFC3339),
|
LastModified: partInfo.Created.UTC().Format(time.RFC3339),
|
||||||
|
@ -612,7 +622,22 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) {
|
type PartsInfo map[int][]*data.PartInfoExtended
|
||||||
|
|
||||||
|
func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended {
|
||||||
|
parts := p[part]
|
||||||
|
|
||||||
|
for i, info := range parts {
|
||||||
|
if info.GetETag(md5Enabled) == etag {
|
||||||
|
p[part] = append(parts[:i], parts[i+1:]...)
|
||||||
|
return info
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) {
|
||||||
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
|
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, ErrNodeNotFound) {
|
if errors.Is(err, ErrNodeNotFound) {
|
||||||
|
@ -626,11 +651,11 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res := make(map[int]*data.PartInfo, len(parts))
|
res := make(map[int][]*data.PartInfoExtended, len(parts))
|
||||||
partsNumbers := make([]int, len(parts))
|
partsNumbers := make([]int, len(parts))
|
||||||
oids := make([]string, len(parts))
|
oids := make([]string, len(parts))
|
||||||
for i, part := range parts {
|
for i, part := range parts {
|
||||||
res[part.Number] = part
|
res[part.Number] = append(res[part.Number], part)
|
||||||
partsNumbers[i] = part.Number
|
partsNumbers[i] = part.Number
|
||||||
oids[i] = part.OID.EncodeToString()
|
oids[i] = part.OID.EncodeToString()
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -33,7 +34,7 @@ type TreeServiceMock struct {
|
||||||
locks map[string]map[uint64]*data.LockInfo
|
locks map[string]map[uint64]*data.LockInfo
|
||||||
tags map[string]map[uint64]map[string]string
|
tags map[string]map[uint64]map[string]string
|
||||||
multiparts map[string]map[string][]*data.MultipartInfo
|
multiparts map[string]map[string][]*data.MultipartInfo
|
||||||
parts map[string]map[int]*data.PartInfo
|
parts map[string]map[int]*data.PartInfoExtended
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
|
func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
|
||||||
|
@ -92,7 +93,7 @@ func NewTreeService() *TreeServiceMock {
|
||||||
locks: make(map[string]map[uint64]*data.LockInfo),
|
locks: make(map[string]map[uint64]*data.LockInfo),
|
||||||
tags: make(map[string]map[uint64]map[string]string),
|
tags: make(map[string]map[uint64]map[string]string),
|
||||||
multiparts: make(map[string]map[string][]*data.MultipartInfo),
|
multiparts: make(map[string]map[string][]*data.MultipartInfo),
|
||||||
parts: make(map[string]map[int]*data.PartInfo),
|
parts: make(map[string]map[int]*data.PartInfoExtended),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,28 +347,31 @@ func (t *TreeServiceMock) GetMultipartUpload(_ context.Context, bktInfo *data.Bu
|
||||||
return nil, ErrNodeNotFound
|
return nil, ErrNodeNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
|
func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
|
||||||
multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID)
|
multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oid.ID{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if multipartInfo.ID != multipartNodeID {
|
if multipartInfo.ID != multipartNodeID {
|
||||||
return oid.ID{}, fmt.Errorf("invalid multipart info id")
|
return nil, fmt.Errorf("invalid multipart info id")
|
||||||
}
|
}
|
||||||
|
|
||||||
partsMap, ok := t.parts[info.UploadID]
|
partsMap, ok := t.parts[info.UploadID]
|
||||||
if !ok {
|
if !ok {
|
||||||
partsMap = make(map[int]*data.PartInfo)
|
partsMap = make(map[int]*data.PartInfoExtended)
|
||||||
}
|
}
|
||||||
|
|
||||||
partsMap[info.Number] = info
|
partsMap[info.Number] = &data.PartInfoExtended{
|
||||||
|
PartInfo: *info,
|
||||||
|
Timestamp: uint64(time.Now().UnixMicro()),
|
||||||
|
}
|
||||||
|
|
||||||
t.parts[info.UploadID] = partsMap
|
t.parts[info.UploadID] = partsMap
|
||||||
return oid.ID{}, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
|
func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
|
||||||
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
|
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
|
||||||
|
|
||||||
var foundMultipart *data.MultipartInfo
|
var foundMultipart *data.MultipartInfo
|
||||||
|
@ -387,7 +391,7 @@ LOOP:
|
||||||
}
|
}
|
||||||
|
|
||||||
partsMap := t.parts[foundMultipart.UploadID]
|
partsMap := t.parts[foundMultipart.UploadID]
|
||||||
result := make([]*data.PartInfo, 0, len(partsMap))
|
result := make([]*data.PartInfoExtended, 0, len(partsMap))
|
||||||
for _, part := range partsMap {
|
for _, part := range partsMap {
|
||||||
result = append(result, part)
|
result = append(result, part)
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,11 +57,11 @@ type TreeService interface {
|
||||||
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
|
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
|
||||||
|
|
||||||
// AddPart puts a node to a system tree as a child of appropriate multipart upload
|
// AddPart puts a node to a system tree as a child of appropriate multipart upload
|
||||||
// and returns objectID of a previous part which must be deleted in FrostFS.
|
// and returns objectIDs of a previous part/s which must be deleted in FrostFS.
|
||||||
//
|
//
|
||||||
// If object id to remove is not found returns ErrNoNodeToRemove error.
|
// If object ids to remove is not found returns ErrNoNodeToRemove error.
|
||||||
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error)
|
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error)
|
||||||
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error)
|
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error)
|
||||||
|
|
||||||
// Compound methods for optimizations
|
// Compound methods for optimizations
|
||||||
|
|
||||||
|
|
|
@ -154,4 +154,5 @@ const (
|
||||||
FailedToParsePartInfo = "failed to parse part info"
|
FailedToParsePartInfo = "failed to parse part info"
|
||||||
CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info"
|
CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info"
|
||||||
CloseCredsObjectPayload = "close creds object payload"
|
CloseCredsObjectPayload = "close creds object payload"
|
||||||
|
FailedToRemoveOldPartNode = "failed to remove old part node"
|
||||||
)
|
)
|
||||||
|
|
|
@ -152,6 +152,10 @@ type NodeResponse interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
||||||
|
if err := validateNodeResponse(nodeInfo); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
tNode := &treeNode{
|
tNode := &treeNode{
|
||||||
ID: nodeInfo.GetNodeID(),
|
ID: nodeInfo.GetNodeID(),
|
||||||
ParentID: nodeInfo.GetParentID(),
|
ParentID: nodeInfo.GetParentID(),
|
||||||
|
@ -159,14 +163,6 @@ func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
||||||
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
|
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 {
|
|
||||||
return nil, errors.New("invalid tree node: missing id")
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) {
|
|
||||||
return nil, errors.New("invalid tree node: length multiple ids mismatch")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, kv := range nodeInfo.GetMeta() {
|
for _, kv := range nodeInfo.GetMeta() {
|
||||||
switch kv.GetKey() {
|
switch kv.GetKey() {
|
||||||
case oidKV:
|
case oidKV:
|
||||||
|
@ -357,6 +353,10 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
|
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
|
||||||
|
if err := validateNodeResponse(node); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if len(node.GetNodeID()) != 1 {
|
if len(node.GetNodeID()) != 1 {
|
||||||
return nil, errors.New("invalid multipart node: this is split node")
|
return nil, errors.New("invalid multipart node: this is split node")
|
||||||
}
|
}
|
||||||
|
@ -400,10 +400,36 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo,
|
||||||
return multipartInfo, nil
|
return multipartInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
|
func validateNodeResponse(node NodeResponse) error {
|
||||||
var err error
|
ids := node.GetNodeID()
|
||||||
partInfo := &data.PartInfo{}
|
parentIDs := node.GetParentID()
|
||||||
|
timestamps := node.GetTimestamp()
|
||||||
|
|
||||||
|
if len(ids) == 0 || len(parentIDs) == 0 || len(timestamps) == 0 {
|
||||||
|
return errors.New("invalid node response: missing ids")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ids) != len(parentIDs) || len(parentIDs) != len(timestamps) {
|
||||||
|
return errors.New("invalid node response: multiple ids length mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) {
|
||||||
|
if err := validateNodeResponse(node); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(node.GetNodeID()) != 1 {
|
||||||
|
return nil, errors.New("invalid part node: this is split node")
|
||||||
|
}
|
||||||
|
|
||||||
|
partInfo := &data.PartInfoExtended{
|
||||||
|
Timestamp: node.GetTimestamp()[0],
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
for _, kv := range node.GetMeta() {
|
for _, kv := range node.GetMeta() {
|
||||||
value := string(kv.GetValue())
|
value := string(kv.GetValue())
|
||||||
switch kv.GetKey() {
|
switch kv.GetKey() {
|
||||||
|
@ -1365,10 +1391,10 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
|
||||||
return nil, layer.ErrNodeNotFound
|
return nil, layer.ErrNodeNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
|
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
|
||||||
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
|
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oid.ID{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
meta := map[string]string{
|
meta := map[string]string{
|
||||||
|
@ -1380,48 +1406,76 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
|
||||||
md5KV: info.MD5,
|
md5KV: info.MD5,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
objToDelete := make([]oid.ID, 0, 1)
|
||||||
|
partsToDelete := make([]uint64, 0, 1)
|
||||||
|
var (
|
||||||
|
latestPartID uint64
|
||||||
|
maxTimestamp uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
multiNodeID := MultiID{multipartNodeID}
|
||||||
|
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
if len(part.GetNodeID()) != 1 {
|
if multiNodeID.Equal(part.GetNodeID()) {
|
||||||
// multipart parts nodeID shouldn't have multiple values
|
|
||||||
c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
|
|
||||||
zap.String("key", info.Key),
|
|
||||||
zap.String("upload id", info.UploadID),
|
|
||||||
zap.Uint64("multipart node id ", multipartNodeID),
|
|
||||||
zap.Uint64s("node ids", part.GetNodeID()))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
nodeID := part.GetNodeID()[0]
|
|
||||||
if nodeID == multipartNodeID {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
partInfo, err := newPartInfo(part)
|
partInfo, err := newPartInfo(part)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
|
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
|
||||||
zap.String("key", info.Key),
|
zap.String("key", info.Key),
|
||||||
zap.String("upload id", info.UploadID),
|
zap.String("upload id", info.UploadID),
|
||||||
zap.Uint64("multipart node id ", multipartNodeID),
|
zap.Uint64("multipart node id ", multipartNodeID),
|
||||||
|
zap.Uint64s("id", part.GetNodeID()),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if partInfo.Number == info.Number {
|
if partInfo.Number == info.Number {
|
||||||
return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta)
|
nodeID := part.GetNodeID()[0]
|
||||||
|
objToDelete = append(objToDelete, partInfo.OID)
|
||||||
|
partsToDelete = append(partsToDelete, nodeID)
|
||||||
|
timestamp := partInfo.Timestamp
|
||||||
|
if timestamp > maxTimestamp {
|
||||||
|
maxTimestamp = timestamp
|
||||||
|
latestPartID = nodeID
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
|
if len(objToDelete) != 0 {
|
||||||
return oid.ID{}, err
|
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil {
|
||||||
|
return nil, fmt.Errorf("move part node: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, nodeID := range partsToDelete {
|
||||||
|
if nodeID == latestPartID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil {
|
||||||
|
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode,
|
||||||
|
zap.String("key", info.Key),
|
||||||
|
zap.String("upload id", info.UploadID),
|
||||||
|
zap.Uint64("id", nodeID))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return objToDelete, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return oid.ID{}, layer.ErrNoNodeToRemove
|
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, layer.ErrNoNodeToRemove
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
|
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
|
||||||
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
|
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make([]*data.PartInfo, 0, len(parts))
|
result := make([]*data.PartInfoExtended, 0, len(parts))
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
if len(part.GetNodeID()) != 1 {
|
if len(part.GetNodeID()) != 1 {
|
||||||
// multipart parts nodeID shouldn't have multiple values
|
// multipart parts nodeID shouldn't have multiple values
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
@ -304,3 +305,57 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSplitTreeMultiparts(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
memCli, err := NewTreeServiceClientMemory()
|
||||||
|
require.NoError(t, err)
|
||||||
|
treeService := NewTree(memCli, zaptest.NewLogger(t))
|
||||||
|
|
||||||
|
bktInfo := &data.BucketInfo{
|
||||||
|
CID: cidtest.ID(),
|
||||||
|
}
|
||||||
|
|
||||||
|
multipartInfo := &data.MultipartInfo{
|
||||||
|
Key: "multipart",
|
||||||
|
UploadID: "id",
|
||||||
|
Meta: map[string]string{},
|
||||||
|
Owner: usertest.ID(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = treeService.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
multipartInfo, err = treeService.GetMultipartUpload(ctx, bktInfo, multipartInfo.Key, multipartInfo.UploadID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var objIDs []oid.ID
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
objID := oidtest.ID()
|
||||||
|
_, err = memCli.AddNode(ctx, bktInfo, systemTree, multipartInfo.ID, map[string]string{
|
||||||
|
partNumberKV: "1",
|
||||||
|
oidKV: objID.EncodeToString(),
|
||||||
|
ownerKV: usertest.ID().EncodeToString(),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
objIDs = append(objIDs, objID)
|
||||||
|
}
|
||||||
|
|
||||||
|
parts, err := treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, parts, 2)
|
||||||
|
|
||||||
|
objToDeletes, err := treeService.AddPart(ctx, bktInfo, multipartInfo.ID, &data.PartInfo{
|
||||||
|
Key: multipartInfo.Key,
|
||||||
|
UploadID: multipartInfo.UploadID,
|
||||||
|
Number: 1,
|
||||||
|
OID: oidtest.ID(),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, objIDs, objToDeletes, "oids to delete mismatched")
|
||||||
|
|
||||||
|
parts, err = treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, parts, 1)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue