[#448] multipart: Support removing duplicated parts #448

Merged
alexvanin merged 1 commit from dkirillov/frostfs-s3-gw:bugfix/split_tree_duplicated_parts into master 2024-09-03 13:20:39 +00:00
9 changed files with 336 additions and 68 deletions

View file

@ -126,6 +126,14 @@ type PartInfo struct {
Created time.Time `json:"created"` Created time.Time `json:"created"`
} }
type PartInfoExtended struct {
PartInfo
alexvanin marked this conversation as resolved Outdated

Let's write some comment about timestamp or whole PartInfoExtended structure, why we need that at all. If I understand correctly:

Timestamp is used to find latest version of part info in case of tree split when there are multiple nodes for the same part.
Let's write some comment about timestamp or whole `PartInfoExtended` structure, why we need that at all. If I understand correctly: ``` Timestamp is used to find latest version of part info in case of tree split when there are multiple nodes for the same part. ```
// Timestamp is used to find the latest version of part info in case of tree split
// when there are multiple nodes for the same part.
Timestamp uint64
}
// ToHeaderString form short part representation to use in S3-Completed-Parts header. // ToHeaderString form short part representation to use in S3-Completed-Parts header.
func (p *PartInfo) ToHeaderString() string { func (p *PartInfo) ToHeaderString() string {
// ETag value contains SHA256 checksum which is used while getting object parts attributes. // ETag value contains SHA256 checksum which is used while getting object parts attributes.

View file

@ -288,6 +288,21 @@ func completeMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID
return w return w
} }
func abortMultipartUpload(hc *handlerContext, bktName, objName, uploadID string) {
w := abortMultipartUploadBase(hc, bktName, objName, uploadID)
assertStatus(hc.t, w, http.StatusNoContent)
}
func abortMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID string) *httptest.ResponseRecorder {
query := make(url.Values)
query.Set(uploadIDQuery, uploadID)
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
hc.Handler().AbortMultipartUploadHandler(w, r)
return w
}
func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) { func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) {
return uploadPartBase(hc, bktName, objName, true, uploadID, num, size) return uploadPartBase(hc, bktName, objName, true, uploadID, num, size)
} }

View file

