From 056f168d77a07feb7717a41290e24b870e9983c4 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Thu, 18 Jul 2024 16:40:55 +0300 Subject: [PATCH] [#448] multipart: Support removing duplicated parts Previously, after a tree split, we could have duplicated parts (several objects and tree nodes referring to the same part number). Some of them couldn't be deleted after an abort or complete action. Signed-off-by: Denis Kirillov --- api/data/tree.go | 8 ++ api/handler/encryption_test.go | 15 ++++ api/handler/multipart_upload_test.go | 106 +++++++++++++++++++++++++ api/layer/multipart_upload.go | 73 +++++++++++------ api/layer/tree_mock.go | 24 +++--- api/layer/tree_service.go | 8 +- internal/logs/logs.go | 1 + pkg/service/tree/tree.go | 114 ++++++++++++++++++++------- pkg/service/tree/tree_test.go | 55 +++++++++++++ 9 files changed, 336 insertions(+), 68 deletions(-) diff --git a/api/data/tree.go b/api/data/tree.go index 3fd5d1e..c75d936 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -126,6 +126,14 @@ type PartInfo struct { Created time.Time `json:"created"` } +type PartInfoExtended struct { + PartInfo + + // Timestamp is used to find the latest version of part info in case of tree split + // when there are multiple nodes for the same part. + Timestamp uint64 +} + // ToHeaderString form short part representation to use in S3-Completed-Parts header. func (p *PartInfo) ToHeaderString() string { // ETag value contains SHA256 checksum which is used while getting object parts attributes. 
diff --git a/api/handler/encryption_test.go b/api/handler/encryption_test.go index e129bf2..68f76e9 100644 --- a/api/handler/encryption_test.go +++ b/api/handler/encryption_test.go @@ -288,6 +288,21 @@ func completeMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID return w } +func abortMultipartUpload(hc *handlerContext, bktName, objName, uploadID string) { + w := abortMultipartUploadBase(hc, bktName, objName, uploadID) + assertStatus(hc.t, w, http.StatusNoContent) +} + +func abortMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID string) *httptest.ResponseRecorder { + query := make(url.Values) + query.Set(uploadIDQuery, uploadID) + + w, r := prepareTestFullRequest(hc, bktName, objName, query, nil) + hc.Handler().AbortMultipartUploadHandler(w, r) + + return w +} + func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) { return uploadPartBase(hc, bktName, objName, true, uploadID, num, size) } diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go index ac6b0a8..d1b4a93 100644 --- a/api/handler/multipart_upload_test.go +++ b/api/handler/multipart_upload_test.go @@ -17,6 +17,10 @@ import ( s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" "github.com/stretchr/testify/require" ) @@ -122,6 +126,108 @@ func TestMultipartReUploadPart(t *testing.T) { equalDataSlices(t, append(data1, data2...), data) } +func TestMultipartRemovePartsSplit(t *testing.T) { + bktName, objName := "bucket-to-upload-part", "object-multipart" + partSize := 8 + + t.Run("reupload part", 
func(t *testing.T) { + hc := prepareHandlerContext(t) + bktInfo := createTestBucket(hc, bktName) + uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{}) + + uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + + multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID) + require.NoError(t, err) + + objID := oidtest.ID() + _, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{ + "Number": "1", + "OID": objID.EncodeToString(), + "Owner": usertest.ID().EncodeToString(), + "ETag": "etag", + }) + require.NoError(t, err) + + hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New()) + require.Len(t, hc.tp.Objects(), 2) + + list := listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK) + require.Len(t, list.Parts, 1) + require.Equal(t, `"etag"`, list.Parts[0].ETag) + + etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + list = listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK) + require.Len(t, list.Parts, 1) + require.Equal(t, etag1, list.Parts[0].ETag) + + require.Len(t, hc.tp.Objects(), 1) + }) + + t.Run("abort multipart", func(t *testing.T) { + hc := prepareHandlerContext(t) + bktInfo := createTestBucket(hc, bktName) + uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{}) + + uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + + multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID) + require.NoError(t, err) + + objID := oidtest.ID() + _, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{ + "Number": "1", + "OID": objID.EncodeToString(), + "Owner": usertest.ID().EncodeToString(), + "ETag": "etag", + }) + require.NoError(t, err) + + 
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New()) + require.Len(t, hc.tp.Objects(), 2) + + abortMultipartUpload(hc, bktName, objName, uploadInfo.UploadID) + require.Empty(t, hc.tp.Objects()) + }) + + t.Run("complete multipart", func(t *testing.T) { + hc := prepareHandlerContext(t) + bktInfo := createTestBucket(hc, bktName) + uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{}) + + etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + + multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID) + require.NoError(t, err) + + objID := oidtest.ID() + _, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{ + "Number": "1", + "OID": objID.EncodeToString(), + "Owner": usertest.ID().EncodeToString(), + "ETag": "etag", + }) + require.NoError(t, err) + + hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New()) + require.Len(t, hc.tp.Objects(), 2) + + completeMultipartUpload(hc, bktName, objName, uploadInfo.UploadID, []string{etag1}) + require.Falsef(t, containsOID(hc.tp.Objects(), objID), "frostfs contains '%s' object, but shouldn't", objID) + }) +} + +func containsOID(objects []*object.Object, objID oid.ID) bool { + for _, o := range objects { + oID, _ := o.ID() + if oID.Equals(objID) { + return true + } + } + + return false +} + func TestListMultipartUploads(t *testing.T) { hc := prepareHandlerContext(t) diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 7f97d3b..c99173d 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -290,16 +290,18 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf MD5: hex.EncodeToString(createdObj.MD5Sum), } - oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) + oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, 
multipartInfo.ID, partInfo) oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove) if err != nil && !oldPartIDNotFound { return nil, err } if !oldPartIDNotFound { - if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil { - n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err), - zap.String("cid", bktInfo.CID.EncodeToString()), - zap.String("oid", oldPartID.EncodeToString())) + for _, oldPartID := range oldPartIDs { + if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil { + n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err), + zap.String("cid", bktInfo.CID.EncodeToString()), + zap.String("oid", oldPartID.EncodeToString())) + } } } @@ -385,16 +387,15 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar var multipartObjetSize uint64 var encMultipartObjectSize uint64 - parts := make([]*data.PartInfo, 0, len(p.Parts)) + parts := make([]*data.PartInfoExtended, 0, len(p.Parts)) var completedPartsHeader strings.Builder md5Hash := md5.New() for i, part := range p.Parts { - partInfo := partsInfo[part.PartNumber] - if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) { + partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled()) + if partInfo == nil { return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber) } - delete(partsInfo, part.PartNumber) // for the last part we have no minimum size limit if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize { @@ -475,14 +476,16 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar var addr oid.Address addr.SetContainer(p.Info.Bkt.CID) - for _, partInfo := range partsInfo { - if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { - n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart, - zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID), - 
zap.Error(err)) + for _, prts := range partsInfo { + for _, partInfo := range prts { + if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { + n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart, + zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID), + zap.Error(err)) + } + addr.SetObject(partInfo.OID) + n.cache.DeleteObject(addr) } - addr.SetObject(partInfo.OID) - n.cache.DeleteObject(addr) } return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo) @@ -554,10 +557,12 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e return err } - for _, info := range parts { - if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { - n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), - zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) + for _, infos := range parts { + for _, info := range infos { + if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { + n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), + zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) + } } } @@ -581,7 +586,12 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn parts := make([]*Part, 0, len(partsInfo)) - for _, partInfo := range partsInfo { + for _, infos := range partsInfo { + sort.Slice(infos, func(i, j int) bool { + return infos[i].Timestamp < infos[j].Timestamp + }) + + partInfo := infos[len(infos)-1] parts = append(parts, &Part{ ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())), LastModified: partInfo.Created.UTC().Format(time.RFC3339), @@ -618,7 +628,22 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn return &res, nil } -func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, 
map[int]*data.PartInfo, error) { +type PartsInfo map[int][]*data.PartInfoExtended + +func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended { + parts := p[part] + + for i, info := range parts { + if info.GetETag(md5Enabled) == etag { + p[part] = append(parts[:i], parts[i+1:]...) + return info + } + } + + return nil +} + +func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID) if err != nil { if errors.Is(err, ErrNodeNotFound) { @@ -632,11 +657,11 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data. return nil, nil, err } - res := make(map[int]*data.PartInfo, len(parts)) + res := make(map[int][]*data.PartInfoExtended, len(parts)) partsNumbers := make([]int, len(parts)) oids := make([]string, len(parts)) for i, part := range parts { - res[part.Number] = part + res[part.Number] = append(res[part.Number], part) partsNumbers[i] = part.Number oids[i] = part.OID.EncodeToString() } diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index b3b56b2..8a85ffb 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -6,6 +6,7 @@ import ( "io" "sort" "strings" + "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -33,7 +34,7 @@ type TreeServiceMock struct { locks map[string]map[uint64]*data.LockInfo tags map[string]map[uint64]map[string]string multiparts map[string]map[string][]*data.MultipartInfo - parts map[string]map[int]*data.PartInfo + parts map[string]map[int]*data.PartInfoExtended } func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) { @@ -92,7 +93,7 @@ func NewTreeService() *TreeServiceMock { locks: make(map[string]map[uint64]*data.LockInfo), tags: 
make(map[string]map[uint64]map[string]string), multiparts: make(map[string]map[string][]*data.MultipartInfo), - parts: make(map[string]map[int]*data.PartInfo), + parts: make(map[string]map[int]*data.PartInfoExtended), } } @@ -346,28 +347,31 @@ func (t *TreeServiceMock) GetMultipartUpload(_ context.Context, bktInfo *data.Bu return nil, ErrNodeNotFound } -func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { +func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) { multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID) if err != nil { - return oid.ID{}, err + return nil, err } if multipartInfo.ID != multipartNodeID { - return oid.ID{}, fmt.Errorf("invalid multipart info id") + return nil, fmt.Errorf("invalid multipart info id") } partsMap, ok := t.parts[info.UploadID] if !ok { - partsMap = make(map[int]*data.PartInfo) + partsMap = make(map[int]*data.PartInfoExtended) } - partsMap[info.Number] = info + partsMap[info.Number] = &data.PartInfoExtended{ + PartInfo: *info, + Timestamp: uint64(time.Now().UnixMicro()), + } t.parts[info.UploadID] = partsMap - return oid.ID{}, nil + return nil, nil } -func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { +func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) { cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] var foundMultipart *data.MultipartInfo @@ -387,7 +391,7 @@ LOOP: } partsMap := t.parts[foundMultipart.UploadID] - result := make([]*data.PartInfo, 0, len(partsMap)) + result := make([]*data.PartInfoExtended, 0, len(partsMap)) for _, part := range partsMap { result = append(result, part) } diff --git 
a/api/layer/tree_service.go b/api/layer/tree_service.go index 5c74f64..5a2047a 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -57,11 +57,11 @@ type TreeService interface { GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) // AddPart puts a node to a system tree as a child of appropriate multipart upload - // and returns objectID of a previous part which must be deleted in FrostFS. + // and returns objectIDs of a previous part/s which must be deleted in FrostFS. // - // If object id to remove is not found returns ErrNoNodeToRemove error. - AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) - GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) + // If object ids to remove is not found returns ErrNoNodeToRemove error. + AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) + GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index a95a8f6..f78180a 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -161,4 +161,5 @@ const ( WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped" WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped" WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped" + FailedToRemoveOldPartNode = "failed to remove old part node" 
) diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 2329179..c89d6af 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -156,6 +156,10 @@ type NodeResponse interface { } func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) { + if err := validateNodeResponse(nodeInfo); err != nil { + return nil, err + } + tNode := &treeNode{ ID: nodeInfo.GetNodeID(), ParentID: nodeInfo.GetParentID(), @@ -163,14 +167,6 @@ func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) { Meta: make(map[string]string, len(nodeInfo.GetMeta())), } - if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 { - return nil, errors.New("invalid tree node: missing id") - } - - if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) { - return nil, errors.New("invalid tree node: length multiple ids mismatch") - } - for _, kv := range nodeInfo.GetMeta() { switch kv.GetKey() { case oidKV: @@ -377,6 +373,10 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr } func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) { + if err := validateNodeResponse(node); err != nil { + return nil, err + } + if len(node.GetNodeID()) != 1 { return nil, errors.New("invalid multipart node: this is split node") } @@ -426,10 +426,36 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, return multipartInfo, nil } -func newPartInfo(node NodeResponse) (*data.PartInfo, error) { - var err error - partInfo := &data.PartInfo{} +func validateNodeResponse(node NodeResponse) error { + ids := node.GetNodeID() + parentIDs := node.GetParentID() + timestamps := node.GetTimestamp() + if len(ids) == 0 || len(parentIDs) == 0 || len(timestamps) == 0 { + return errors.New("invalid node response: missing ids") + } + + if len(ids) != len(parentIDs) || len(parentIDs) != len(timestamps) { + return errors.New("invalid node response: multiple ids length 
mismatch") + } + + return nil +} + +func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) { + if err := validateNodeResponse(node); err != nil { + return nil, err + } + + if len(node.GetNodeID()) != 1 { + return nil, errors.New("invalid part node: this is split node") + } + + partInfo := &data.PartInfoExtended{ + Timestamp: node.GetTimestamp()[0], + } + + var err error for _, kv := range node.GetMeta() { value := string(kv.GetValue()) switch kv.GetKey() { @@ -1397,10 +1423,10 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, return nil, layer.ErrNodeNotFound } -func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { +func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) { parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false) if err != nil { - return oid.ID{}, err + return nil, err } meta := map[string]string{ @@ -1412,48 +1438,76 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN md5KV: info.MD5, } + objToDelete := make([]oid.ID, 0, 1) + partsToDelete := make([]uint64, 0, 1) + var ( + latestPartID uint64 + maxTimestamp uint64 + ) + + multiNodeID := MultiID{multipartNodeID} + for _, part := range parts { - if len(part.GetNodeID()) != 1 { - // multipart parts nodeID shouldn't have multiple values - c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts, - zap.String("key", info.Key), - zap.String("upload id", info.UploadID), - zap.Uint64("multipart node id ", multipartNodeID), - zap.Uint64s("node ids", part.GetNodeID())) - continue - } - nodeID := part.GetNodeID()[0] - if nodeID == multipartNodeID { + if multiNodeID.Equal(part.GetNodeID()) { continue } + partInfo, err := newPartInfo(part) if err != nil { 
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo, zap.String("key", info.Key), zap.String("upload id", info.UploadID), zap.Uint64("multipart node id ", multipartNodeID), + zap.Uint64s("id", part.GetNodeID()), zap.Error(err)) continue } if partInfo.Number == info.Number { - return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta) + nodeID := part.GetNodeID()[0] + objToDelete = append(objToDelete, partInfo.OID) + partsToDelete = append(partsToDelete, nodeID) + timestamp := partInfo.Timestamp + if timestamp > maxTimestamp { + maxTimestamp = timestamp + latestPartID = nodeID + } } } - if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil { - return oid.ID{}, err + if len(objToDelete) != 0 { + if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil { + return nil, fmt.Errorf("move part node: %w", err) + } + + for _, nodeID := range partsToDelete { + if nodeID == latestPartID { + continue + } + if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil { + c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode, + zap.String("key", info.Key), + zap.String("upload id", info.UploadID), + zap.Uint64("id", nodeID)) + } + } + + return objToDelete, nil } - return oid.ID{}, layer.ErrNoNodeToRemove + if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil { + return nil, err + } + + return nil, layer.ErrNoNodeToRemove } -func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { +func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) { parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false) if err != nil { return nil, err } - result := make([]*data.PartInfo, 0, len(parts)) + result := make([]*data.PartInfoExtended, 0, len(parts)) 
for _, part := range parts { if len(part.GetNodeID()) != 1 { // multipart parts nodeID shouldn't have multiple values diff --git a/pkg/service/tree/tree_test.go b/pkg/service/tree/tree_test.go index 973eb12..058e46a 100644 --- a/pkg/service/tree/tree_test.go +++ b/pkg/service/tree/tree_test.go @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -304,3 +305,57 @@ func TestGetLatestNode(t *testing.T) { }) } } + +func TestSplitTreeMultiparts(t *testing.T) { + ctx := context.Background() + + memCli, err := NewTreeServiceClientMemory() + require.NoError(t, err) + treeService := NewTree(memCli, zaptest.NewLogger(t)) + + bktInfo := &data.BucketInfo{ + CID: cidtest.ID(), + } + + multipartInfo := &data.MultipartInfo{ + Key: "multipart", + UploadID: "id", + Meta: map[string]string{}, + Owner: usertest.ID(), + } + + err = treeService.CreateMultipartUpload(ctx, bktInfo, multipartInfo) + require.NoError(t, err) + + multipartInfo, err = treeService.GetMultipartUpload(ctx, bktInfo, multipartInfo.Key, multipartInfo.UploadID) + require.NoError(t, err) + + var objIDs []oid.ID + for i := 0; i < 2; i++ { + objID := oidtest.ID() + _, err = memCli.AddNode(ctx, bktInfo, systemTree, multipartInfo.ID, map[string]string{ + partNumberKV: "1", + oidKV: objID.EncodeToString(), + ownerKV: usertest.ID().EncodeToString(), + }) + require.NoError(t, err) + objIDs = append(objIDs, objID) + } + + parts, err := treeService.GetParts(ctx, bktInfo, multipartInfo.ID) + require.NoError(t, err) + require.Len(t, parts, 2) + + objToDeletes, err := treeService.AddPart(ctx, bktInfo, multipartInfo.ID, &data.PartInfo{ + Key: multipartInfo.Key, + UploadID: 
multipartInfo.UploadID, + Number: 1, + OID: oidtest.ID(), + }) + require.NoError(t, err) + require.EqualValues(t, objIDs, objToDeletes, "oids to delete mismatched") + + parts, err = treeService.GetParts(ctx, bktInfo, multipartInfo.ID) + require.NoError(t, err) + require.Len(t, parts, 1) +}