[#448] multipart: Support removing duplicated parts #448

Merged
alexvanin merged 1 commit from dkirillov/frostfs-s3-gw:bugfix/split_tree_duplicated_parts into master 2024-09-03 13:20:39 +00:00
9 changed files with 336 additions and 68 deletions

View file

@ -126,6 +126,14 @@ type PartInfo struct {
Created time.Time `json:"created"` Created time.Time `json:"created"`
} }
type PartInfoExtended struct {
PartInfo
// Timestamp is used to find the latest version of part info in case of tree split
// when there are multiple nodes for the same part.
Timestamp uint64
}
// ToHeaderString form short part representation to use in S3-Completed-Parts header. // ToHeaderString form short part representation to use in S3-Completed-Parts header.
func (p *PartInfo) ToHeaderString() string { func (p *PartInfo) ToHeaderString() string {
// ETag value contains SHA256 checksum which is used while getting object parts attributes. // ETag value contains SHA256 checksum which is used while getting object parts attributes.

View file

@ -288,6 +288,21 @@ func completeMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID
return w return w
} }
func abortMultipartUpload(hc *handlerContext, bktName, objName, uploadID string) {
w := abortMultipartUploadBase(hc, bktName, objName, uploadID)
assertStatus(hc.t, w, http.StatusNoContent)
}
func abortMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID string) *httptest.ResponseRecorder {
query := make(url.Values)
query.Set(uploadIDQuery, uploadID)
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
hc.Handler().AbortMultipartUploadHandler(w, r)
return w
}
func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) { func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) {
return uploadPartBase(hc, bktName, objName, true, uploadID, num, size) return uploadPartBase(hc, bktName, objName, true, uploadID, num, size)
} }

View file