@ -17,6 +17,10 @@ import (
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -122,6 +126,108 @@ func TestMultipartReUploadPart(t *testing.T) {
equalDataSlices(t, append(data1, data2...), data) equalDataSlices(t, append(data1, data2...), data)
} }
func TestMultipartRemovePartsSplit(t *testing.T) {
bktName, objName := "bucket-to-upload-part", "object-multipart"
partSize := 8
t.Run("reupload part", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
list := listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
require.Len(t, list.Parts, 1)
require.Equal(t, `"etag"`, list.Parts[0].ETag)
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
require.Len(t, list.Parts, 1)
require.Equal(t, etag1, list.Parts[0].ETag)
require.Len(t, hc.tp.Objects(), 1)
})
t.Run("abort multipart", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
abortMultipartUpload(hc, bktName, objName, uploadInfo.UploadID)
require.Empty(t, hc.tp.Objects())
})
t.Run("complete multipart", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
completeMultipartUpload(hc, bktName, objName, uploadInfo.UploadID, []string{etag1})
require.Falsef(t, containsOID(hc.tp.Objects(), objID), "frostfs contains '%s' object, but shouldn't", objID)
})
}
func containsOID(objects []*object.Object, objID oid.ID) bool {
for _, o := range objects {
oID, _ := o.ID()
if oID.Equals(objID) {
return true
}
}
return false
}
func TestListMultipartUploads(t *testing.T) { func TestListMultipartUploads(t *testing.T) {
hc := prepareHandlerContext(t) hc := prepareHandlerContext(t)

View file

@ -290,16 +290,18 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
MD5: hex.EncodeToString(createdObj.MD5Sum), MD5: hex.EncodeToString(createdObj.MD5Sum),
} }
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove) oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
if err != nil && !oldPartIDNotFound { if err != nil && !oldPartIDNotFound {
return nil, err return nil, err
} }
if !oldPartIDNotFound { if !oldPartIDNotFound {
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil { for _, oldPartID := range oldPartIDs {
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err), if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
zap.String("cid", bktInfo.CID.EncodeToString()), n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
zap.String("oid", oldPartID.EncodeToString())) zap.String("cid", bktInfo.CID.EncodeToString()),
zap.String("oid", oldPartID.EncodeToString()))
}
} }
} }
@ -385,16 +387,15 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var multipartObjetSize uint64 var multipartObjetSize uint64
var encMultipartObjectSize uint64 var encMultipartObjectSize uint64
parts := make([]*data.PartInfo, 0, len(p.Parts)) parts := make([]*data.PartInfoExtended, 0, len(p.Parts))
var completedPartsHeader strings.Builder var completedPartsHeader strings.Builder
md5Hash := md5.New() md5Hash := md5.New()
for i, part := range p.Parts { for i, part := range p.Parts {
partInfo := partsInfo[part.PartNumber] partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled())
if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) { if partInfo == nil {
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber) return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
} }
delete(partsInfo, part.PartNumber)
// for the last part we have no minimum size limit // for the last part we have no minimum size limit
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize { if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
@ -475,14 +476,16 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var addr oid.Address var addr oid.Address
addr.SetContainer(p.Info.Bkt.CID) addr.SetContainer(p.Info.Bkt.CID)
for _, partInfo := range partsInfo { for _, prts := range partsInfo {
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { for _, partInfo := range prts {
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart, if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID), n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
zap.Error(err)) zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
zap.Error(err))
}
addr.SetObject(partInfo.OID)
n.cache.DeleteObject(addr)
} }
addr.SetObject(partInfo.OID)
n.cache.DeleteObject(addr)
} }
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo) return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
@ -554,10 +557,12 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
return err return err
} }
for _, info := range parts { for _, infos := range parts {
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { for _, info := range infos {
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
}
} }
} }
@ -581,7 +586,12 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
parts := make([]*Part, 0, len(partsInfo)) parts := make([]*Part, 0, len(partsInfo))
for _, partInfo := range partsInfo { for _, infos := range partsInfo {
sort.Slice(infos, func(i, j int) bool {
return infos[i].Timestamp < infos[j].Timestamp
})
partInfo := infos[len(infos)-1]
parts = append(parts, &Part{ parts = append(parts, &Part{
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())), ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
LastModified: partInfo.Created.UTC().Format(time.RFC3339), LastModified: partInfo.Created.UTC().Format(time.RFC3339),
@ -618,7 +628,22 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
return &res, nil return &res, nil
} }
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) { type PartsInfo map[int][]*data.PartInfoExtended
func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended {
parts := p[part]
for i, info := range parts {
if info.GetETag(md5Enabled) == etag {
p[part] = append(parts[:i], parts[i+1:]...)
return info
}
}
return nil
}
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID) multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
if err != nil { if err != nil {
if errors.Is(err, ErrNodeNotFound) { if errors.Is(err, ErrNodeNotFound) {
@ -632,11 +657,11 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
return nil, nil, err return nil, nil, err
} }
res := make(map[int]*data.PartInfo, len(parts)) res := make(map[int][]*data.PartInfoExtended, len(parts))
partsNumbers := make([]int, len(parts)) partsNumbers := make([]int, len(parts))
oids := make([]string, len(parts)) oids := make([]string, len(parts))
for i, part := range parts { for i, part := range parts {
res[part.Number] = part res[part.Number] = append(res[part.Number], part)
partsNumbers[i] = part.Number partsNumbers[i] = part.Number
oids[i] = part.OID.EncodeToString() oids[i] = part.OID.EncodeToString()
} }

View file

@ -6,6 +6,7 @@ import (
"io" "io"
"sort" "sort"
"strings" "strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -33,7 +34,7 @@ type TreeServiceMock struct {
locks map[string]map[uint64]*data.LockInfo locks map[string]map[uint64]*data.LockInfo
tags map[string]map[uint64]map[string]string tags map[string]map[uint64]map[string]string
multiparts map[string]map[string][]*data.MultipartInfo multiparts map[string]map[string][]*data.MultipartInfo
parts map[string]map[int]*data.PartInfo parts map[string]map[int]*data.PartInfoExtended
} }
func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) { func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
@ -92,7 +93,7 @@ func NewTreeService() *TreeServiceMock {
locks: make(map[string]map[uint64]*data.LockInfo), locks: make(map[string]map[uint64]*data.LockInfo),
tags: make(map[string]map[uint64]map[string]string), tags: make(map[string]map[uint64]map[string]string),
multiparts: make(map[string]map[string][]*data.MultipartInfo), multiparts: make(map[string]map[string][]*data.MultipartInfo),
parts: make(map[string]map[int]*data.PartInfo), parts: make(map[string]map[int]*data.PartInfoExtended),
} }
} }
@ -346,28 +347,31 @@ func (t *TreeServiceMock) GetMultipartUpload(_ context.Context, bktInfo *data.Bu
return nil, ErrNodeNotFound return nil, ErrNodeNotFound
} }
func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID) multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID)
if err != nil { if err != nil {
return oid.ID{}, err return nil, err
} }
if multipartInfo.ID != multipartNodeID { if multipartInfo.ID != multipartNodeID {
return oid.ID{}, fmt.Errorf("invalid multipart info id") return nil, fmt.Errorf("invalid multipart info id")
} }
partsMap, ok := t.parts[info.UploadID] partsMap, ok := t.parts[info.UploadID]
if !ok { if !ok {
partsMap = make(map[int]*data.PartInfo) partsMap = make(map[int]*data.PartInfoExtended)
} }
partsMap[info.Number] = info partsMap[info.Number] = &data.PartInfoExtended{
PartInfo: *info,
Timestamp: uint64(time.Now().UnixMicro()),
}
t.parts[info.UploadID] = partsMap t.parts[info.UploadID] = partsMap
return oid.ID{}, nil return nil, nil
} }
func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
var foundMultipart *data.MultipartInfo var foundMultipart *data.MultipartInfo
@ -387,7 +391,7 @@ LOOP:
} }
partsMap := t.parts[foundMultipart.UploadID] partsMap := t.parts[foundMultipart.UploadID]
result := make([]*data.PartInfo, 0, len(partsMap)) result := make([]*data.PartInfoExtended, 0, len(partsMap))
for _, part := range partsMap { for _, part := range partsMap {
result = append(result, part) result = append(result, part)
} }

