[#448] multipart: Support removing duplicated parts #448

Merged
alexvanin merged 1 commit from dkirillov/frostfs-s3-gw:bugfix/split_tree_duplicated_parts into master 2024-09-03 13:20:39 +00:00
9 changed files with 336 additions and 68 deletions

View file

@ -126,6 +126,14 @@ type PartInfo struct {
Created time.Time `json:"created"`
}
type PartInfoExtended struct {
PartInfo
// Timestamp is used to find the latest version of part info in case of tree split
// when there are multiple nodes for the same part.
Timestamp uint64
}
// ToHeaderString form short part representation to use in S3-Completed-Parts header.
func (p *PartInfo) ToHeaderString() string {
// ETag value contains SHA256 checksum which is used while getting object parts attributes.

View file

@ -288,6 +288,21 @@ func completeMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID
return w
}
func abortMultipartUpload(hc *handlerContext, bktName, objName, uploadID string) {
w := abortMultipartUploadBase(hc, bktName, objName, uploadID)
assertStatus(hc.t, w, http.StatusNoContent)
}
func abortMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID string) *httptest.ResponseRecorder {
query := make(url.Values)
query.Set(uploadIDQuery, uploadID)
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
hc.Handler().AbortMultipartUploadHandler(w, r)
return w
}
func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) {
return uploadPartBase(hc, bktName, objName, true, uploadID, num, size)
}

View file

@ -17,6 +17,10 @@ import (
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/stretchr/testify/require"
)
@ -122,6 +126,108 @@ func TestMultipartReUploadPart(t *testing.T) {
equalDataSlices(t, append(data1, data2...), data)
}
func TestMultipartRemovePartsSplit(t *testing.T) {
bktName, objName := "bucket-to-upload-part", "object-multipart"
partSize := 8
t.Run("reupload part", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
list := listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
require.Len(t, list.Parts, 1)
require.Equal(t, `"etag"`, list.Parts[0].ETag)
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
require.Len(t, list.Parts, 1)
require.Equal(t, etag1, list.Parts[0].ETag)
require.Len(t, hc.tp.Objects(), 1)
})
t.Run("abort multipart", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
abortMultipartUpload(hc, bktName, objName, uploadInfo.UploadID)
require.Empty(t, hc.tp.Objects())
})
t.Run("complete multipart", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
completeMultipartUpload(hc, bktName, objName, uploadInfo.UploadID, []string{etag1})
require.Falsef(t, containsOID(hc.tp.Objects(), objID), "frostfs contains '%s' object, but shouldn't", objID)
})
}
func containsOID(objects []*object.Object, objID oid.ID) bool {
for _, o := range objects {
oID, _ := o.ID()
if oID.Equals(objID) {
return true
}
}
return false
}
func TestListMultipartUploads(t *testing.T) {
hc := prepareHandlerContext(t)

View file

@ -290,16 +290,18 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
MD5: hex.EncodeToString(createdObj.MD5Sum),
}
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
if err != nil && !oldPartIDNotFound {
return nil, err
}
if !oldPartIDNotFound {
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
zap.String("cid", bktInfo.CID.EncodeToString()),
zap.String("oid", oldPartID.EncodeToString()))
for _, oldPartID := range oldPartIDs {
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
zap.String("cid", bktInfo.CID.EncodeToString()),
zap.String("oid", oldPartID.EncodeToString()))
}
}
}
@ -385,16 +387,15 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var multipartObjetSize uint64
var encMultipartObjectSize uint64
parts := make([]*data.PartInfo, 0, len(p.Parts))
parts := make([]*data.PartInfoExtended, 0, len(p.Parts))
var completedPartsHeader strings.Builder
md5Hash := md5.New()
for i, part := range p.Parts {
partInfo := partsInfo[part.PartNumber]
if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) {
partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled())
if partInfo == nil {
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
}
delete(partsInfo, part.PartNumber)
// for the last part we have no minimum size limit
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
@ -475,14 +476,16 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var addr oid.Address
addr.SetContainer(p.Info.Bkt.CID)
for _, partInfo := range partsInfo {
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
zap.Error(err))
for _, prts := range partsInfo {
for _, partInfo := range prts {
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
zap.Error(err))
}
addr.SetObject(partInfo.OID)
n.cache.DeleteObject(addr)
}
addr.SetObject(partInfo.OID)
n.cache.DeleteObject(addr)
}
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
@ -554,10 +557,12 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
return err
}
for _, info := range parts {
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
for _, infos := range parts {
for _, info := range infos {
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
}
}
}
@ -581,7 +586,12 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
parts := make([]*Part, 0, len(partsInfo))
for _, partInfo := range partsInfo {
for _, infos := range partsInfo {
sort.Slice(infos, func(i, j int) bool {
return infos[i].Timestamp < infos[j].Timestamp
})
partInfo := infos[len(infos)-1]
parts = append(parts, &Part{
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
LastModified: partInfo.Created.UTC().Format(time.RFC3339),
@ -618,7 +628,22 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
return &res, nil
}
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) {
type PartsInfo map[int][]*data.PartInfoExtended
func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended {
parts := p[part]
for i, info := range parts {
if info.GetETag(md5Enabled) == etag {
p[part] = append(parts[:i], parts[i+1:]...)
return info
}
}
return nil
}
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
@ -632,11 +657,11 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
return nil, nil, err
}
res := make(map[int]*data.PartInfo, len(parts))
res := make(map[int][]*data.PartInfoExtended, len(parts))
partsNumbers := make([]int, len(parts))
oids := make([]string, len(parts))
for i, part := range parts {
res[part.Number] = part
res[part.Number] = append(res[part.Number], part)
partsNumbers[i] = part.Number
oids[i] = part.OID.EncodeToString()
}

View file

@ -6,6 +6,7 @@ import (
"io"
"sort"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -33,7 +34,7 @@ type TreeServiceMock struct {
locks map[string]map[uint64]*data.LockInfo
tags map[string]map[uint64]map[string]string
multiparts map[string]map[string][]*data.MultipartInfo
parts map[string]map[int]*data.PartInfo
parts map[string]map[int]*data.PartInfoExtended
}
func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
@ -92,7 +93,7 @@ func NewTreeService() *TreeServiceMock {
locks: make(map[string]map[uint64]*data.LockInfo),
tags: make(map[string]map[uint64]map[string]string),
multiparts: make(map[string]map[string][]*data.MultipartInfo),
parts: make(map[string]map[int]*data.PartInfo),
parts: make(map[string]map[int]*data.PartInfoExtended),
}
}
@ -346,28 +347,31 @@ func (t *TreeServiceMock) GetMultipartUpload(_ context.Context, bktInfo *data.Bu
return nil, ErrNodeNotFound
}
func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID)
if err != nil {
return oid.ID{}, err
return nil, err
}
if multipartInfo.ID != multipartNodeID {
return oid.ID{}, fmt.Errorf("invalid multipart info id")
return nil, fmt.Errorf("invalid multipart info id")
}
partsMap, ok := t.parts[info.UploadID]
if !ok {
partsMap = make(map[int]*data.PartInfo)
partsMap = make(map[int]*data.PartInfoExtended)
}
partsMap[info.Number] = info
partsMap[info.Number] = &data.PartInfoExtended{
PartInfo: *info,
Timestamp: uint64(time.Now().UnixMicro()),
}
t.parts[info.UploadID] = partsMap
return oid.ID{}, nil
return nil, nil
}
func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
var foundMultipart *data.MultipartInfo
@ -387,7 +391,7 @@ LOOP:
}
partsMap := t.parts[foundMultipart.UploadID]
result := make([]*data.PartInfo, 0, len(partsMap))
result := make([]*data.PartInfoExtended, 0, len(partsMap))
for _, part := range partsMap {
result = append(result, part)
}

View file

@ -57,11 +57,11 @@ type TreeService interface {
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
// AddPart puts a node to a system tree as a child of appropriate multipart upload
// and returns objectID of a previous part which must be deleted in FrostFS.
// and returns objectIDs of a previous part/s which must be deleted in FrostFS.
//
// If object id to remove is not found returns ErrNoNodeToRemove error.
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error)
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error)
// If object ids to remove is not found returns ErrNoNodeToRemove error.
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error)
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error)
PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error)
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error)

View file

@ -161,4 +161,5 @@ const (
WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped"
WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped"
WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped"
FailedToRemoveOldPartNode = "failed to remove old part node"
)

View file

@ -156,6 +156,10 @@ type NodeResponse interface {
}
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
if err := validateNodeResponse(nodeInfo); err != nil {
return nil, err
}
tNode := &treeNode{
ID: nodeInfo.GetNodeID(),
ParentID: nodeInfo.GetParentID(),
@ -163,14 +167,6 @@ func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
}
if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 {
return nil, errors.New("invalid tree node: missing id")
}
if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) {
return nil, errors.New("invalid tree node: length multiple ids mismatch")
}
for _, kv := range nodeInfo.GetMeta() {
switch kv.GetKey() {
case oidKV:
@ -377,6 +373,10 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
}
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
if err := validateNodeResponse(node); err != nil {
return nil, err
}
if len(node.GetNodeID()) != 1 {
return nil, errors.New("invalid multipart node: this is split node")
}
@ -426,10 +426,36 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo,
return multipartInfo, nil
}
func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
var err error
partInfo := &data.PartInfo{}
func validateNodeResponse(node NodeResponse) error {
ids := node.GetNodeID()
parentIDs := node.GetParentID()
timestamps := node.GetTimestamp()
if len(ids) == 0 || len(parentIDs) == 0 || len(timestamps) == 0 {
return errors.New("invalid node response: missing ids")
}
if len(ids) != len(parentIDs) || len(parentIDs) != len(timestamps) {
return errors.New("invalid node response: multiple ids length mismatch")
}
return nil
}
func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) {
if err := validateNodeResponse(node); err != nil {
return nil, err
}
if len(node.GetNodeID()) != 1 {
return nil, errors.New("invalid part node: this is split node")
}
partInfo := &data.PartInfoExtended{
Timestamp: node.GetTimestamp()[0],
}
var err error
for _, kv := range node.GetMeta() {
value := string(kv.GetValue())
switch kv.GetKey() {
@ -1397,10 +1423,10 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
return nil, layer.ErrNodeNotFound
}
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil {
return oid.ID{}, err
return nil, err
}
meta := map[string]string{
@ -1412,48 +1438,76 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
md5KV: info.MD5,
}
objToDelete := make([]oid.ID, 0, 1)
partsToDelete := make([]uint64, 0, 1)
var (
latestPartID uint64
maxTimestamp uint64
)
multiNodeID := MultiID{multipartNodeID}
for _, part := range parts {
if len(part.GetNodeID()) != 1 {
// multipart parts nodeID shouldn't have multiple values
c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("node ids", part.GetNodeID()))
continue
}
nodeID := part.GetNodeID()[0]
if nodeID == multipartNodeID {
if multiNodeID.Equal(part.GetNodeID()) {
continue
}
partInfo, err := newPartInfo(part)
if err != nil {
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("id", part.GetNodeID()),
zap.Error(err))
continue
}
if partInfo.Number == info.Number {
return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta)
nodeID := part.GetNodeID()[0]
objToDelete = append(objToDelete, partInfo.OID)
partsToDelete = append(partsToDelete, nodeID)
timestamp := partInfo.Timestamp
if timestamp > maxTimestamp {
maxTimestamp = timestamp
latestPartID = nodeID
}
}
}
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
return oid.ID{}, err
if len(objToDelete) != 0 {
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil {
return nil, fmt.Errorf("move part node: %w", err)
}
for _, nodeID := range partsToDelete {
if nodeID == latestPartID {
continue
}
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("id", nodeID))
}
}
return objToDelete, nil
}
return oid.ID{}, layer.ErrNoNodeToRemove
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
return nil, err
}
return nil, layer.ErrNoNodeToRemove
}
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil {
return nil, err
}
result := make([]*data.PartInfo, 0, len(parts))
result := make([]*data.PartInfoExtended, 0, len(parts))
for _, part := range parts {
if len(part.GetNodeID()) != 1 {
// multipart parts nodeID shouldn't have multiple values

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -304,3 +305,57 @@ func TestGetLatestNode(t *testing.T) {
})
}
}
func TestSplitTreeMultiparts(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
bktInfo := &data.BucketInfo{
CID: cidtest.ID(),
}
multipartInfo := &data.MultipartInfo{
Key: "multipart",
UploadID: "id",
Meta: map[string]string{},
Owner: usertest.ID(),
}
err = treeService.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
require.NoError(t, err)
multipartInfo, err = treeService.GetMultipartUpload(ctx, bktInfo, multipartInfo.Key, multipartInfo.UploadID)
require.NoError(t, err)
var objIDs []oid.ID
for i := 0; i < 2; i++ {
objID := oidtest.ID()
_, err = memCli.AddNode(ctx, bktInfo, systemTree, multipartInfo.ID, map[string]string{
partNumberKV: "1",
oidKV: objID.EncodeToString(),
ownerKV: usertest.ID().EncodeToString(),
})
require.NoError(t, err)
objIDs = append(objIDs, objID)
}
parts, err := treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
require.NoError(t, err)
require.Len(t, parts, 2)
objToDeletes, err := treeService.AddPart(ctx, bktInfo, multipartInfo.ID, &data.PartInfo{
Key: multipartInfo.Key,
UploadID: multipartInfo.UploadID,
Number: 1,
OID: oidtest.ID(),
})
require.NoError(t, err)
require.EqualValues(t, objIDs, objToDeletes, "oids to delete mismatched")
parts, err = treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
require.NoError(t, err)
require.Len(t, parts, 1)
}