From d6b506f6d95a89b5c82291b3804308e44c6806a1 Mon Sep 17 00:00:00 2001 From: Marina Biryukova Date: Tue, 20 Aug 2024 12:35:07 +0300 Subject: [PATCH] [#466] Implement PATCH for multipart objects Signed-off-by: Marina Biryukova --- CHANGELOG.md | 1 + api/handler/patch.go | 13 +- api/handler/patch_test.go | 232 ++++++++++++++++++++++++++++++++++ api/layer/frostfs.go | 2 +- api/layer/frostfs_mock.go | 8 +- api/layer/patch.go | 242 +++++++++++++++++++++++++++++++----- docs/extensions.md | 2 +- internal/frostfs/frostfs.go | 8 +- 8 files changed, 468 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cd9e2a1..6b9d2b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ This document outlines major changes between releases. ### Added - Add support for virtual hosted style addressing (#446, #449) - Support new param `frostfs.graceful_close_on_switch_timeout` (#475) +- Support patch object method (#479) ### Changed - Update go version to go1.19 (#470) diff --git a/api/handler/patch.go b/api/handler/patch.go index 220cfc2..a9f71b7 100644 --- a/api/handler/patch.go +++ b/api/handler/patch.go @@ -95,13 +95,19 @@ func (h *handler) PatchObjectHandler(w http.ResponseWriter, r *http.Request) { } params := &layer.PatchObjectParams{ - Object: srcObjInfo, + Object: extendedSrcObjInfo, BktInfo: bktInfo, NewBytes: r.Body, Range: byteRange, VersioningEnabled: settings.VersioningEnabled(), } + params.CopiesNumbers, err = h.pickCopiesNumbers(nil, reqInfo.Namespace, bktInfo.LocationConstraint) + if err != nil { + h.logAndSendError(w, "invalid copies number", reqInfo, err) + return + } + extendedObjInfo, err := h.obj.PatchObject(ctx, params) if err != nil { if isErrObjectLocked(err) { @@ -112,7 +118,10 @@ func (h *handler) PatchObjectHandler(w http.ResponseWriter, r *http.Request) { return } - w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID()) + if settings.VersioningEnabled() { + w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID()) + } + w.Header().Set(api.ETag, data.Quote(extendedObjInfo.ObjectInfo.ETag(h.cfg.MD5Enabled()))) resp := PatchObjectResult{ diff --git a/api/handler/patch_test.go b/api/handler/patch_test.go index 65b12f2..32b0e13 100644 --- a/api/handler/patch_test.go +++ b/api/handler/patch_test.go @@ -3,6 +3,7 @@ package handler import ( "bytes" "crypto/md5" + "crypto/rand" "crypto/sha256" "encoding/hex" "encoding/xml" @@ -107,6 +108,237 @@ func TestPatch(t *testing.T) { } } +func TestPatchMultipartObject(t *testing.T) { + tc := prepareHandlerContextWithMinCache(t) + tc.config.md5Enabled = true + + bktName, objName, partSize := "bucket-for-multipart-patch", "object-for-multipart-patch", 5*1024*1024 + createTestBucket(tc, bktName) + + t.Run("patch beginning of the first part", func(t *testing.T) { + multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{}) + etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize) + etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize) + etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize) + completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3}) + + patchSize := partSize / 2 + patchBody := make([]byte, patchSize) + _, err := rand.Read(patchBody) + require.NoError(t, err) + + patchObject(t, tc, bktName, objName, "bytes 0-"+strconv.Itoa(patchSize-1)+"/*", patchBody, nil) + object, header := getObject(tc, bktName, objName) + contentLen, err := strconv.Atoi(header.Get(api.ContentLength)) + require.NoError(t, err) + equalDataSlices(t, bytes.Join([][]byte{patchBody, data1[patchSize:], data2, data3}, []byte("")), object) + require.Equal(t, partSize*3, contentLen) + require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3")) + }) + + t.Run("patch middle of the first part", func(t *testing.T) { + multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{}) + etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize) + etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize) + etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize) + completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3}) + + patchSize := partSize / 2 + patchBody := make([]byte, patchSize) + _, err := rand.Read(patchBody) + require.NoError(t, err) + + patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/4)+"-"+strconv.Itoa(partSize*3/4-1)+"/*", patchBody, nil) + object, header := getObject(tc, bktName, objName) + contentLen, err := strconv.Atoi(header.Get(api.ContentLength)) + require.NoError(t, err) + equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/4], patchBody, data1[partSize*3/4:], data2, data3}, []byte("")), object) + require.Equal(t, partSize*3, contentLen) + require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3")) + }) + + t.Run("patch first and second parts", func(t *testing.T) { + multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{}) + etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize) + etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize) + etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize) + completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3}) + + patchSize := partSize / 2 + patchBody := make([]byte, patchSize) + _, err := rand.Read(patchBody) + require.NoError(t, err) + + patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*3/4)+"-"+strconv.Itoa(partSize*5/4-1)+"/*", patchBody, nil) + object, header := getObject(tc, bktName, objName) + contentLen, err := strconv.Atoi(header.Get(api.ContentLength)) + require.NoError(t, err) + equalDataSlices(t, bytes.Join([][]byte{data1[:partSize*3/4], patchBody, data2[partSize/4:], data3}, []byte("")), object) + require.Equal(t, partSize*3, contentLen) + require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3")) + }) + + t.Run("patch all parts", func(t *testing.T) { + multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{}) + etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize) + etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize) + etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize) + completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3}) + + patchSize := partSize * 2 + patchBody := make([]byte, patchSize) + _, err := rand.Read(patchBody) + require.NoError(t, err) + + patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/2-1)+"-"+strconv.Itoa(partSize/2+patchSize-2)+"/*", patchBody, nil) + object, header := getObject(tc, bktName, objName) + contentLen, err := strconv.Atoi(header.Get(api.ContentLength)) + require.NoError(t, err) + equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/2-1], patchBody, data3[partSize/2-1:]}, []byte("")), object) + require.Equal(t, partSize*3, contentLen) + require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3")) + }) + + t.Run("patch all parts and append bytes", func(t *testing.T) { + multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{}) + etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize) + etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize) + etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize) + completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3}) + + patchSize := partSize * 3 + patchBody := make([]byte, patchSize) + _, err := rand.Read(patchBody) + require.NoError(t, err) + + patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/2)+"-"+strconv.Itoa(partSize/2+patchSize-1)+"/*", patchBody, nil) + object, header := getObject(tc, bktName, objName) + contentLen, err := strconv.Atoi(header.Get(api.ContentLength)) + require.NoError(t, err) + equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/2], patchBody}, []byte("")), object) + require.Equal(t, partSize*7/2, contentLen) + require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3")) + }) + + t.Run("patch second part", func(t *testing.T) { + multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{}) + etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize) + etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize) + etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize) + completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3}) + + patchBody := make([]byte, partSize) + _, err := rand.Read(patchBody) + require.NoError(t, err) + + patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize)+"-"+strconv.Itoa(partSize*2-1)+"/*", patchBody, nil) + object, header := getObject(tc, bktName, objName) + contentLen, err := strconv.Atoi(header.Get(api.ContentLength)) + require.NoError(t, err) + equalDataSlices(t, bytes.Join([][]byte{data1, patchBody, data3}, []byte("")), object) + require.Equal(t, partSize*3, contentLen) + require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3")) + }) + + t.Run("patch last part, equal size", func(t *testing.T) { + multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{}) + etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize) + etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize) + etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize) + completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3}) + + patchBody := make([]byte, partSize) + _, err := rand.Read(patchBody) + require.NoError(t, err) + + patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2)+"-"+strconv.Itoa(partSize*3-1)+"/*", patchBody, nil) + object, header := getObject(tc, bktName, objName) + contentLen, err := strconv.Atoi(header.Get(api.ContentLength)) + require.NoError(t, err) + equalDataSlices(t, bytes.Join([][]byte{data1, data2, patchBody}, []byte("")), object) + require.Equal(t, partSize*3, contentLen) + require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3")) + }) + + t.Run("patch last part, increase size", func(t *testing.T) { + multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{}) + etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize) + etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize) + etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize) + completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3}) + + patchBody := make([]byte, partSize+1) + _, err := rand.Read(patchBody) + require.NoError(t, err) + + patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2)+"-"+strconv.Itoa(partSize*3)+"/*", patchBody, nil) + object, header := getObject(tc, bktName, objName) + contentLen, err := strconv.Atoi(header.Get(api.ContentLength)) + require.NoError(t, err) + equalDataSlices(t, bytes.Join([][]byte{data1, data2, patchBody}, []byte("")), object) + require.Equal(t, partSize*3+1, contentLen) + require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3")) + }) + + t.Run("patch last part with offset and append bytes", func(t *testing.T) { + multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{}) + etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize) + etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize) + etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize) + completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3}) + + patchBody := make([]byte, partSize) + _, err := rand.Read(patchBody) + require.NoError(t, err) + + patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2+3)+"-"+strconv.Itoa(partSize*3+2)+"/*", patchBody, nil) + object, header := getObject(tc, bktName, objName) + contentLen, err := strconv.Atoi(header.Get(api.ContentLength)) + require.NoError(t, err) + equalDataSlices(t, bytes.Join([][]byte{data1, data2, data3[:3], patchBody}, []byte("")), object) + require.Equal(t, partSize*3+3, contentLen) + require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3")) + }) + + t.Run("append bytes", func(t *testing.T) { + multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{}) + etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize) + etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize) + etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize) + completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3}) + + patchBody := make([]byte, partSize) + _, err := rand.Read(patchBody) + require.NoError(t, err) + + patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*3)+"-"+strconv.Itoa(partSize*4-1)+"/*", patchBody, nil) + object, header := getObject(tc, bktName, objName) + contentLen, err := strconv.Atoi(header.Get(api.ContentLength)) + require.NoError(t, err) + equalDataSlices(t, bytes.Join([][]byte{data1, data2, data3, patchBody}, []byte("")), object) + require.Equal(t, partSize*4, contentLen) + require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3")) + }) + + t.Run("patch empty multipart", func(t *testing.T) { + multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{}) + etag, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, 0) + completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag}) + + patchBody := make([]byte, partSize) + _, err := rand.Read(patchBody) + require.NoError(t, err) + + patchObject(t, tc, bktName, objName, "bytes 0-"+strconv.Itoa(partSize-1)+"/*", patchBody, nil) + object, header := getObject(tc, bktName, objName) + contentLen, err := strconv.Atoi(header.Get(api.ContentLength)) + require.NoError(t, err) + equalDataSlices(t, patchBody, object) + require.Equal(t, partSize, contentLen) + require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-1")) + }) +} + func TestPatchWithVersion(t *testing.T) { hc := prepareHandlerContextWithMinCache(t) bktName, objName := "bucket", "obj" diff --git a/api/layer/frostfs.go b/api/layer/frostfs.go index 32f53ef..3f01f2f 100644 --- a/api/layer/frostfs.go +++ b/api/layer/frostfs.go @@ -215,7 +215,7 @@ type PrmObjectPatch struct { Payload io.Reader // Object range to patch. - Range *RangeParams + Offset, Length uint64 // Size of original object payload. ObjectSize uint64 diff --git a/api/layer/frostfs_mock.go b/api/layer/frostfs_mock.go index c41297b..2cd3295 100644 --- a/api/layer/frostfs_mock.go +++ b/api/layer/frostfs_mock.go @@ -430,12 +430,12 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm PrmObjectPatch) (oid. } var newPayload []byte - if prm.Range.Start > 0 { - newPayload = append(newPayload, obj.Payload()[:prm.Range.Start]...) + if prm.Offset > 0 { + newPayload = append(newPayload, obj.Payload()[:prm.Offset]...) } newPayload = append(newPayload, patchBytes...) - if prm.Range.End < obj.PayloadSize()-1 { - newPayload = append(newPayload, obj.Payload()[prm.Range.End+1:]...) + if prm.Offset+prm.Length < obj.PayloadSize() { + newPayload = append(newPayload, obj.Payload()[prm.Offset+prm.Length:]...) } newObj.SetPayload(newPayload) newObj.SetPayloadSize(uint64(len(newPayload))) diff --git a/api/layer/patch.go b/api/layer/patch.go index ab67cf5..44d4e4f 100644 --- a/api/layer/patch.go +++ b/api/layer/patch.go @@ -1,78 +1,264 @@ package layer import ( + "bytes" "context" "encoding/hex" + "encoding/json" "fmt" "io" + "strconv" + "strings" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" ) type PatchObjectParams struct { - Object *data.ObjectInfo + Object *data.ExtendedObjectInfo BktInfo *data.BucketInfo NewBytes io.Reader Range *RangeParams VersioningEnabled bool + CopiesNumbers []uint32 } func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) { - if p.Object.Headers[AttributeDecryptedSize] != "" { + if p.Object.ObjectInfo.Headers[AttributeDecryptedSize] != "" { return nil, fmt.Errorf("patch encrypted object") } - if p.Object.Headers[MultipartObjectSize] != "" { - // TODO: support multipart object patch - return nil, fmt.Errorf("patch multipart object") + if p.Object.ObjectInfo.Headers[MultipartObjectSize] != "" { + return n.patchMultipartObject(ctx, p) } prmPatch := PrmObjectPatch{ Container: p.BktInfo.CID, - Object: p.Object.ID, + Object: p.Object.ObjectInfo.ID, Payload: p.NewBytes, - Range: p.Range, - ObjectSize: p.Object.Size, + Offset: p.Range.Start, + Length: p.Range.End - p.Range.Start + 1, + ObjectSize: p.Object.ObjectInfo.Size, } n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner) - objID, err := n.frostFS.PatchObject(ctx, prmPatch) + createdObj, err := n.patchObject(ctx, prmPatch) if err != nil { return nil, fmt.Errorf("patch object: %w", err) } - obj, err := n.objectHead(ctx, p.BktInfo, objID) + newVersion := &data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + OID: createdObj.ID, + ETag: hex.EncodeToString(createdObj.HashSum), + FilePath: p.Object.ObjectInfo.Name, + Size: createdObj.Size, + Created: &p.Object.ObjectInfo.Created, + Owner: &n.gateOwner, + CreationEpoch: p.Object.NodeVersion.CreationEpoch, + }, + IsUnversioned: !p.VersioningEnabled, + IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "", + } + + if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil { + return nil, fmt.Errorf("couldn't add new version to tree service: %w", err) + } + + p.Object.ObjectInfo.ID = createdObj.ID + p.Object.ObjectInfo.Size = createdObj.Size + p.Object.ObjectInfo.MD5Sum = "" + p.Object.ObjectInfo.HashSum = hex.EncodeToString(createdObj.HashSum) + p.Object.NodeVersion = newVersion + + return p.Object, nil +} + +func (n *Layer) patchObject(ctx context.Context, p PrmObjectPatch) (*data.CreatedObjectInfo, error) { + objID, err := n.frostFS.PatchObject(ctx, p) + if err != nil { + return nil, fmt.Errorf("patch object: %w", err) + } + + prmHead := PrmObjectHead{ + PrmAuth: p.PrmAuth, + Container: p.Container, + Object: objID, + } + obj, err := n.frostFS.HeadObject(ctx, prmHead) if err != nil { return nil, fmt.Errorf("head object: %w", err) } payloadChecksum, _ := obj.PayloadChecksum() - hashSum := hex.EncodeToString(payloadChecksum.Value()) + + return &data.CreatedObjectInfo{ + ID: objID, + Size: obj.PayloadSize(), + HashSum: payloadChecksum.Value(), + }, nil +} + +func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) { + combinedObj, err := n.objectGet(ctx, p.BktInfo, p.Object.ObjectInfo.ID) + if err != nil { + return nil, fmt.Errorf("get combined object '%s': %w", p.Object.ObjectInfo.ID.EncodeToString(), err) + } + + var parts []*data.PartInfo + if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil { + return nil, fmt.Errorf("unmarshal combined object parts: %w", err) + } + + prmPatch := PrmObjectPatch{ + Container: p.BktInfo.CID, + } + n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner) + + off, ln := p.Range.Start, p.Range.End-p.Range.Start+1 + var multipartObjectSize uint64 + for i, part := range parts { + if off > part.Size || (off == part.Size && i != len(parts)-1) || ln == 0 { + multipartObjectSize += part.Size + if ln != 0 { + off -= part.Size + } + continue + } + + var createdObj *data.CreatedObjectInfo + createdObj, off, ln, err = n.patchPart(ctx, part, p, &prmPatch, off, ln, i == len(parts)-1) + if err != nil { + return nil, fmt.Errorf("patch part: %w", err) + } + + parts[i].OID = createdObj.ID + parts[i].Size = createdObj.Size + parts[i].MD5 = "" + parts[i].ETag = hex.EncodeToString(createdObj.HashSum) + + multipartObjectSize += createdObj.Size + } + + return n.updateCombinedObject(ctx, parts, multipartObjectSize, p) +} + +// Returns patched part info, updated offset and length. +func (n *Layer) patchPart(ctx context.Context, part *data.PartInfo, p *PatchObjectParams, prmPatch *PrmObjectPatch, off, ln uint64, lastPart bool) (*data.CreatedObjectInfo, uint64, uint64, error) { + if off == 0 && ln >= part.Size { + curLen := part.Size + if lastPart { + curLen = ln + } + prm := PrmObjectCreate{ + Container: p.BktInfo.CID, + Payload: io.LimitReader(p.NewBytes, int64(curLen)), + CreationTime: part.Created, + CopiesNumber: p.CopiesNumbers, + } + + createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo) + if err != nil { + return nil, 0, 0, fmt.Errorf("put new part object '%s': %w", part.OID.EncodeToString(), err) + } + + ln -= curLen + + return createdObj, off, ln, err + } + + curLen := ln + if off+curLen > part.Size && !lastPart { + curLen = part.Size - off + } + prmPatch.Object = part.OID + prmPatch.ObjectSize = part.Size + prmPatch.Offset = off + prmPatch.Length = curLen + + prmPatch.Payload = io.LimitReader(p.NewBytes, int64(prmPatch.Length)) + + createdObj, err := n.patchObject(ctx, *prmPatch) + if err != nil { + return nil, 0, 0, fmt.Errorf("patch part object '%s': %w", part.OID.EncodeToString(), err) + } + + ln -= curLen + off = 0 + + return createdObj, off, ln, nil +} + +func (n *Layer) updateCombinedObject(ctx context.Context, parts []*data.PartInfo, fullObjSize uint64, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) { + newParts, err := json.Marshal(parts) + if err != nil { + return nil, fmt.Errorf("marshal parts for combined object: %w", err) + } + + var headerParts strings.Builder + for i, part := range parts { + headerPart := part.ToHeaderString() + if i != len(parts)-1 { + headerPart += "," + } + headerParts.WriteString(headerPart) + } + + prm := PrmObjectCreate{ + Container: p.BktInfo.CID, + PayloadSize: fullObjSize, + Filepath: p.Object.ObjectInfo.Name, + Payload: bytes.NewReader(newParts), + CreationTime: p.Object.ObjectInfo.Created, + CopiesNumber: p.CopiesNumbers, + } + + prm.Attributes = make([][2]string, 0, len(p.Object.ObjectInfo.Headers)+1) + + for k, v := range p.Object.ObjectInfo.Headers { + switch k { + case MultipartObjectSize: + prm.Attributes = append(prm.Attributes, [2]string{MultipartObjectSize, strconv.FormatUint(fullObjSize, 10)}) + case UploadCompletedParts: + prm.Attributes = append(prm.Attributes, [2]string{UploadCompletedParts, headerParts.String()}) + case api.ContentType: + default: + prm.Attributes = append(prm.Attributes, [2]string{k, v}) + } + } + prm.Attributes = append(prm.Attributes, [2]string{api.ContentType, p.Object.ObjectInfo.ContentType}) + + createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo) + if err != nil { + return nil, fmt.Errorf("put new combined object: %w", err) + } + newVersion := &data.NodeVersion{ BaseNodeVersion: data.BaseNodeVersion{ - OID: objID, - ETag: hashSum, - FilePath: p.Object.Name, - Size: obj.PayloadSize(), - Created: &p.Object.Created, - Owner: &n.gateOwner, - // TODO: Add creation epoch + OID: createdObj.ID, + ETag: hex.EncodeToString(createdObj.HashSum), + MD5: hex.EncodeToString(createdObj.MD5Sum) + "-" + strconv.Itoa(len(parts)), + FilePath: p.Object.ObjectInfo.Name, + Size: fullObjSize, + Created: &p.Object.ObjectInfo.Created, + Owner: &n.gateOwner, + CreationEpoch: p.Object.NodeVersion.CreationEpoch, }, IsUnversioned: !p.VersioningEnabled, - IsCombined: p.Object.Headers[MultipartObjectSize] != "", + IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "", } if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil { - return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err) + return nil, fmt.Errorf("couldn't add new version to tree service: %w", err) } - p.Object.ID = objID - p.Object.Size = obj.PayloadSize() - p.Object.MD5Sum = "" - p.Object.HashSum = hashSum + p.Object.ObjectInfo.ID = createdObj.ID + p.Object.ObjectInfo.Size = createdObj.Size + p.Object.ObjectInfo.MD5Sum = hex.EncodeToString(createdObj.MD5Sum) + "-" + strconv.Itoa(len(parts)) + p.Object.ObjectInfo.HashSum = hex.EncodeToString(createdObj.HashSum) + p.Object.ObjectInfo.Headers[MultipartObjectSize] = strconv.FormatUint(fullObjSize, 10) + p.Object.ObjectInfo.Headers[UploadCompletedParts] = headerParts.String() + p.Object.NodeVersion = newVersion - return &data.ExtendedObjectInfo{ - ObjectInfo: p.Object, - NodeVersion: newVersion, - }, nil + return p.Object, nil } diff --git a/docs/extensions.md b/docs/extensions.md index fa8cf1d..6c90e2e 100644 --- a/docs/extensions.md +++ b/docs/extensions.md @@ -153,7 +153,7 @@ The request returns the following data in XML format. - **ETag** - Patched object tag. Always in SHA-256 format. + Patched object tag. For regular objects always in SHA-256 format. If the bucket is versioned, the **_x-amz-version-id_** header is returned with the version of the created object. diff --git a/internal/frostfs/frostfs.go b/internal/frostfs/frostfs.go index bee2e88..5fd935b 100644 --- a/internal/frostfs/frostfs.go +++ b/internal/frostfs/frostfs.go @@ -412,10 +412,10 @@ func (x *FrostFS) PatchObject(ctx context.Context, prm layer.PrmObjectPatch) (oi prmPatch.SetAddress(addr) var rng object.Range - rng.SetOffset(prm.Range.Start) - rng.SetLength(prm.Range.End - prm.Range.Start + 1) - if prm.Range.End >= prm.ObjectSize { - rng.SetLength(prm.ObjectSize - prm.Range.Start) + rng.SetOffset(prm.Offset) + rng.SetLength(prm.Length) + if prm.Length+prm.Offset > prm.ObjectSize { + rng.SetLength(prm.ObjectSize - prm.Offset) } prmPatch.SetRange(&rng)