@ -17,6 +17,10 @@ import (
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -122,6 +126,108 @@ func TestMultipartReUploadPart(t *testing.T) {
equalDataSlices(t, append(data1, data2...), data) equalDataSlices(t, append(data1, data2...), data)
} }
func TestMultipartRemovePartsSplit(t *testing.T) {
bktName, objName := "bucket-to-upload-part", "object-multipart"
partSize := 8
t.Run("reupload part", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
list := listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
require.Len(t, list.Parts, 1)
require.Equal(t, `"etag"`, list.Parts[0].ETag)
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
require.Len(t, list.Parts, 1)
require.Equal(t, etag1, list.Parts[0].ETag)
require.Len(t, hc.tp.Objects(), 1)
})
t.Run("abort multipart", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
abortMultipartUpload(hc, bktName, objName, uploadInfo.UploadID)
require.Empty(t, hc.tp.Objects())
})
t.Run("complete multipart", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
completeMultipartUpload(hc, bktName, objName, uploadInfo.UploadID, []string{etag1})
require.Falsef(t, containsOID(hc.tp.Objects(), objID), "frostfs contains '%s' object, but shouldn't", objID)
})
}
func containsOID(objects []*object.Object, objID oid.ID) bool {
for _, o := range objects {
oID, _ := o.ID()
if oID.Equals(objID) {
return true
}
}
return false
}
func TestListMultipartUploads(t *testing.T) { func TestListMultipartUploads(t *testing.T) {
hc := prepareHandlerContext(t) hc := prepareHandlerContext(t)

View file

@ -290,18 +290,20 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
MD5: hex.EncodeToString(createdObj.MD5Sum), MD5: hex.EncodeToString(createdObj.MD5Sum),
} }
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove) oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
if err != nil && !oldPartIDNotFound { if err != nil && !oldPartIDNotFound {
return nil, err return nil, err
} }
if !oldPartIDNotFound { if !oldPartIDNotFound {
for _, oldPartID := range oldPartIDs {
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil { if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err), n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
zap.String("cid", bktInfo.CID.EncodeToString()), zap.String("cid", bktInfo.CID.EncodeToString()),
zap.String("oid", oldPartID.EncodeToString())) zap.String("oid", oldPartID.EncodeToString()))
} }
} }
}
objInfo := &data.ObjectInfo{ objInfo := &data.ObjectInfo{
ID: createdObj.ID, ID: createdObj.ID,
@ -385,16 +387,15 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var multipartObjetSize uint64 var multipartObjetSize uint64
var encMultipartObjectSize uint64 var encMultipartObjectSize uint64
parts := make([]*data.PartInfo, 0, len(p.Parts)) parts := make([]*data.PartInfoExtended, 0, len(p.Parts))
var completedPartsHeader strings.Builder var completedPartsHeader strings.Builder
md5Hash := md5.New() md5Hash := md5.New()
for i, part := range p.Parts { for i, part := range p.Parts {
partInfo := partsInfo[part.PartNumber] partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled())
if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) { if partInfo == nil {
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber) return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
} }
delete(partsInfo, part.PartNumber)
// for the last part we have no minimum size limit // for the last part we have no minimum size limit
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize { if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
@ -475,7 +476,8 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var addr oid.Address var addr oid.Address
addr.SetContainer(p.Info.Bkt.CID) addr.SetContainer(p.Info.Bkt.CID)
for _, partInfo := range partsInfo { for _, prts := range partsInfo {
for _, partInfo := range prts {
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart, n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID), zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
@ -484,6 +486,7 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
addr.SetObject(partInfo.OID) addr.SetObject(partInfo.OID)
n.cache.DeleteObject(addr) n.cache.DeleteObject(addr)
} }
}
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo) return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
} }
@ -554,12 +557,14 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
return err return err
} }
for _, info := range parts { for _, infos := range parts {
for _, info := range infos {
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
} }
} }
}
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo) return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
} }
@ -581,7 +586,12 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
parts := make([]*Part, 0, len(partsInfo)) parts := make([]*Part, 0, len(partsInfo))
for _, partInfo := range partsInfo { for _, infos := range partsInfo {
sort.Slice(infos, func(i, j int) bool {
return infos[i].Timestamp < infos[j].Timestamp
})
partInfo := infos[len(infos)-1]
parts = append(parts, &Part{ parts = append(parts, &Part{
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())), ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
LastModified: partInfo.Created.UTC().Format(time.RFC3339), LastModified: partInfo.Created.UTC().Format(time.RFC3339),
@ -618,7 +628,22 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
return &res, nil return &res, nil
} }
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) { type PartsInfo map[int][]*data.PartInfoExtended
func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended {
parts := p[part]
for i, info := range parts {
if info.GetETag(md5Enabled) == etag {
p[part] = append(parts[:i], parts[i+1:]...)
return info
}
}
return nil
}
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID) multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
if err != nil { if err != nil {
if errors.Is(err, ErrNodeNotFound) { if errors.Is(err, ErrNodeNotFound) {
@ -632,11 +657,11 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
return nil, nil, err return nil, nil, err
} }
res := make(map[int]*data.PartInfo, len(parts)) res := make(map[int][]*data.PartInfoExtended, len(parts))
partsNumbers := make([]int, len(parts)) partsNumbers := make([]int, len(parts))
oids := make([]string, len(parts)) oids := make([]string, len(parts))
for i, part := range parts { for i, part := range parts {
res[part.Number] = part res[part.Number] = append(res[part.Number], part)
partsNumbers[i] = part.Number partsNumbers[i] = part.Number
oids[i] = part.OID.EncodeToString() oids[i] = part.OID.EncodeToString()
} }

View file

@ -6,6 +6,7 @@ import (
"io" "io"
"sort" "sort"
"strings" "strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -33,7 +34,7 @@ type TreeServiceMock struct {
locks map[string]map[uint64]*data.LockInfo locks map[string]map[uint64]*data.LockInfo
tags map[string]map[uint64]map[string]string tags map[string]map[uint64]map[string]string
multiparts map[string]map[string][]*data.MultipartInfo multiparts map[string]map[string][]*data.MultipartInfo
parts map[string]map[int]*data.PartInfo parts map[string]map[int]*data.PartInfoExtended
} }
func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) { func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
@ -92,7 +93,7 @@ func NewTreeService() *TreeServiceMock {
locks: make(map[string]map[uint64]*data.LockInfo), locks: make(map[string]map[uint64]*data.LockInfo),
tags: make(map[string]map[uint64]map[string]string), tags: make(map[string]map[uint64]map[string]string),
multiparts: make(map[string]map[string][]*data.MultipartInfo), multiparts: make(map[string]map[string][]*data.MultipartInfo),
parts: make(map[string]map[int]*data.PartInfo), parts: make(map[string]map[int]*data.PartInfoExtended),
} }
} }
@ -346,28 +347,31 @@ func (t *TreeServiceMock) GetMultipartUpload(_ context.Context, bktInfo *data.Bu
return nil, ErrNodeNotFound return nil, ErrNodeNotFound
} }
func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID) multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID)
if err != nil { if err != nil {
return oid.ID{}, err return nil, err
} }
if multipartInfo.ID != multipartNodeID { if multipartInfo.ID != multipartNodeID {
return oid.ID{}, fmt.Errorf("invalid multipart info id") return nil, fmt.Errorf("invalid multipart info id")
} }
partsMap, ok := t.parts[info.UploadID] partsMap, ok := t.parts[info.UploadID]
if !ok { if !ok {
partsMap = make(map[int]*data.PartInfo) partsMap = make(map[int]*data.PartInfoExtended)
} }
partsMap[info.Number] = info partsMap[info.Number] = &data.PartInfoExtended{
PartInfo: *info,
Timestamp: uint64(time.Now().UnixMicro()),
}
t.parts[info.UploadID] = partsMap t.parts[info.UploadID] = partsMap
return oid.ID{}, nil return nil, nil
} }
func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
var foundMultipart *data.MultipartInfo var foundMultipart *data.MultipartInfo
@ -387,7 +391,7 @@ LOOP:
} }
partsMap := t.parts[foundMultipart.UploadID] partsMap := t.parts[foundMultipart.UploadID]
result := make([]*data.PartInfo, 0, len(partsMap)) result := make([]*data.PartInfoExtended, 0, len(partsMap))
for _, part := range partsMap { for _, part := range partsMap {
result = append(result, part) result = append(result, part)
} }