View file

@ -57,11 +57,11 @@ type TreeService interface {
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
// AddPart puts a node to a system tree as a child of appropriate multipart upload // AddPart puts a node to a system tree as a child of appropriate multipart upload
// and returns objectID of a previous part which must be deleted in FrostFS. // and returns objectIDs of a previous part/s which must be deleted in FrostFS.
// //
// If object id to remove is not found returns ErrNoNodeToRemove error. // If object ids to remove is not found returns ErrNoNodeToRemove error.
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error)
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error)
PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error)
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error)

View file

@ -161,4 +161,5 @@ const (
WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped" WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped"
WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped" WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped"
WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped" WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped"
FailedToRemoveOldPartNode = "failed to remove old part node"
) )

View file

@ -156,6 +156,10 @@ type NodeResponse interface {
} }
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) { func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
if err := validateNodeResponse(nodeInfo); err != nil {
return nil, err
}
tNode := &treeNode{ tNode := &treeNode{
ID: nodeInfo.GetNodeID(), ID: nodeInfo.GetNodeID(),
ParentID: nodeInfo.GetParentID(), ParentID: nodeInfo.GetParentID(),
@ -163,14 +167,6 @@ func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
Meta: make(map[string]string, len(nodeInfo.GetMeta())), Meta: make(map[string]string, len(nodeInfo.GetMeta())),
} }
if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 {
return nil, errors.New("invalid tree node: missing id")
}
if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) {
return nil, errors.New("invalid tree node: length multiple ids mismatch")
}
for _, kv := range nodeInfo.GetMeta() { for _, kv := range nodeInfo.GetMeta() {
switch kv.GetKey() { switch kv.GetKey() {
case oidKV: case oidKV:
@ -377,6 +373,10 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
} }
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) { func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
if err := validateNodeResponse(node); err != nil {
return nil, err
}
if len(node.GetNodeID()) != 1 { if len(node.GetNodeID()) != 1 {
return nil, errors.New("invalid multipart node: this is split node") return nil, errors.New("invalid multipart node: this is split node")
} }
@ -426,10 +426,36 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo,
return multipartInfo, nil return multipartInfo, nil
} }
func newPartInfo(node NodeResponse) (*data.PartInfo, error) { func validateNodeResponse(node NodeResponse) error {
var err error ids := node.GetNodeID()
partInfo := &data.PartInfo{} parentIDs := node.GetParentID()
timestamps := node.GetTimestamp()
if len(ids) == 0 || len(parentIDs) == 0 || len(timestamps) == 0 {
return errors.New("invalid node response: missing ids")
}
if len(ids) != len(parentIDs) || len(parentIDs) != len(timestamps) {
return errors.New("invalid node response: multiple ids length mismatch")
}
return nil
}
func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) {
if err := validateNodeResponse(node); err != nil {
return nil, err
}
if len(node.GetNodeID()) != 1 {
return nil, errors.New("invalid part node: this is split node")
}
partInfo := &data.PartInfoExtended{
Timestamp: node.GetTimestamp()[0],
}
var err error
for _, kv := range node.GetMeta() { for _, kv := range node.GetMeta() {
value := string(kv.GetValue()) value := string(kv.GetValue())
switch kv.GetKey() { switch kv.GetKey() {
@ -1397,10 +1423,10 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
return nil, layer.ErrNodeNotFound return nil, layer.ErrNodeNotFound
} }
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false) parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil { if err != nil {
return oid.ID{}, err return nil, err
} }
meta := map[string]string{ meta := map[string]string{
@ -1412,48 +1438,76 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
md5KV: info.MD5, md5KV: info.MD5,
} }
objToDelete := make([]oid.ID, 0, 1)
partsToDelete := make([]uint64, 0, 1)
var (
latestPartID uint64
maxTimestamp uint64
)
multiNodeID := MultiID{multipartNodeID}
for _, part := range parts { for _, part := range parts {
if len(part.GetNodeID()) != 1 { if multiNodeID.Equal(part.GetNodeID()) {
// multipart parts nodeID shouldn't have multiple values
c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("node ids", part.GetNodeID()))
continue
}
nodeID := part.GetNodeID()[0]
if nodeID == multipartNodeID {
continue continue
} }
partInfo, err := newPartInfo(part) partInfo, err := newPartInfo(part)
if err != nil { if err != nil {
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo, c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
zap.String("key", info.Key), zap.String("key", info.Key),
zap.String("upload id", info.UploadID), zap.String("upload id", info.UploadID),
zap.Uint64("multipart node id ", multipartNodeID), zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("id", part.GetNodeID()),
zap.Error(err)) zap.Error(err))
continue continue
} }
if partInfo.Number == info.Number { if partInfo.Number == info.Number {
return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta) nodeID := part.GetNodeID()[0]
objToDelete = append(objToDelete, partInfo.OID)
partsToDelete = append(partsToDelete, nodeID)
timestamp := partInfo.Timestamp
if timestamp > maxTimestamp {
maxTimestamp = timestamp
latestPartID = nodeID
}
} }
} }
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil { if len(objToDelete) != 0 {
return oid.ID{}, err if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil {
return nil, fmt.Errorf("move part node: %w", err)
}
for _, nodeID := range partsToDelete {
if nodeID == latestPartID {
continue
}
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("id", nodeID))
}
}
return objToDelete, nil
} }
return oid.ID{}, layer.ErrNoNodeToRemove if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
return nil, err
}
return nil, layer.ErrNoNodeToRemove
} }
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false) parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
result := make([]*data.PartInfo, 0, len(parts)) result := make([]*data.PartInfoExtended, 0, len(parts))
for _, part := range parts { for _, part := range parts {
if len(part.GetNodeID()) != 1 { if len(part.GetNodeID()) != 1 {
// multipart parts nodeID shouldn't have multiple values // multipart parts nodeID shouldn't have multiple values

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -304,3 +305,57 @@ func TestGetLatestNode(t *testing.T) {
}) })
} }
} }
func TestSplitTreeMultiparts(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
bktInfo := &data.BucketInfo{
CID: cidtest.ID(),
}
multipartInfo := &data.MultipartInfo{
Key: "multipart",
UploadID: "id",
Meta: map[string]string{},
Owner: usertest.ID(),
}
err = treeService.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
require.NoError(t, err)
multipartInfo, err = treeService.GetMultipartUpload(ctx, bktInfo, multipartInfo.Key, multipartInfo.UploadID)
require.NoError(t, err)
var objIDs []oid.ID
for i := 0; i < 2; i++ {
objID := oidtest.ID()
_, err = memCli.AddNode(ctx, bktInfo, systemTree, multipartInfo.ID, map[string]string{
partNumberKV: "1",
oidKV: objID.EncodeToString(),
ownerKV: usertest.ID().EncodeToString(),
})
require.NoError(t, err)
objIDs = append(objIDs, objID)
}
parts, err := treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
require.NoError(t, err)
require.Len(t, parts, 2)
objToDeletes, err := treeService.AddPart(ctx, bktInfo, multipartInfo.ID, &data.PartInfo{
Key: multipartInfo.Key,
UploadID: multipartInfo.UploadID,
Number: 1,
OID: oidtest.ID(),
})
require.NoError(t, err)
require.EqualValues(t, objIDs, objToDeletes, "oids to delete mismatched")
parts, err = treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
require.NoError(t, err)
require.Len(t, parts, 1)
}