diff --git a/api/data/tree.go b/api/data/tree.go index 3fd5d1ea..c75d936b 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -126,6 +126,14 @@ type PartInfo struct { Created time.Time `json:"created"` } +type PartInfoExtended struct { + PartInfo + + // Timestamp is used to find the latest version of part info in case of tree split + // when there are multiple nodes for the same part. + Timestamp uint64 +} + // ToHeaderString form short part representation to use in S3-Completed-Parts header. func (p *PartInfo) ToHeaderString() string { // ETag value contains SHA256 checksum which is used while getting object parts attributes. diff --git a/api/handler/encryption_test.go b/api/handler/encryption_test.go index e129bf24..68f76e93 100644 --- a/api/handler/encryption_test.go +++ b/api/handler/encryption_test.go @@ -288,6 +288,21 @@ func completeMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID return w } +func abortMultipartUpload(hc *handlerContext, bktName, objName, uploadID string) { + w := abortMultipartUploadBase(hc, bktName, objName, uploadID) + assertStatus(hc.t, w, http.StatusNoContent) +} + +func abortMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID string) *httptest.ResponseRecorder { + query := make(url.Values) + query.Set(uploadIDQuery, uploadID) + + w, r := prepareTestFullRequest(hc, bktName, objName, query, nil) + hc.Handler().AbortMultipartUploadHandler(w, r) + + return w +} + func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) { return uploadPartBase(hc, bktName, objName, true, uploadID, num, size) } diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go index ac6b0a80..d1b4a936 100644 --- a/api/handler/multipart_upload_test.go +++ b/api/handler/multipart_upload_test.go @@ -17,6 +17,10 @@ import ( s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" "github.com/stretchr/testify/require" ) @@ -122,6 +126,108 @@ func TestMultipartReUploadPart(t *testing.T) { equalDataSlices(t, append(data1, data2...), data) } +func TestMultipartRemovePartsSplit(t *testing.T) { + bktName, objName := "bucket-to-upload-part", "object-multipart" + partSize := 8 + + t.Run("reupload part", func(t *testing.T) { + hc := prepareHandlerContext(t) + bktInfo := createTestBucket(hc, bktName) + uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{}) + + uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + + multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID) + require.NoError(t, err) + + objID := oidtest.ID() + _, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{ + "Number": "1", + "OID": objID.EncodeToString(), + "Owner": usertest.ID().EncodeToString(), + "ETag": "etag", + }) + require.NoError(t, err) + + hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New()) + require.Len(t, hc.tp.Objects(), 2) + + list := listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK) + require.Len(t, list.Parts, 1) + require.Equal(t, `"etag"`, list.Parts[0].ETag) + + etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + list = listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK) + require.Len(t, list.Parts, 1) + require.Equal(t, etag1, list.Parts[0].ETag) + + require.Len(t, hc.tp.Objects(), 1) + }) + + t.Run("abort multipart", func(t *testing.T) { + hc := prepareHandlerContext(t) + bktInfo := createTestBucket(hc, bktName) + uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{}) + + uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + + multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID) + require.NoError(t, err) + + objID := oidtest.ID() + _, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{ + "Number": "1", + "OID": objID.EncodeToString(), + "Owner": usertest.ID().EncodeToString(), + "ETag": "etag", + }) + require.NoError(t, err) + + hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New()) + require.Len(t, hc.tp.Objects(), 2) + + abortMultipartUpload(hc, bktName, objName, uploadInfo.UploadID) + require.Empty(t, hc.tp.Objects()) + }) + + t.Run("complete multipart", func(t *testing.T) { + hc := prepareHandlerContext(t) + bktInfo := createTestBucket(hc, bktName) + uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{}) + + etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize) + + multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID) + require.NoError(t, err) + + objID := oidtest.ID() + _, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{ + "Number": "1", + "OID": objID.EncodeToString(), + "Owner": usertest.ID().EncodeToString(), + "ETag": "etag", + }) + require.NoError(t, err) + + hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New()) + require.Len(t, hc.tp.Objects(), 2) + + completeMultipartUpload(hc, bktName, objName, uploadInfo.UploadID, []string{etag1}) + require.Falsef(t, containsOID(hc.tp.Objects(), objID), "frostfs contains '%s' object, but shouldn't", objID) + }) +} + +func containsOID(objects []*object.Object, objID oid.ID) bool { + for _, o := range objects { + oID, _ := o.ID() + if oID.Equals(objID) { + return true + } + } + + return false +} + func TestListMultipartUploads(t *testing.T) { hc := prepareHandlerContext(t) diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 7f97d3b1..c99173d0 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -290,16 +290,18 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf MD5: hex.EncodeToString(createdObj.MD5Sum), } - oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) + oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove) if err != nil && !oldPartIDNotFound { return nil, err } if !oldPartIDNotFound { - if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil { - n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err), - zap.String("cid", bktInfo.CID.EncodeToString()), - zap.String("oid", oldPartID.EncodeToString())) + for _, oldPartID := range oldPartIDs { + if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil { + n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err), + zap.String("cid", bktInfo.CID.EncodeToString()), + zap.String("oid", oldPartID.EncodeToString())) + } } } @@ -385,16 +387,15 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar var multipartObjetSize uint64 var encMultipartObjectSize uint64 - parts := make([]*data.PartInfo, 0, len(p.Parts)) + parts := make([]*data.PartInfoExtended, 0, len(p.Parts)) var completedPartsHeader strings.Builder md5Hash := md5.New() for i, part := range p.Parts { - partInfo := partsInfo[part.PartNumber] - if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) { + partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled()) + if partInfo == nil { return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber) } - delete(partsInfo, part.PartNumber) // for the last part we have no minimum size limit if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize { @@ -475,14 +476,16 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar var addr oid.Address addr.SetContainer(p.Info.Bkt.CID) - for _, partInfo := range partsInfo { - if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { - n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart, - zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID), - zap.Error(err)) + for _, prts := range partsInfo { + for _, partInfo := range prts { + if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { + n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart, + zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID), + zap.Error(err)) + } + addr.SetObject(partInfo.OID) + n.cache.DeleteObject(addr) } - addr.SetObject(partInfo.OID) - n.cache.DeleteObject(addr) } return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo) @@ -554,10 +557,12 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e return err } - for _, info := range parts { - if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { - n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), - zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) + for _, infos := range parts { + for _, info := range infos { + if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { + n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), + zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) + } } } @@ -581,7 +586,12 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn parts := make([]*Part, 0, len(partsInfo)) - for _, partInfo := range partsInfo { + for _, infos := range partsInfo { + sort.Slice(infos, func(i, j int) bool { + return infos[i].Timestamp < infos[j].Timestamp + }) + + partInfo := infos[len(infos)-1] parts = append(parts, &Part{ ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())), LastModified: partInfo.Created.UTC().Format(time.RFC3339), @@ -618,7 +628,22 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn return &res, nil } -func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) { +type PartsInfo map[int][]*data.PartInfoExtended + +func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended { + parts := p[part] + + for i, info := range parts { + if info.GetETag(md5Enabled) == etag { + p[part] = append(parts[:i], parts[i+1:]...) + return info + } + } + + return nil +} + +func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID) if err != nil { if errors.Is(err, ErrNodeNotFound) { @@ -632,11 +657,11 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data. return nil, nil, err } - res := make(map[int]*data.PartInfo, len(parts)) + res := make(map[int][]*data.PartInfoExtended, len(parts)) partsNumbers := make([]int, len(parts)) oids := make([]string, len(parts)) for i, part := range parts { - res[part.Number] = part + res[part.Number] = append(res[part.Number], part) partsNumbers[i] = part.Number oids[i] = part.OID.EncodeToString() } diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index b3b56b28..8a85ffbb 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -6,6 +6,7 @@ import ( "io" "sort" "strings" + "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -33,7 +34,7 @@ type TreeServiceMock struct { locks map[string]map[uint64]*data.LockInfo tags map[string]map[uint64]map[string]string multiparts map[string]map[string][]*data.MultipartInfo - parts map[string]map[int]*data.PartInfo + parts map[string]map[int]*data.PartInfoExtended } func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) { @@ -92,7 +93,7 @@ func NewTreeService() *TreeServiceMock { locks: make(map[string]map[uint64]*data.LockInfo), tags: make(map[string]map[uint64]map[string]string), multiparts: make(map[string]map[string][]*data.MultipartInfo), - parts: make(map[string]map[int]*data.PartInfo), + parts: make(map[string]map[int]*data.PartInfoExtended), } } @@ -346,28 +347,31 @@ func (t *TreeServiceMock) GetMultipartUpload(_ context.Context, bktInfo *data.Bu return nil, ErrNodeNotFound } -func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { +func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) { multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID) if err != nil { - return oid.ID{}, err + return nil, err } if multipartInfo.ID != multipartNodeID { - return oid.ID{}, fmt.Errorf("invalid multipart info id") + return nil, fmt.Errorf("invalid multipart info id") } partsMap, ok := t.parts[info.UploadID] if !ok { - partsMap = make(map[int]*data.PartInfo) + partsMap = make(map[int]*data.PartInfoExtended) } - partsMap[info.Number] = info + partsMap[info.Number] = &data.PartInfoExtended{ + PartInfo: *info, + Timestamp: uint64(time.Now().UnixMicro()), + } t.parts[info.UploadID] = partsMap - return oid.ID{}, nil + return nil, nil } -func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { +func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) { cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] var foundMultipart *data.MultipartInfo @@ -387,7 +391,7 @@ LOOP: } partsMap := t.parts[foundMultipart.UploadID] - result := make([]*data.PartInfo, 0, len(partsMap)) + result := make([]*data.PartInfoExtended, 0, len(partsMap)) for _, part := range partsMap { result = append(result, part) } diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 5c74f64a..5a2047ad 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -57,11 +57,11 @@ type TreeService interface { GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) // AddPart puts a node to a system tree as a child of appropriate multipart upload - // and returns objectID of a previous part which must be deleted in FrostFS. + // and returns objectIDs of a previous part/s which must be deleted in FrostFS. // - // If object id to remove is not found returns ErrNoNodeToRemove error. - AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) - GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) + // If object ids to remove is not found returns ErrNoNodeToRemove error. + AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) + GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index a95a8f69..f78180a2 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -161,4 +161,5 @@ const ( WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped" WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped" WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped" + FailedToRemoveOldPartNode = "failed to remove old part node" ) diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 2329179b..c89d6af1 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -156,6 +156,10 @@ type NodeResponse interface { } func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) { + if err := validateNodeResponse(nodeInfo); err != nil { + return nil, err + } + tNode := &treeNode{ ID: nodeInfo.GetNodeID(), ParentID: nodeInfo.GetParentID(), @@ -163,14 +167,6 @@ func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) { Meta: make(map[string]string, len(nodeInfo.GetMeta())), } - if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 { - return nil, errors.New("invalid tree node: missing id") - } - - if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) { - return nil, errors.New("invalid tree node: length multiple ids mismatch") - } - for _, kv := range nodeInfo.GetMeta() { switch kv.GetKey() { case oidKV: @@ -377,6 +373,10 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr } func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) { + if err := validateNodeResponse(node); err != nil { + return nil, err + } + if len(node.GetNodeID()) != 1 { return nil, errors.New("invalid multipart node: this is split node") } @@ -426,10 +426,36 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, return multipartInfo, nil } -func newPartInfo(node NodeResponse) (*data.PartInfo, error) { - var err error - partInfo := &data.PartInfo{} +func validateNodeResponse(node NodeResponse) error { + ids := node.GetNodeID() + parentIDs := node.GetParentID() + timestamps := node.GetTimestamp() + if len(ids) == 0 || len(parentIDs) == 0 || len(timestamps) == 0 { + return errors.New("invalid node response: missing ids") + } + + if len(ids) != len(parentIDs) || len(parentIDs) != len(timestamps) { + return errors.New("invalid node response: multiple ids length mismatch") + } + + return nil +} + +func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) { + if err := validateNodeResponse(node); err != nil { + return nil, err + } + + if len(node.GetNodeID()) != 1 { + return nil, errors.New("invalid part node: this is split node") + } + + partInfo := &data.PartInfoExtended{ + Timestamp: node.GetTimestamp()[0], + } + + var err error for _, kv := range node.GetMeta() { value := string(kv.GetValue()) switch kv.GetKey() { @@ -1397,10 +1423,10 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, return nil, layer.ErrNodeNotFound } -func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { +func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) { parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false) if err != nil { - return oid.ID{}, err + return nil, err } meta := map[string]string{ @@ -1412,48 +1438,76 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN md5KV: info.MD5, } + objToDelete := make([]oid.ID, 0, 1) + partsToDelete := make([]uint64, 0, 1) + var ( + latestPartID uint64 + maxTimestamp uint64 + ) + + multiNodeID := MultiID{multipartNodeID} + for _, part := range parts { - if len(part.GetNodeID()) != 1 { - // multipart parts nodeID shouldn't have multiple values - c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts, - zap.String("key", info.Key), - zap.String("upload id", info.UploadID), - zap.Uint64("multipart node id ", multipartNodeID), - zap.Uint64s("node ids", part.GetNodeID())) - continue - } - nodeID := part.GetNodeID()[0] - if nodeID == multipartNodeID { + if multiNodeID.Equal(part.GetNodeID()) { continue } + partInfo, err := newPartInfo(part) if err != nil { c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo, zap.String("key", info.Key), zap.String("upload id", info.UploadID), zap.Uint64("multipart node id ", multipartNodeID), + zap.Uint64s("id", part.GetNodeID()), zap.Error(err)) continue } if partInfo.Number == info.Number { - return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta) + nodeID := part.GetNodeID()[0] + objToDelete = append(objToDelete, partInfo.OID) + partsToDelete = append(partsToDelete, nodeID) + timestamp := partInfo.Timestamp + if timestamp > maxTimestamp { + maxTimestamp = timestamp + latestPartID = nodeID + } } } - if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil { - return oid.ID{}, err + if len(objToDelete) != 0 { + if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil { + return nil, fmt.Errorf("move part node: %w", err) + } + + for _, nodeID := range partsToDelete { + if nodeID == latestPartID { + continue + } + if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil { + c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode, + zap.String("key", info.Key), + zap.String("upload id", info.UploadID), + zap.Uint64("id", nodeID)) + } + } + + return objToDelete, nil } - return oid.ID{}, layer.ErrNoNodeToRemove + if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil { + return nil, err + } + + return nil, layer.ErrNoNodeToRemove } -func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { +func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) { parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false) if err != nil { return nil, err } - result := make([]*data.PartInfo, 0, len(parts)) + result := make([]*data.PartInfoExtended, 0, len(parts)) for _, part := range parts { if len(part.GetNodeID()) != 1 { // multipart parts nodeID shouldn't have multiple values diff --git a/pkg/service/tree/tree_test.go b/pkg/service/tree/tree_test.go index 973eb129..058e46a9 100644 --- a/pkg/service/tree/tree_test.go +++ b/pkg/service/tree/tree_test.go @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -304,3 +305,57 @@ func TestGetLatestNode(t *testing.T) { }) } } + +func TestSplitTreeMultiparts(t *testing.T) { + ctx := context.Background() + + memCli, err := NewTreeServiceClientMemory() + require.NoError(t, err) + treeService := NewTree(memCli, zaptest.NewLogger(t)) + + bktInfo := &data.BucketInfo{ + CID: cidtest.ID(), + } + + multipartInfo := &data.MultipartInfo{ + Key: "multipart", + UploadID: "id", + Meta: map[string]string{}, + Owner: usertest.ID(), + } + + err = treeService.CreateMultipartUpload(ctx, bktInfo, multipartInfo) + require.NoError(t, err) + + multipartInfo, err = treeService.GetMultipartUpload(ctx, bktInfo, multipartInfo.Key, multipartInfo.UploadID) + require.NoError(t, err) + + var objIDs []oid.ID + for i := 0; i < 2; i++ { + objID := oidtest.ID() + _, err = memCli.AddNode(ctx, bktInfo, systemTree, multipartInfo.ID, map[string]string{ + partNumberKV: "1", + oidKV: objID.EncodeToString(), + ownerKV: usertest.ID().EncodeToString(), + }) + require.NoError(t, err) + objIDs = append(objIDs, objID) + } + + parts, err := treeService.GetParts(ctx, bktInfo, multipartInfo.ID) + require.NoError(t, err) + require.Len(t, parts, 2) + + objToDeletes, err := treeService.AddPart(ctx, bktInfo, multipartInfo.ID, &data.PartInfo{ + Key: multipartInfo.Key, + UploadID: multipartInfo.UploadID, + Number: 1, + OID: oidtest.ID(), + }) + require.NoError(t, err) + require.EqualValues(t, objIDs, objToDeletes, "oids to delete mismatched") + + parts, err = treeService.GetParts(ctx, bktInfo, multipartInfo.ID) + require.NoError(t, err) + require.Len(t, parts, 1) +}