View file

@ -57,11 +57,11 @@ type TreeService interface {
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
// AddPart puts a node to a system tree as a child of appropriate multipart upload // AddPart puts a node to a system tree as a child of appropriate multipart upload
// and returns objectID of a previous part which must be deleted in FrostFS. // and returns objectIDs of a previous part/s which must be deleted in FrostFS.
// //
// If object id to remove is not found returns ErrNoNodeToRemove error. // If object ids to remove is not found returns ErrNoNodeToRemove error.
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error)
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error)
PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error)
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error)

View file

@ -161,4 +161,5 @@ const (
WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped" WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped"
WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped" WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped"
WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped" WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped"
FailedToRemoveOldPartNode = "failed to remove old part node"
) )

View file

@ -156,6 +156,10 @@ type NodeResponse interface {
} }
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) { func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
if err := validateNodeResponse(nodeInfo); err != nil {
return nil, err
}
tNode := &treeNode{ tNode := &treeNode{
ID: nodeInfo.GetNodeID(), ID: nodeInfo.GetNodeID(),
ParentID: nodeInfo.GetParentID(), ParentID: nodeInfo.GetParentID(),
@ -163,14 +167,6 @@ func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
Meta: make(map[string]string, len(nodeInfo.GetMeta())), Meta: make(map[string]string, len(nodeInfo.GetMeta())),
} }
if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 {
return nil, errors.New("invalid tree node: missing id")
}
if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) {
return nil, errors.New("invalid tree node: length multiple ids mismatch")
}
for _, kv := range nodeInfo.GetMeta() { for _, kv := range nodeInfo.GetMeta() {
switch kv.GetKey() { switch kv.GetKey() {
case oidKV: case oidKV:
@ -377,6 +373,10 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
} }
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) { func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
if err := validateNodeResponse(node); err != nil {
return nil, err
}
if len(node.GetNodeID()) != 1 { if len(node.GetNodeID()) != 1 {
return nil, errors.New("invalid multipart node: this is split node") return nil, errors.New("invalid multipart node: this is split node")
} }
@ -426,10 +426,36 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo,
return multipartInfo, nil return multipartInfo, nil
} }
func newPartInfo(node NodeResponse) (*data.PartInfo, error) { func validateNodeResponse(node NodeResponse) error {
var err error ids := node.GetNodeID()
partInfo := &data.PartInfo{} parentIDs := node.GetParentID()
timestamps := node.GetTimestamp()
if len(ids) == 0 || len(parentIDs) == 0 || len(timestamps) == 0 {
return errors.New("invalid node response: missing ids")
}
if len(ids) != len(parentIDs) || len(parentIDs) != len(timestamps) {
return errors.New("invalid node response: multiple ids length mismatch")
}
return nil
}
func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) {
if err := validateNodeResponse(node); err != nil {
return nil, err
}
if len(node.GetNodeID()) != 1 {
return nil, errors.New("invalid part node: this is split node")
}
partInfo := &data.PartInfoExtended{
Timestamp: node.GetTimestamp()[0],
}
var err error
for _, kv := range node.GetMeta() { for _, kv := range node.GetMeta() {
value := string(kv.GetValue()) value := string(kv.GetValue())
switch kv.GetKey() { switch kv.GetKey() {
@ -1397,10 +1423,10 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
return nil, layer.ErrNodeNotFound return nil, layer.ErrNodeNotFound
} }
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false) parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil { if err != nil {
return oid.ID{}, err return nil, err
} }
meta := map[string]string{ meta := map[string]string{
@ -1412,48 +1438,76 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
md5KV: info.MD5, md5KV: info.MD5,
} }
objToDelete := make([]oid.ID, 0, 1)
partsToDelete := make([]uint64, 0, 1)
var (
latestPartID uint64
maxTimestamp uint64
)
multiNodeID := MultiID{multipartNodeID}
for _, part := range parts { for _, part := range parts {
if len(part.GetNodeID()) != 1 { if multiNodeID.Equal(part.GetNodeID()) {
// multipart parts nodeID shouldn't have multiple values
c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("node ids", part.GetNodeID()))
continue
}
nodeID := part.GetNodeID()[0]
if nodeID == multipartNodeID {
continue continue
} }
partInfo, err := newPartInfo(part) partInfo, err := newPartInfo(part)
if err != nil { if err != nil {
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo, c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
zap.String("key", info.Key), zap.String("key", info.Key),
zap.String("upload id", info.UploadID), zap.String("upload id", info.UploadID),
zap.Uint64("multipart node id ", multipartNodeID), zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("id", part.GetNodeID()),
zap.Error(err)) zap.Error(err))
continue continue
} }
if partInfo.Number == info.Number { if partInfo.Number == info.Number {
return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta) nodeID := part.GetNodeID()[0]
objToDelete = append(objToDelete, partInfo.OID)
partsToDelete = append(partsToDelete, nodeID)
timestamp := partInfo.Timestamp
if timestamp > maxTimestamp {
maxTimestamp = timestamp
latestPartID = nodeID
} }
} }
}
if len(objToDelete) != 0 {
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil {
return nil, fmt.Errorf("move part node: %w", err)
}
for _, nodeID := range partsToDelete {
if nodeID == latestPartID {
continue
}
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("id", nodeID))
}
}
return objToDelete, nil
}
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil { if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
return oid.ID{}, err return nil, err
} }
return oid.ID{}, layer.ErrNoNodeToRemove return nil, layer.ErrNoNodeToRemove
} }
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false) parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
result := make([]*data.PartInfo, 0, len(parts)) result := make([]*data.PartInfoExtended, 0, len(parts))
for _, part := range parts { for _, part := range parts {
if len(part.GetNodeID()) != 1 { if len(part.GetNodeID()) != 1 {
// multipart parts nodeID shouldn't have multiple values // multipart parts nodeID shouldn't have multiple values

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -304,3 +305,57 @@ func TestGetLatestNode(t *testing.T) {
}) })
} }
} }
func TestSplitTreeMultiparts(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
bktInfo := &data.BucketInfo{
CID: cidtest.ID(),
}
multipartInfo := &data.MultipartInfo{
Key: "multipart",
UploadID: "id",
Meta: map[string]string{},
Owner: usertest.ID(),
}
err = treeService.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
require.NoError(t, err)
multipartInfo, err = treeService.GetMultipartUpload(ctx, bktInfo, multipartInfo.Key, multipartInfo.UploadID)
require.NoError(t, err)
var objIDs []oid.ID
for i := 0; i < 2; i++ {
objID := oidtest.ID()
_, err = memCli.AddNode(ctx, bktInfo, systemTree, multipartInfo.ID, map[string]string{
partNumberKV: "1",
oidKV: objID.EncodeToString(),
ownerKV: usertest.ID().EncodeToString(),
})
require.NoError(t, err)
objIDs = append(objIDs, objID)
}
parts, err := treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
require.NoError(t, err)
require.Len(t, parts, 2)
objToDeletes, err := treeService.AddPart(ctx, bktInfo, multipartInfo.ID, &data.PartInfo{
Key: multipartInfo.Key,
UploadID: multipartInfo.UploadID,
Number: 1,
OID: oidtest.ID(),
})
require.NoError(t, err)
require.EqualValues(t, objIDs, objToDeletes, "oids to delete mismatched")
parts, err = treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
require.NoError(t, err)
require.Len(t, parts, 1)
}