[#448] multipart: Support removing duplicated parts #448
9 changed files with 336 additions and 68 deletions
|
@ -126,6 +126,14 @@ type PartInfo struct {
|
||||||
Created time.Time `json:"created"`
|
Created time.Time `json:"created"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PartInfoExtended struct {
|
||||||
|
PartInfo
|
||||||
|
|
||||||
alexvanin marked this conversation as resolved
Outdated
|
|||||||
|
// Timestamp is used to find the latest version of part info in case of tree split
|
||||||
|
// when there are multiple nodes for the same part.
|
||||||
|
Timestamp uint64
|
||||||
|
}
|
||||||
|
|
||||||
// ToHeaderString form short part representation to use in S3-Completed-Parts header.
|
// ToHeaderString form short part representation to use in S3-Completed-Parts header.
|
||||||
func (p *PartInfo) ToHeaderString() string {
|
func (p *PartInfo) ToHeaderString() string {
|
||||||
// ETag value contains SHA256 checksum which is used while getting object parts attributes.
|
// ETag value contains SHA256 checksum which is used while getting object parts attributes.
|
||||||
|
|
|
@ -288,6 +288,21 @@ func completeMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func abortMultipartUpload(hc *handlerContext, bktName, objName, uploadID string) {
|
||||||
|
w := abortMultipartUploadBase(hc, bktName, objName, uploadID)
|
||||||
|
assertStatus(hc.t, w, http.StatusNoContent)
|
||||||
|
}
|
||||||
|
|
||||||
|
func abortMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID string) *httptest.ResponseRecorder {
|
||||||
|
query := make(url.Values)
|
||||||
|
query.Set(uploadIDQuery, uploadID)
|
||||||
|
|
||||||
|
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
|
||||||
|
hc.Handler().AbortMultipartUploadHandler(w, r)
|
||||||
|
|
||||||
|
return w
|
||||||
|
}
|
||||||
|
|
||||||
func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) {
|
func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) {
|
||||||
return uploadPartBase(hc, bktName, objName, true, uploadID, num, size)
|
return uploadPartBase(hc, bktName, objName, true, uploadID, num, size)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,10 @@ import (
|
||||||
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -122,6 +126,108 @@ func TestMultipartReUploadPart(t *testing.T) {
|
||||||
equalDataSlices(t, append(data1, data2...), data)
|
equalDataSlices(t, append(data1, data2...), data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMultipartRemovePartsSplit(t *testing.T) {
|
||||||
|
bktName, objName := "bucket-to-upload-part", "object-multipart"
|
||||||
|
partSize := 8
|
||||||
|
|
||||||
|
t.Run("reupload part", func(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
bktInfo := createTestBucket(hc, bktName)
|
||||||
|
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||||
|
|
||||||
|
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||||
|
|
||||||
|
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objID := oidtest.ID()
|
||||||
|
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
|
||||||
|
"Number": "1",
|
||||||
|
"OID": objID.EncodeToString(),
|
||||||
|
"Owner": usertest.ID().EncodeToString(),
|
||||||
|
"ETag": "etag",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
|
||||||
|
require.Len(t, hc.tp.Objects(), 2)
|
||||||
|
|
||||||
|
list := listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
|
||||||
|
require.Len(t, list.Parts, 1)
|
||||||
|
require.Equal(t, `"etag"`, list.Parts[0].ETag)
|
||||||
|
|
||||||
|
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||||
|
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
|
||||||
|
require.Len(t, list.Parts, 1)
|
||||||
|
require.Equal(t, etag1, list.Parts[0].ETag)
|
||||||
|
|
||||||
|
require.Len(t, hc.tp.Objects(), 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("abort multipart", func(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
bktInfo := createTestBucket(hc, bktName)
|
||||||
|
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||||
|
|
||||||
|
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||||
|
|
||||||
|
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objID := oidtest.ID()
|
||||||
|
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
|
||||||
|
"Number": "1",
|
||||||
|
"OID": objID.EncodeToString(),
|
||||||
|
"Owner": usertest.ID().EncodeToString(),
|
||||||
|
"ETag": "etag",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
|
||||||
|
require.Len(t, hc.tp.Objects(), 2)
|
||||||
|
|
||||||
|
abortMultipartUpload(hc, bktName, objName, uploadInfo.UploadID)
|
||||||
|
require.Empty(t, hc.tp.Objects())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("complete multipart", func(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
bktInfo := createTestBucket(hc, bktName)
|
||||||
|
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||||
|
|
||||||
|
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||||
|
|
||||||
|
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objID := oidtest.ID()
|
||||||
|
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
|
||||||
|
"Number": "1",
|
||||||
|
"OID": objID.EncodeToString(),
|
||||||
|
"Owner": usertest.ID().EncodeToString(),
|
||||||
|
"ETag": "etag",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
|
||||||
|
require.Len(t, hc.tp.Objects(), 2)
|
||||||
|
|
||||||
|
completeMultipartUpload(hc, bktName, objName, uploadInfo.UploadID, []string{etag1})
|
||||||
|
require.Falsef(t, containsOID(hc.tp.Objects(), objID), "frostfs contains '%s' object, but shouldn't", objID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func containsOID(objects []*object.Object, objID oid.ID) bool {
|
||||||
|
for _, o := range objects {
|
||||||
|
oID, _ := o.ID()
|
||||||
|
if oID.Equals(objID) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func TestListMultipartUploads(t *testing.T) {
|
func TestListMultipartUploads(t *testing.T) {
|
||||||
hc := prepareHandlerContext(t)
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
|
|
@ -290,18 +290,20 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
MD5: hex.EncodeToString(createdObj.MD5Sum),
|
MD5: hex.EncodeToString(createdObj.MD5Sum),
|
||||||
}
|
}
|
||||||
|
|
||||||
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
||||||
oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
|
oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
|
||||||
if err != nil && !oldPartIDNotFound {
|
if err != nil && !oldPartIDNotFound {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !oldPartIDNotFound {
|
if !oldPartIDNotFound {
|
||||||
|
for _, oldPartID := range oldPartIDs {
|
||||||
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
|
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
|
||||||
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
|
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
|
||||||
zap.String("cid", bktInfo.CID.EncodeToString()),
|
zap.String("cid", bktInfo.CID.EncodeToString()),
|
||||||
zap.String("oid", oldPartID.EncodeToString()))
|
zap.String("oid", oldPartID.EncodeToString()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
objInfo := &data.ObjectInfo{
|
objInfo := &data.ObjectInfo{
|
||||||
ID: createdObj.ID,
|
ID: createdObj.ID,
|
||||||
|
@ -385,16 +387,15 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
|
|
||||||
var multipartObjetSize uint64
|
var multipartObjetSize uint64
|
||||||
var encMultipartObjectSize uint64
|
var encMultipartObjectSize uint64
|
||||||
parts := make([]*data.PartInfo, 0, len(p.Parts))
|
parts := make([]*data.PartInfoExtended, 0, len(p.Parts))
|
||||||
|
|
||||||
var completedPartsHeader strings.Builder
|
var completedPartsHeader strings.Builder
|
||||||
md5Hash := md5.New()
|
md5Hash := md5.New()
|
||||||
for i, part := range p.Parts {
|
for i, part := range p.Parts {
|
||||||
partInfo := partsInfo[part.PartNumber]
|
partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled())
|
||||||
if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) {
|
if partInfo == nil {
|
||||||
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
|
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
|
||||||
}
|
}
|
||||||
delete(partsInfo, part.PartNumber)
|
|
||||||
|
|
||||||
// for the last part we have no minimum size limit
|
// for the last part we have no minimum size limit
|
||||||
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
|
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
|
||||||
|
@ -475,7 +476,8 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(p.Info.Bkt.CID)
|
addr.SetContainer(p.Info.Bkt.CID)
|
||||||
for _, partInfo := range partsInfo {
|
for _, prts := range partsInfo {
|
||||||
|
for _, partInfo := range prts {
|
||||||
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
|
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
|
||||||
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
|
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
|
||||||
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
|
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
|
||||||
|
@ -484,6 +486,7 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
addr.SetObject(partInfo.OID)
|
addr.SetObject(partInfo.OID)
|
||||||
n.cache.DeleteObject(addr)
|
n.cache.DeleteObject(addr)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
|
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
|
||||||
}
|
}
|
||||||
|
@ -554,12 +557,14 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, info := range parts {
|
for _, infos := range parts {
|
||||||
|
for _, info := range infos {
|
||||||
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
|
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
|
||||||
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
|
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
|
||||||
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
|
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
|
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
|
||||||
}
|
}
|
||||||
|
@ -581,7 +586,12 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
|
||||||
|
|
||||||
parts := make([]*Part, 0, len(partsInfo))
|
parts := make([]*Part, 0, len(partsInfo))
|
||||||
|
|
||||||
for _, partInfo := range partsInfo {
|
for _, infos := range partsInfo {
|
||||||
|
sort.Slice(infos, func(i, j int) bool {
|
||||||
|
return infos[i].Timestamp < infos[j].Timestamp
|
||||||
|
})
|
||||||
|
|
||||||
|
partInfo := infos[len(infos)-1]
|
||||||
parts = append(parts, &Part{
|
parts = append(parts, &Part{
|
||||||
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
|
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
|
||||||
LastModified: partInfo.Created.UTC().Format(time.RFC3339),
|
LastModified: partInfo.Created.UTC().Format(time.RFC3339),
|
||||||
|
@ -618,7 +628,22 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) {
|
type PartsInfo map[int][]*data.PartInfoExtended
|
||||||
|
|
||||||
|
func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended {
|
||||||
|
parts := p[part]
|
||||||
|
|
||||||
|
for i, info := range parts {
|
||||||
|
if info.GetETag(md5Enabled) == etag {
|
||||||
|
p[part] = append(parts[:i], parts[i+1:]...)
|
||||||
|
return info
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) {
|
||||||
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
|
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, ErrNodeNotFound) {
|
if errors.Is(err, ErrNodeNotFound) {
|
||||||
|
@ -632,11 +657,11 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res := make(map[int]*data.PartInfo, len(parts))
|
res := make(map[int][]*data.PartInfoExtended, len(parts))
|
||||||
partsNumbers := make([]int, len(parts))
|
partsNumbers := make([]int, len(parts))
|
||||||
oids := make([]string, len(parts))
|
oids := make([]string, len(parts))
|
||||||
for i, part := range parts {
|
for i, part := range parts {
|
||||||
res[part.Number] = part
|
res[part.Number] = append(res[part.Number], part)
|
||||||
partsNumbers[i] = part.Number
|
partsNumbers[i] = part.Number
|
||||||
oids[i] = part.OID.EncodeToString()
|
oids[i] = part.OID.EncodeToString()
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -33,7 +34,7 @@ type TreeServiceMock struct {
|
||||||
locks map[string]map[uint64]*data.LockInfo
|
locks map[string]map[uint64]*data.LockInfo
|
||||||
tags map[string]map[uint64]map[string]string
|
tags map[string]map[uint64]map[string]string
|
||||||
multiparts map[string]map[string][]*data.MultipartInfo
|
multiparts map[string]map[string][]*data.MultipartInfo
|
||||||
parts map[string]map[int]*data.PartInfo
|
parts map[string]map[int]*data.PartInfoExtended
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
|
func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
|
||||||
|
@ -92,7 +93,7 @@ func NewTreeService() *TreeServiceMock {
|
||||||
locks: make(map[string]map[uint64]*data.LockInfo),
|
locks: make(map[string]map[uint64]*data.LockInfo),
|
||||||
tags: make(map[string]map[uint64]map[string]string),
|
tags: make(map[string]map[uint64]map[string]string),
|
||||||
multiparts: make(map[string]map[string][]*data.MultipartInfo),
|
multiparts: make(map[string]map[string][]*data.MultipartInfo),
|
||||||
parts: make(map[string]map[int]*data.PartInfo),
|
parts: make(map[string]map[int]*data.PartInfoExtended),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,28 +347,31 @@ func (t *TreeServiceMock) GetMultipartUpload(_ context.Context, bktInfo *data.Bu
|
||||||
return nil, ErrNodeNotFound
|
return nil, ErrNodeNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
|
func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
|
||||||
multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID)
|
multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oid.ID{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if multipartInfo.ID != multipartNodeID {
|
if multipartInfo.ID != multipartNodeID {
|
||||||
return oid.ID{}, fmt.Errorf("invalid multipart info id")
|
return nil, fmt.Errorf("invalid multipart info id")
|
||||||
}
|
}
|
||||||
|
|
||||||
partsMap, ok := t.parts[info.UploadID]
|
partsMap, ok := t.parts[info.UploadID]
|
||||||
if !ok {
|
if !ok {
|
||||||
partsMap = make(map[int]*data.PartInfo)
|
partsMap = make(map[int]*data.PartInfoExtended)
|
||||||
}
|
}
|
||||||
|
|
||||||
partsMap[info.Number] = info
|
partsMap[info.Number] = &data.PartInfoExtended{
|
||||||
|
PartInfo: *info,
|
||||||
|
Timestamp: uint64(time.Now().UnixMicro()),
|
||||||
|
}
|
||||||
|
|
||||||
t.parts[info.UploadID] = partsMap
|
t.parts[info.UploadID] = partsMap
|
||||||
return oid.ID{}, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
|
func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
|
||||||
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
|
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
|
||||||
|
|
||||||
var foundMultipart *data.MultipartInfo
|
var foundMultipart *data.MultipartInfo
|
||||||
|
@ -387,7 +391,7 @@ LOOP:
|
||||||
}
|
}
|
||||||
|
|
||||||
partsMap := t.parts[foundMultipart.UploadID]
|
partsMap := t.parts[foundMultipart.UploadID]
|
||||||
result := make([]*data.PartInfo, 0, len(partsMap))
|
result := make([]*data.PartInfoExtended, 0, len(partsMap))
|
||||||
for _, part := range partsMap {
|
for _, part := range partsMap {
|
||||||
result = append(result, part)
|
result = append(result, part)
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,11 +57,11 @@ type TreeService interface {
|
||||||
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
|
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
|
||||||
|
|
||||||
// AddPart puts a node to a system tree as a child of appropriate multipart upload
|
// AddPart puts a node to a system tree as a child of appropriate multipart upload
|
||||||
// and returns objectID of a previous part which must be deleted in FrostFS.
|
// and returns objectIDs of a previous part/s which must be deleted in FrostFS.
|
||||||
//
|
//
|
||||||
// If object id to remove is not found returns ErrNoNodeToRemove error.
|
// If object ids to remove is not found returns ErrNoNodeToRemove error.
|
||||||
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error)
|
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error)
|
||||||
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error)
|
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error)
|
||||||
|
|
||||||
PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error)
|
PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error)
|
||||||
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error)
|
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error)
|
||||||
|
|
|
@ -161,4 +161,5 @@ const (
|
||||||
WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped"
|
WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped"
|
||||||
WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped"
|
WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped"
|
||||||
WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped"
|
WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped"
|
||||||
|
FailedToRemoveOldPartNode = "failed to remove old part node"
|
||||||
)
|
)
|
||||||
|
|
|
@ -156,6 +156,10 @@ type NodeResponse interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
||||||
|
if err := validateNodeResponse(nodeInfo); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
tNode := &treeNode{
|
tNode := &treeNode{
|
||||||
ID: nodeInfo.GetNodeID(),
|
ID: nodeInfo.GetNodeID(),
|
||||||
ParentID: nodeInfo.GetParentID(),
|
ParentID: nodeInfo.GetParentID(),
|
||||||
|
@ -163,14 +167,6 @@ func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
||||||
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
|
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 {
|
|
||||||
return nil, errors.New("invalid tree node: missing id")
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) {
|
|
||||||
return nil, errors.New("invalid tree node: length multiple ids mismatch")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, kv := range nodeInfo.GetMeta() {
|
for _, kv := range nodeInfo.GetMeta() {
|
||||||
switch kv.GetKey() {
|
switch kv.GetKey() {
|
||||||
case oidKV:
|
case oidKV:
|
||||||
|
@ -377,6 +373,10 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
|
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
|
||||||
|
if err := validateNodeResponse(node); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if len(node.GetNodeID()) != 1 {
|
if len(node.GetNodeID()) != 1 {
|
||||||
return nil, errors.New("invalid multipart node: this is split node")
|
return nil, errors.New("invalid multipart node: this is split node")
|
||||||
}
|
}
|
||||||
|
@ -426,10 +426,36 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo,
|
||||||
return multipartInfo, nil
|
return multipartInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
|
func validateNodeResponse(node NodeResponse) error {
|
||||||
var err error
|
ids := node.GetNodeID()
|
||||||
partInfo := &data.PartInfo{}
|
parentIDs := node.GetParentID()
|
||||||
|
timestamps := node.GetTimestamp()
|
||||||
|
|
||||||
|
if len(ids) == 0 || len(parentIDs) == 0 || len(timestamps) == 0 {
|
||||||
|
return errors.New("invalid node response: missing ids")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ids) != len(parentIDs) || len(parentIDs) != len(timestamps) {
|
||||||
|
return errors.New("invalid node response: multiple ids length mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) {
|
||||||
|
if err := validateNodeResponse(node); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(node.GetNodeID()) != 1 {
|
||||||
|
return nil, errors.New("invalid part node: this is split node")
|
||||||
|
}
|
||||||
|
|
||||||
|
partInfo := &data.PartInfoExtended{
|
||||||
|
Timestamp: node.GetTimestamp()[0],
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
for _, kv := range node.GetMeta() {
|
for _, kv := range node.GetMeta() {
|
||||||
value := string(kv.GetValue())
|
value := string(kv.GetValue())
|
||||||
switch kv.GetKey() {
|
switch kv.GetKey() {
|
||||||
|
@ -1397,10 +1423,10 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
|
||||||
return nil, layer.ErrNodeNotFound
|
return nil, layer.ErrNodeNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
|
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
|
||||||
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
|
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oid.ID{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
meta := map[string]string{
|
meta := map[string]string{
|
||||||
|
@ -1412,48 +1438,76 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
|
||||||
md5KV: info.MD5,
|
md5KV: info.MD5,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
objToDelete := make([]oid.ID, 0, 1)
|
||||||
|
partsToDelete := make([]uint64, 0, 1)
|
||||||
|
var (
|
||||||
|
latestPartID uint64
|
||||||
|
maxTimestamp uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
multiNodeID := MultiID{multipartNodeID}
|
||||||
|
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
if len(part.GetNodeID()) != 1 {
|
if multiNodeID.Equal(part.GetNodeID()) {
|
||||||
// multipart parts nodeID shouldn't have multiple values
|
|
||||||
c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
|
|
||||||
zap.String("key", info.Key),
|
|
||||||
zap.String("upload id", info.UploadID),
|
|
||||||
zap.Uint64("multipart node id ", multipartNodeID),
|
|
||||||
zap.Uint64s("node ids", part.GetNodeID()))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
nodeID := part.GetNodeID()[0]
|
|
||||||
if nodeID == multipartNodeID {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
partInfo, err := newPartInfo(part)
|
partInfo, err := newPartInfo(part)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
|
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
|
||||||
zap.String("key", info.Key),
|
zap.String("key", info.Key),
|
||||||
zap.String("upload id", info.UploadID),
|
zap.String("upload id", info.UploadID),
|
||||||
zap.Uint64("multipart node id ", multipartNodeID),
|
zap.Uint64("multipart node id ", multipartNodeID),
|
||||||
|
zap.Uint64s("id", part.GetNodeID()),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if partInfo.Number == info.Number {
|
if partInfo.Number == info.Number {
|
||||||
return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta)
|
nodeID := part.GetNodeID()[0]
|
||||||
|
objToDelete = append(objToDelete, partInfo.OID)
|
||||||
|
partsToDelete = append(partsToDelete, nodeID)
|
||||||
|
timestamp := partInfo.Timestamp
|
||||||
|
if timestamp > maxTimestamp {
|
||||||
|
maxTimestamp = timestamp
|
||||||
|
latestPartID = nodeID
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(objToDelete) != 0 {
|
||||||
|
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil {
|
||||||
|
return nil, fmt.Errorf("move part node: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, nodeID := range partsToDelete {
|
||||||
|
if nodeID == latestPartID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil {
|
||||||
|
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode,
|
||||||
|
zap.String("key", info.Key),
|
||||||
|
zap.String("upload id", info.UploadID),
|
||||||
|
zap.Uint64("id", nodeID))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return objToDelete, nil
|
||||||
|
}
|
||||||
|
|
||||||
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
|
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
|
||||||
return oid.ID{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return oid.ID{}, layer.ErrNoNodeToRemove
|
return nil, layer.ErrNoNodeToRemove
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
|
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
|
||||||
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
|
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make([]*data.PartInfo, 0, len(parts))
|
result := make([]*data.PartInfoExtended, 0, len(parts))
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
if len(part.GetNodeID()) != 1 {
|
if len(part.GetNodeID()) != 1 {
|
||||||
// multipart parts nodeID shouldn't have multiple values
|
// multipart parts nodeID shouldn't have multiple values
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
@ -304,3 +305,57 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSplitTreeMultiparts(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
memCli, err := NewTreeServiceClientMemory()
|
||||||
|
require.NoError(t, err)
|
||||||
|
treeService := NewTree(memCli, zaptest.NewLogger(t))
|
||||||
|
|
||||||
|
bktInfo := &data.BucketInfo{
|
||||||
|
CID: cidtest.ID(),
|
||||||
|
}
|
||||||
|
|
||||||
|
multipartInfo := &data.MultipartInfo{
|
||||||
|
Key: "multipart",
|
||||||
|
UploadID: "id",
|
||||||
|
Meta: map[string]string{},
|
||||||
|
Owner: usertest.ID(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = treeService.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
multipartInfo, err = treeService.GetMultipartUpload(ctx, bktInfo, multipartInfo.Key, multipartInfo.UploadID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var objIDs []oid.ID
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
objID := oidtest.ID()
|
||||||
|
_, err = memCli.AddNode(ctx, bktInfo, systemTree, multipartInfo.ID, map[string]string{
|
||||||
|
partNumberKV: "1",
|
||||||
|
oidKV: objID.EncodeToString(),
|
||||||
|
ownerKV: usertest.ID().EncodeToString(),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
objIDs = append(objIDs, objID)
|
||||||
|
}
|
||||||
|
|
||||||
|
parts, err := treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, parts, 2)
|
||||||
|
|
||||||
|
objToDeletes, err := treeService.AddPart(ctx, bktInfo, multipartInfo.ID, &data.PartInfo{
|
||||||
|
Key: multipartInfo.Key,
|
||||||
|
UploadID: multipartInfo.UploadID,
|
||||||
|
Number: 1,
|
||||||
|
OID: oidtest.ID(),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, objIDs, objToDeletes, "oids to delete mismatched")
|
||||||
|
|
||||||
|
parts, err = treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, parts, 1)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue
Let's write some comment about timestamp or whole
PartInfoExtended
structure, why we need that at all. If I understand correctly: