From 5d0eef5a5612be20d844810de73743532d2df739 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Thu, 18 Jul 2024 16:40:55 +0300 Subject: [PATCH] [#XX] multipart: Support removing duplicated parts Previously after tree split we can have duplicated parts (several objects and tree node referred to the same part number). Some of them couldn't be deleted after abort or compete action. Signed-off-by: Denis Kirillov --- api/data/tree.go | 5 ++ api/handler/encryption_test.go | 15 ++++ api/handler/multipart_upload_test.go | 106 +++++++++++++++++++++++++ api/layer/multipart_upload.go | 73 +++++++++++------ api/layer/tree_mock.go | 24 +++--- api/layer/tree_service.go | 8 +- internal/logs/logs.go | 1 + pkg/service/tree/tree.go | 114 ++++++++++++++++++++------- pkg/service/tree/tree_test.go | 55 +++++++++++++ 9 files changed, 333 insertions(+), 68 deletions(-) diff --git a/api/data/tree.go b/api/data/tree.go index 0164062..f655923 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -124,6 +124,11 @@ type PartInfo struct { Created time.Time `json:"created"` } +type PartInfoExtended struct { + PartInfo + Timestamp uint64 +} + // ToHeaderString form short part representation to use in S3-Completed-Parts header. func (p *PartInfo) ToHeaderString() string { // ETag value contains SHA256 checksum which is used while getting object parts attributes. diff --git a/api/handler/encryption_test.go b/api/handler/encryption_test.go index e129bf2..68f76e9 100644 --- a/api/handler/encryption_test.go +++ b/api/handler/encryption_test.go @@ -288,6 +288,21 @@ func completeMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID return w } +func abortMultipartUpload(hc *handlerContext, bktName, objName, uploadID string) { + w := abortMultipartUploadBase(hc, bktName, objName, uploadID) + assertStatus(hc.t, w, http.StatusNoContent) +} + +func abortMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID string) *httptest.ResponseRecorder { + query := make(url.Values) + query.Set(uploadIDQuery, uploadID) + + w, r := prepareTestFullRequest(hc, bktName, objName, query, nil) + hc.Handler().AbortMultipartUploadHandler(w, r) + + return w +} + func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) { return uploadPartBase(hc, bktName, objName, true, uploadID, num, size) } diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go index aef7347..62e9ead 100644 --- a/api/handler/multipart_upload_test.go +++ b/api/handler/multipart_upload_test.go @@ -17,6 +17,10 @@ import ( s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" "github.com/stretchr/testify/require" ) @@ -122,6 +126,108 @@ func TestMultipartReUploadPart(t *testing.T) { equalDataSlices(t, append(data1, data2...), data) } +func TestMultipartRemovePartsSplit(t *testing.T) { + bktName, objName := "bucket-to-upload-part", "object-multipart" + partSize := 8 + + t.Run("reupload part", func(t *testing.T) { + hc := prepareHandlerContext(t) + bktInfo := createTestBucket(hc, bktName) + uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{}) + + uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + + multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID) + require.NoError(t, err) + + objID := oidtest.ID() + _, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{ + "Number": "1", + "OID": objID.EncodeToString(), + "Owner": usertest.ID().EncodeToString(), + "ETag": "etag", + }) + require.NoError(t, err) + + hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New()) + require.Len(t, hc.tp.Objects(), 2) + + list := listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK) + require.Len(t, list.Parts, 1) + require.Equal(t, `"etag"`, list.Parts[0].ETag) + + etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + list = listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK) + require.Len(t, list.Parts, 1) + require.Equal(t, etag1, list.Parts[0].ETag) + + require.Len(t, hc.tp.Objects(), 1) + }) + + t.Run("abort multipart", func(t *testing.T) { + hc := prepareHandlerContext(t) + bktInfo := createTestBucket(hc, bktName) + uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{}) + + uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + + multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID) + require.NoError(t, err) + + objID := oidtest.ID() + _, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{ + "Number": "1", + "OID": objID.EncodeToString(), + "Owner": usertest.ID().EncodeToString(), + "ETag": "etag", + }) + require.NoError(t, err) + + hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New()) + require.Len(t, hc.tp.Objects(), 2) + + abortMultipartUpload(hc, bktName, objName, uploadInfo.UploadID) + require.Empty(t, hc.tp.Objects()) + }) + + t.Run("complete multipart", func(t *testing.T) { + hc := prepareHandlerContext(t) + bktInfo := createTestBucket(hc, bktName) + uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{}) + + etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + + multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID) + require.NoError(t, err) + + objID := oidtest.ID() + _, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{ + "Number": "1", + "OID": objID.EncodeToString(), + "Owner": usertest.ID().EncodeToString(), + "ETag": "etag", + }) + require.NoError(t, err) + + hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New()) + require.Len(t, hc.tp.Objects(), 2) + + completeMultipartUpload(hc, bktName, objName, uploadInfo.UploadID, []string{etag1}) + require.Falsef(t, containsOID(hc.tp.Objects(), objID), "frostfs contains '%s' object, but shouldn't", objID) + }) +} + +func containsOID(objects []*object.Object, objID oid.ID) bool { + for _, o := range objects { + oID, _ := o.ID() + if oID.Equals(objID) { + return true + } + } + + return false +} + func TestListMultipartUploads(t *testing.T) { hc := prepareHandlerContext(t) diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index aeb6759..90d3eba 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -284,16 +284,18 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf MD5: hex.EncodeToString(md5Hash), } - oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) + oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove) if err != nil && !oldPartIDNotFound { return nil, err } if !oldPartIDNotFound { - if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil { - n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err), - zap.String("cid", bktInfo.CID.EncodeToString()), - zap.String("oid", oldPartID.EncodeToString())) + for _, oldPartID := range oldPartIDs { + if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil { + n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err), + zap.String("cid", bktInfo.CID.EncodeToString()), + zap.String("oid", oldPartID.EncodeToString())) + } } } @@ -379,16 +381,15 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar var multipartObjetSize uint64 var encMultipartObjectSize uint64 - parts := make([]*data.PartInfo, 0, len(p.Parts)) + parts := make([]*data.PartInfoExtended, 0, len(p.Parts)) var completedPartsHeader strings.Builder md5Hash := md5.New() for i, part := range p.Parts { - partInfo := partsInfo[part.PartNumber] - if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) { + partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled()) + if partInfo == nil { return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber) } - delete(partsInfo, part.PartNumber) // for the last part we have no minimum size limit if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize { @@ -469,14 +470,16 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar var addr oid.Address addr.SetContainer(p.Info.Bkt.CID) - for _, partInfo := range partsInfo { - if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { - n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart, - zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID), - zap.Error(err)) + for _, prts := range partsInfo { + for _, partInfo := range prts { + if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { + n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart, + zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID), + zap.Error(err)) + } + addr.SetObject(partInfo.OID) + n.cache.DeleteObject(addr) } - addr.SetObject(partInfo.OID) - n.cache.DeleteObject(addr) } return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo) @@ -548,10 +551,12 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e return err } - for _, info := range parts { - if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { - n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), - zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) + for _, infos := range parts { + for _, info := range infos { + if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { + n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), + zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) + } } } @@ -575,7 +580,12 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn parts := make([]*Part, 0, len(partsInfo)) - for _, partInfo := range partsInfo { + for _, infos := range partsInfo { + sort.Slice(infos, func(i, j int) bool { + return infos[i].Timestamp < infos[j].Timestamp + }) + + partInfo := infos[len(infos)-1] parts = append(parts, &Part{ ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())), LastModified: partInfo.Created.UTC().Format(time.RFC3339), @@ -612,7 +622,22 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn return &res, nil } -func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) { +type PartsInfo map[int][]*data.PartInfoExtended + +func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended { + parts := p[part] + + for i, info := range parts { + if info.GetETag(md5Enabled) == etag { + p[part] = append(parts[:i], parts[i+1:]...) + return info + } + } + + return nil +} + +func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID) if err != nil { if errors.Is(err, ErrNodeNotFound) { @@ -626,11 +651,11 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data. return nil, nil, err } - res := make(map[int]*data.PartInfo, len(parts)) + res := make(map[int][]*data.PartInfoExtended, len(parts)) partsNumbers := make([]int, len(parts)) oids := make([]string, len(parts)) for i, part := range parts { - res[part.Number] = part + res[part.Number] = append(res[part.Number], part) partsNumbers[i] = part.Number oids[i] = part.OID.EncodeToString() } diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index daac689..d72a036 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -6,6 +6,7 @@ import ( "io" "sort" "strings" + "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -33,7 +34,7 @@ type TreeServiceMock struct { locks map[string]map[uint64]*data.LockInfo tags map[string]map[uint64]map[string]string multiparts map[string]map[string][]*data.MultipartInfo - parts map[string]map[int]*data.PartInfo + parts map[string]map[int]*data.PartInfoExtended } func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) { @@ -92,7 +93,7 @@ func NewTreeService() *TreeServiceMock { locks: make(map[string]map[uint64]*data.LockInfo), tags: make(map[string]map[uint64]map[string]string), multiparts: make(map[string]map[string][]*data.MultipartInfo), - parts: make(map[string]map[int]*data.PartInfo), + parts: make(map[string]map[int]*data.PartInfoExtended), } } @@ -346,28 +347,31 @@ func (t *TreeServiceMock) GetMultipartUpload(_ context.Context, bktInfo *data.Bu return nil, ErrNodeNotFound } -func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { +func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) { multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID) if err != nil { - return oid.ID{}, err + return nil, err } if multipartInfo.ID != multipartNodeID { - return oid.ID{}, fmt.Errorf("invalid multipart info id") + return nil, fmt.Errorf("invalid multipart info id") } partsMap, ok := t.parts[info.UploadID] if !ok { - partsMap = make(map[int]*data.PartInfo) + partsMap = make(map[int]*data.PartInfoExtended) } - partsMap[info.Number] = info + partsMap[info.Number] = &data.PartInfoExtended{ + PartInfo: *info, + Timestamp: uint64(time.Now().UnixMicro()), + } t.parts[info.UploadID] = partsMap - return oid.ID{}, nil + return nil, nil } -func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { +func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) { cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] var foundMultipart *data.MultipartInfo @@ -387,7 +391,7 @@ LOOP: } partsMap := t.parts[foundMultipart.UploadID] - result := make([]*data.PartInfo, 0, len(partsMap)) + result := make([]*data.PartInfoExtended, 0, len(partsMap)) for _, part := range partsMap { result = append(result, part) } diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 9f31051..71b68c1 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -57,11 +57,11 @@ type TreeService interface { GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) // AddPart puts a node to a system tree as a child of appropriate multipart upload - // and returns objectID of a previous part which must be deleted in FrostFS. + // and returns objectIDs of a previous part/s which must be deleted in FrostFS. // - // If object id to remove is not found returns ErrNoNodeToRemove error. - AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) - GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) + // If object ids to remove is not found returns ErrNoNodeToRemove error. + AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) + GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) // Compound methods for optimizations diff --git a/internal/logs/logs.go b/internal/logs/logs.go index d875eb7..3a92543 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -154,4 +154,5 @@ const ( FailedToParsePartInfo = "failed to parse part info" CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info" CloseCredsObjectPayload = "close creds object payload" + FailedToRemoveOldPartNode = "failed to remove old part node" ) diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 259a473..811c8b1 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -152,6 +152,10 @@ type NodeResponse interface { } func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) { + if err := validateNodeResponse(nodeInfo); err != nil { + return nil, err + } + tNode := &treeNode{ ID: nodeInfo.GetNodeID(), ParentID: nodeInfo.GetParentID(), @@ -159,14 +163,6 @@ func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) { Meta: make(map[string]string, len(nodeInfo.GetMeta())), } - if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 { - return nil, errors.New("invalid tree node: missing id") - } - - if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) { - return nil, errors.New("invalid tree node: length multiple ids mismatch") - } - for _, kv := range nodeInfo.GetMeta() { switch kv.GetKey() { case oidKV: @@ -357,6 +353,10 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr } func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) { + if err := validateNodeResponse(node); err != nil { + return nil, err + } + if len(node.GetNodeID()) != 1 { return nil, errors.New("invalid multipart node: this is split node") } @@ -400,10 +400,36 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, return multipartInfo, nil } -func newPartInfo(node NodeResponse) (*data.PartInfo, error) { - var err error - partInfo := &data.PartInfo{} +func validateNodeResponse(node NodeResponse) error { + ids := node.GetNodeID() + parentIDs := node.GetParentID() + timestamps := node.GetTimestamp() + if len(ids) == 0 || len(parentIDs) == 0 || len(timestamps) == 0 { + return errors.New("invalid node response: missing ids") + } + + if len(ids) != len(parentIDs) || len(parentIDs) != len(timestamps) { + return errors.New("invalid node response: multiple ids length mismatch") + } + + return nil +} + +func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) { + if err := validateNodeResponse(node); err != nil { + return nil, err + } + + if len(node.GetNodeID()) != 1 { + return nil, errors.New("invalid part node: this is split node") + } + + partInfo := &data.PartInfoExtended{ + Timestamp: node.GetTimestamp()[0], + } + + var err error for _, kv := range node.GetMeta() { value := string(kv.GetValue()) switch kv.GetKey() { @@ -1365,10 +1391,10 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, return nil, layer.ErrNodeNotFound } -func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { +func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) { parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2) if err != nil { - return oid.ID{}, err + return nil, err } meta := map[string]string{ @@ -1380,48 +1406,76 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN md5KV: info.MD5, } + objToDelete := make([]oid.ID, 0, 1) + partsToDelete := make([]uint64, 0, 1) + var ( + latestPartID uint64 + maxTimestamp uint64 + ) + + multiNodeID := MultiID{multipartNodeID} + for _, part := range parts { - if len(part.GetNodeID()) != 1 { - // multipart parts nodeID shouldn't have multiple values - c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts, - zap.String("key", info.Key), - zap.String("upload id", info.UploadID), - zap.Uint64("multipart node id ", multipartNodeID), - zap.Uint64s("node ids", part.GetNodeID())) - continue - } - nodeID := part.GetNodeID()[0] - if nodeID == multipartNodeID { + if multiNodeID.Equal(part.GetNodeID()) { continue } + partInfo, err := newPartInfo(part) if err != nil { c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo, zap.String("key", info.Key), zap.String("upload id", info.UploadID), zap.Uint64("multipart node id ", multipartNodeID), + zap.Uint64s("id", part.GetNodeID()), zap.Error(err)) continue } if partInfo.Number == info.Number { - return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta) + nodeID := part.GetNodeID()[0] + objToDelete = append(objToDelete, partInfo.OID) + partsToDelete = append(partsToDelete, nodeID) + timestamp := partInfo.Timestamp + if timestamp > maxTimestamp { + maxTimestamp = timestamp + latestPartID = nodeID + } } } - if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil { - return oid.ID{}, err + if len(objToDelete) != 0 { + if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil { + return nil, fmt.Errorf("move part node: %w", err) + } + + for _, nodeID := range partsToDelete { + if nodeID == latestPartID { + continue + } + if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil { + c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode, + zap.String("key", info.Key), + zap.String("upload id", info.UploadID), + zap.Uint64("id", nodeID)) + } + } + + return objToDelete, nil } - return oid.ID{}, layer.ErrNoNodeToRemove + if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil { + return nil, err + } + + return nil, layer.ErrNoNodeToRemove } -func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { +func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) { parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2) if err != nil { return nil, err } - result := make([]*data.PartInfo, 0, len(parts)) + result := make([]*data.PartInfoExtended, 0, len(parts)) for _, part := range parts { if len(part.GetNodeID()) != 1 { // multipart parts nodeID shouldn't have multiple values diff --git a/pkg/service/tree/tree_test.go b/pkg/service/tree/tree_test.go index 973eb12..058e46a 100644 --- a/pkg/service/tree/tree_test.go +++ b/pkg/service/tree/tree_test.go @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -304,3 +305,57 @@ func TestGetLatestNode(t *testing.T) { }) } } + +func TestSplitTreeMultiparts(t *testing.T) { + ctx := context.Background() + + memCli, err := NewTreeServiceClientMemory() + require.NoError(t, err) + treeService := NewTree(memCli, zaptest.NewLogger(t)) + + bktInfo := &data.BucketInfo{ + CID: cidtest.ID(), + } + + multipartInfo := &data.MultipartInfo{ + Key: "multipart", + UploadID: "id", + Meta: map[string]string{}, + Owner: usertest.ID(), + } + + err = treeService.CreateMultipartUpload(ctx, bktInfo, multipartInfo) + require.NoError(t, err) + + multipartInfo, err = treeService.GetMultipartUpload(ctx, bktInfo, multipartInfo.Key, multipartInfo.UploadID) + require.NoError(t, err) + + var objIDs []oid.ID + for i := 0; i < 2; i++ { + objID := oidtest.ID() + _, err = memCli.AddNode(ctx, bktInfo, systemTree, multipartInfo.ID, map[string]string{ + partNumberKV: "1", + oidKV: objID.EncodeToString(), + ownerKV: usertest.ID().EncodeToString(), + }) + require.NoError(t, err) + objIDs = append(objIDs, objID) + } + + parts, err := treeService.GetParts(ctx, bktInfo, multipartInfo.ID) + require.NoError(t, err) + require.Len(t, parts, 2) + + objToDeletes, err := treeService.AddPart(ctx, bktInfo, multipartInfo.ID, &data.PartInfo{ + Key: multipartInfo.Key, + UploadID: multipartInfo.UploadID, + Number: 1, + OID: oidtest.ID(), + }) + require.NoError(t, err) + require.EqualValues(t, objIDs, objToDeletes, "oids to delete mismatched") + + parts, err = treeService.GetParts(ctx, bktInfo, multipartInfo.ID) + require.NoError(t, err) + require.Len(t, parts, 1) +}