[#466] Implement PATCH for multipart objects #466

Merged
alexvanin merged 1 commit from mbiryukova/frostfs-s3-gw:feature/patch_multipart into feature/patch 2024-08-30 06:55:47 +00:00
7 changed files with 446 additions and 40 deletions


@@ -95,13 +95,19 @@ func (h *handler) PatchObjectHandler(w http.ResponseWriter, r *http.Request) {
}
params := &layer.PatchObjectParams{
Object: srcObjInfo,
Object: extendedSrcObjInfo,
BktInfo: bktInfo,
NewBytes: r.Body,
Range: byteRange,
VersioningEnabled: settings.VersioningEnabled(),
}
params.CopiesNumbers, err = h.pickCopiesNumbers(nil, reqInfo.Namespace, bktInfo.LocationConstraint)
if err != nil {
h.logAndSendError(w, "invalid copies number", reqInfo, err)
return
}
extendedObjInfo, err := h.obj.PatchObject(ctx, params)
if err != nil {
if isErrObjectLocked(err) {
@@ -112,7 +118,10 @@ func (h *handler) PatchObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID())
if settings.VersioningEnabled() {
w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID())
}
w.Header().Set(api.ETag, data.Quote(extendedObjInfo.ObjectInfo.ETag(h.cfg.MD5Enabled())))
resp := PatchObjectResult{


@@ -3,6 +3,7 @@ package handler
import (
"bytes"
"crypto/md5"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/xml"
@@ -107,6 +108,237 @@ func TestPatch(t *testing.T) {
}
}
func TestPatchMultipartObject(t *testing.T) {
tc := prepareHandlerContextWithMinCache(t)
tc.config.md5Enabled = true
bktName, objName, partSize := "bucket-for-multipart-patch", "object-for-multipart-patch", 5*1024*1024
createTestBucket(tc, bktName)
t.Run("patch beginning of the first part", func(t *testing.T) {
dkirillov marked this conversation as resolved Outdated

Maybe it's better to use `t.Run`

t.Run("patch beginning of the first part", func(t *testing.T) {
	multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
	etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
	etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
	etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
	completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})

	patchObject(t, tc, bktName, objName, "bytes 0-"+strconv.Itoa(patchSize-1)+"/*", patchBody, nil)
	object, header := getObject(tc, bktName, objName)
	contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
	require.NoError(t, err)
	equalDataSlices(t, bytes.Join([][]byte{patchBody, data1[patchSize:], data2, data3}, []byte("")), object)
	require.Equal(t, partSize*3, contentLen)
	require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
})
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchSize := partSize / 2
patchBody := make([]byte, patchSize)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes 0-"+strconv.Itoa(patchSize-1)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{patchBody, data1[patchSize:], data2, data3}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
})
t.Run("patch middle of the first part", func(t *testing.T) {
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchSize := partSize / 2
patchBody := make([]byte, patchSize)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/4)+"-"+strconv.Itoa(partSize*3/4-1)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/4], patchBody, data1[partSize*3/4:], data2, data3}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
})
t.Run("patch first and second parts", func(t *testing.T) {
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchSize := partSize / 2
patchBody := make([]byte, patchSize)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*3/4)+"-"+strconv.Itoa(partSize*5/4-1)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize*3/4], patchBody, data2[partSize/4:], data3}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
})
t.Run("patch all parts", func(t *testing.T) {
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchSize := partSize * 2
patchBody := make([]byte, patchSize)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/2-1)+"-"+strconv.Itoa(partSize/2+patchSize-2)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/2-1], patchBody, data3[partSize/2-1:]}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
})
t.Run("patch all parts and append bytes", func(t *testing.T) {
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchSize := partSize * 3
patchBody := make([]byte, patchSize)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/2)+"-"+strconv.Itoa(partSize/2+patchSize-1)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/2], patchBody}, []byte("")), object)
require.Equal(t, partSize*7/2, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
})
t.Run("patch second part", func(t *testing.T) {
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchBody := make([]byte, partSize)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize)+"-"+strconv.Itoa(partSize*2-1)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1, patchBody, data3}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
})
t.Run("patch last part, equal size", func(t *testing.T) {
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchBody := make([]byte, partSize)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2)+"-"+strconv.Itoa(partSize*3-1)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1, data2, patchBody}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
})
t.Run("patch last part, increase size", func(t *testing.T) {
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchBody := make([]byte, partSize+1)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2)+"-"+strconv.Itoa(partSize*3)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1, data2, patchBody}, []byte("")), object)
require.Equal(t, partSize*3+1, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
})
t.Run("patch last part with offset and append bytes", func(t *testing.T) {
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchBody := make([]byte, partSize)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2+3)+"-"+strconv.Itoa(partSize*3+2)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1, data2, data3[:3], patchBody}, []byte("")), object)
require.Equal(t, partSize*3+3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
})
t.Run("append bytes", func(t *testing.T) {
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchBody := make([]byte, partSize)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*3)+"-"+strconv.Itoa(partSize*4-1)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1, data2, data3, patchBody}, []byte("")), object)
require.Equal(t, partSize*4, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
})
t.Run("patch empty multipart", func(t *testing.T) {
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, 0)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag})
patchBody := make([]byte, partSize)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes 0-"+strconv.Itoa(partSize-1)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, patchBody, object)
require.Equal(t, partSize, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-1"))
})
}
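The subtests above assemble the `Content-Range` value by concatenating `strconv.Itoa` calls. A small hedged sketch of the same format with a hypothetical helper (`rangeHeader` is not part of the PR):

```golang
package main

import "fmt"

// rangeHeader is a hypothetical convenience mirroring the strings built in the
// tests above: an inclusive byte range with an unknown total length.
func rangeHeader(start, end int) string {
	return fmt.Sprintf("bytes %d-%d/*", start, end)
}

func main() {
	const partSize = 5 * 1024 * 1024
	// The "patch second part" case covers exactly the second part.
	fmt.Println(rangeHeader(partSize, partSize*2-1)) // bytes 5242880-10485759/*
}
```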
func TestPatchWithVersion(t *testing.T) {
hc := prepareHandlerContextWithMinCache(t)
bktName, objName := "bucket", "obj"


@@ -215,7 +215,7 @@ type PrmObjectPatch struct {
Payload io.Reader
// Object range to patch.
Range *RangeParams
Offset, Length uint64
// Size of original object payload.
ObjectSize uint64


@@ -430,12 +430,12 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm PrmObjectPatch) (oid.
}
var newPayload []byte
if prm.Range.Start > 0 {
newPayload = append(newPayload, obj.Payload()[:prm.Range.Start]...)
if prm.Offset > 0 {
newPayload = append(newPayload, obj.Payload()[:prm.Offset]...)
}
newPayload = append(newPayload, patchBytes...)
if prm.Range.End < obj.PayloadSize()-1 {
newPayload = append(newPayload, obj.Payload()[prm.Range.End+1:]...)
if prm.Offset+prm.Length < obj.PayloadSize() {
newPayload = append(newPayload, obj.Payload()[prm.Offset+prm.Length:]...)
}
newObj.SetPayload(newPayload)
newObj.SetPayloadSize(uint64(len(newPayload)))
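The mock now splices purely by absolute `Offset`/`Length`. A minimal standalone sketch of the same splice rule, assuming the patch may run past the end of the payload (the append case); the names here are illustrative and not part of the PR:

```golang
package main

import "fmt"

// splice mirrors the TestFrostFS mock: keep the prefix before offset, insert
// the patch bytes, then keep whatever of the original payload lies past
// offset+length (nothing is kept when the patch appends).
func splice(payload, patch []byte, offset, length uint64) []byte {
	var out []byte
	if offset > 0 {
		out = append(out, payload[:offset]...)
	}
	out = append(out, patch...)
	if offset+length < uint64(len(payload)) {
		out = append(out, payload[offset+length:]...)
	}
	return out
}

func main() {
	payload := []byte("abcdefgh")
	// Replace two bytes in the middle: "ab" + "XY" + "efgh".
	fmt.Println(string(splice(payload, []byte("XY"), 2, 2)))
	// Patch starting at the end: the whole payload is kept, "XYZ" is appended.
	fmt.Println(string(splice(payload, []byte("XYZ"), 8, 3)))
}
```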


@@ -1,6 +1,7 @@
package layer
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/rand"
@@ -163,11 +164,12 @@ type (
}
PatchObjectParams struct {
Object *data.ObjectInfo
Object *data.ExtendedObjectInfo
BktInfo *data.BucketInfo
NewBytes io.Reader
Range *RangeParams
VersioningEnabled bool
CopiesNumbers []uint32
}
// CreateBucketParams stores bucket create request parameters.
@@ -546,63 +548,226 @@ func (n *Layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Exte
}
func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
if p.Object.Headers[AttributeDecryptedSize] != "" {
if p.Object.ObjectInfo.Headers[AttributeDecryptedSize] != "" {
return nil, fmt.Errorf("patch encrypted object")
}
if p.Object.Headers[MultipartObjectSize] != "" {
// TODO: support multipart object patch
return nil, fmt.Errorf("patch multipart object")
if p.Object.ObjectInfo.Headers[MultipartObjectSize] != "" {
return n.patchMultipartObject(ctx, p)
}
prmPatch := PrmObjectPatch{
Container: p.BktInfo.CID,
Object: p.Object.ID,
Object: p.Object.ObjectInfo.ID,
Payload: p.NewBytes,
Range: p.Range,
ObjectSize: p.Object.Size,
Offset: p.Range.Start,
Length: p.Range.End - p.Range.Start + 1,
ObjectSize: p.Object.ObjectInfo.Size,
}
n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
objID, err := n.frostFS.PatchObject(ctx, prmPatch)
createdObj, err := n.patchObject(ctx, prmPatch)
if err != nil {
return nil, fmt.Errorf("patch object: %w", err)
}
obj, err := n.objectHead(ctx, p.BktInfo, objID)
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: createdObj.ID,
ETag: hex.EncodeToString(createdObj.HashSum),
FilePath: p.Object.ObjectInfo.Name,
Size: createdObj.Size,
Created: &p.Object.ObjectInfo.Created,
Owner: &n.gateOwner,
CreationEpoch: p.Object.NodeVersion.CreationEpoch,
},
IsUnversioned: !p.VersioningEnabled,
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
}
p.Object.ObjectInfo.ID = createdObj.ID
p.Object.ObjectInfo.Size = createdObj.Size
p.Object.ObjectInfo.MD5Sum = ""
p.Object.ObjectInfo.HashSum = hex.EncodeToString(createdObj.HashSum)
p.Object.NodeVersion = newVersion
return p.Object, nil
}
func (n *Layer) patchObject(ctx context.Context, p PrmObjectPatch) (*data.CreatedObjectInfo, error) {
objID, err := n.frostFS.PatchObject(ctx, p)
if err != nil {
return nil, fmt.Errorf("patch object: %w", err)
}
prmHead := PrmObjectHead{
PrmAuth: p.PrmAuth,
Container: p.Container,
Object: objID,
}
obj, err := n.frostFS.HeadObject(ctx, prmHead)
if err != nil {
return nil, fmt.Errorf("head object: %w", err)
}
payloadChecksum, _ := obj.PayloadChecksum()
hashSum := hex.EncodeToString(payloadChecksum.Value())
return &data.CreatedObjectInfo{
ID: objID,
Size: obj.PayloadSize(),
HashSum: payloadChecksum.Value(),
}, nil
}
func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
combinedObj, err := n.objectGet(ctx, p.BktInfo, p.Object.ObjectInfo.ID)
dkirillov marked this conversation as resolved Outdated

Should we configure buffer size?

if err != nil {
return nil, fmt.Errorf("get combined object '%s': %w", p.Object.ObjectInfo.ID.EncodeToString(), err)
}
var parts []*data.PartInfo
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
}
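For context, the combined object's payload is a JSON array of part descriptors. A rough sketch of decoding it, assuming a trimmed-down `partInfo` shape with only the fields this diff touches (the real `data.PartInfo` type and its JSON keys live in the gateway and may differ):

```golang
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// partInfo is a hypothetical stand-in for data.PartInfo; field names and JSON
// keys here are assumptions for illustration only.
type partInfo struct {
	OID  string `json:"oid"`
	Size uint64 `json:"size"`
	ETag string `json:"etag"`
}

func main() {
	payload := []byte(`[{"oid":"part-1","size":5242880},{"oid":"part-2","size":5242880}]`)

	var parts []*partInfo
	if err := json.NewDecoder(bytes.NewReader(payload)).Decode(&parts); err != nil {
		panic(err)
	}
	for i, p := range parts {
		fmt.Printf("part %d: oid=%s size=%d\n", i+1, p.OID, p.Size)
	}
}
```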
dkirillov marked this conversation as resolved Outdated

I don't understand why we need `|| (p.Range.Start == part.Size && i != len(parts)-1)`. If we really need this, let's add a test for this condition. Currently the tests pass without this condition.

If the range starts after the last part, we have to patch the last part. Otherwise, we move on to the next part. I will add a test for this condition.

I can still freely delete this condition.

It seems it doesn't affect the result, but it reduces the number of patch requests. If that's OK, I can count them and compare with the expected number.
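A rough standalone sketch of the part-walking logic being discussed (it mirrors the loop below), assuming three equal-sized parts; it only reports which parts get rewritten for a given range, and the sizes are illustrative:

```golang
package main

import "fmt"

// walkParts mimics the skip condition discussed above: parts that lie entirely
// before the range are skipped, and a part whose size equals the remaining
// offset is also skipped unless it is the last one, so it never receives a
// zero-length request. The real code additionally chooses between put and
// patch for each touched part; here we only record which parts are touched.
func walkParts(partSizes []uint64, off, ln uint64) []int {
	var touched []int
	for i, size := range partSizes {
		if off > size || (off == size && i != len(partSizes)-1) || ln == 0 {
			if ln != 0 {
				off -= size
			}
			continue
		}
		curLen := ln
		if off+curLen > size && i != len(partSizes)-1 {
			curLen = size - off
		}
		touched = append(touched, i+1)
		ln -= curLen
		off = 0
	}
	return touched
}

func main() {
	parts := []uint64{8, 8, 8}
	// The range starts exactly at the second part: only parts 2 and 3 are
	// rewritten. Without the `off == size && i != last` clause, part 1 would
	// also get a zero-length request.
	fmt.Println(walkParts(parts, 8, 9)) // [2 3]
}
```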
prmPatch := PrmObjectPatch{
Container: p.BktInfo.CID,
}
n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
off, ln := p.Range.Start, p.Range.End-p.Range.Start+1
var multipartObjectSize uint64
for i, part := range parts {
dkirillov marked this conversation as resolved Outdated

Can we use a brand-new range variable? It's not easy to follow the idea of `p.Range.Start -= part.Size` etc.
dkirillov marked this conversation as resolved Outdated

It's probably a matter of taste, but what do you think about writing something like this:

diff --git a/api/layer/layer.go b/api/layer/layer.go
index b8c2923f..ead72703 100644
--- a/api/layer/layer.go
+++ b/api/layer/layer.go
@@ -639,36 +639,22 @@ func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams)
 	}
 	n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
 
-	allPatchLen := p.Range.End - p.Range.Start + 1
-	var multipartObjectSize, patchedLen uint64
+	off, ln := p.Range.Start, p.Range.End-p.Range.Start+1
+	var multipartObjectSize uint64
 	for i, part := range parts {
-		if patchedLen == allPatchLen {
+		if off > part.Size || (off == part.Size && i != len(parts)-1) || ln == 0 {
 			multipartObjectSize += part.Size
+			if ln != 0 {
+				off -= part.Size
+			}
 			continue
 		}
 
-		if p.Range.Start > part.Size || (p.Range.Start == part.Size && i != len(parts)-1) {
-			multipartObjectSize += part.Size
-			p.Range.Start -= part.Size
-			p.Range.End -= part.Size
-			continue
-		}
-
-		prmPatch.Object = part.OID
-		prmPatch.ObjectSize = part.Size
-		prmPatch.Offset = p.Range.Start
-		prmPatch.Length = p.Range.End - p.Range.Start + 1
-
-		if i != len(parts)-1 && prmPatch.Length+prmPatch.Offset > part.Size {
-			prmPatch.Length = part.Size - prmPatch.Offset
-		}
-		patchedLen += prmPatch.Length
-
 		var createdObj *data.CreatedObjectInfo
-		if prmPatch.Offset == 0 && prmPatch.Length >= part.Size {
+		if off == 0 && ln > part.Size && part.Size != 0  {
 			prm := PrmObjectCreate{
 				Container:    p.BktInfo.CID,
-				Payload:      io.LimitReader(p.NewBytes, int64(prmPatch.Length)),
+				Payload:      io.LimitReader(p.NewBytes, int64(part.Size)),
 				CreationTime: part.Created,
 				CopiesNumber: p.CopiesNumbers,
 			}
@@ -677,13 +663,25 @@ func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams)
 			if err != nil {
 				return nil, fmt.Errorf("put new part object '%s': %w", part.OID.EncodeToString(), err)
 			}
+			ln -= part.Size
 		} else {
+			curLen := ln
+			if off+curLen > part.Size && i != len(parts)-1 {
+				curLen = part.Size - off
+			}
+			prmPatch.Object = part.OID
+			prmPatch.ObjectSize = part.Size
+			prmPatch.Offset = off
+			prmPatch.Length = curLen
 			prmPatch.Payload = io.LimitReader(p.NewBytes, int64(prmPatch.Length))
 
 			createdObj, err = n.patchObject(ctx, prmPatch)
 			if err != nil {
 				return nil, fmt.Errorf("patch part object '%s': %w", part.OID.EncodeToString(), err)
 			}
+
+			ln -= curLen
+			off = 0
 		}
 
 		parts[i].OID = createdObj.ID
@@ -692,11 +690,6 @@ func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams)
 		parts[i].ETag = hex.EncodeToString(createdObj.HashSum)
 
 		multipartObjectSize += createdObj.Size
-
-		if p.Range.Start > 0 {
-			p.Range.Start = 0
-		}
-		p.Range.End -= part.Size
 	}
 
 	newParts, err := json.Marshal(parts)


Looks good, but in some cases we will make unnecessary requests (which are not made with the condition discussed in the comment above)


I've updated the diff in the previous comment.

UPD: again
if off > part.Size || (off == part.Size && i != len(parts)-1) || ln == 0 {
multipartObjectSize += part.Size
if ln != 0 {
off -= part.Size
}
dkirillov marked this conversation as resolved Outdated

If we have to patch a whole part, it's better to use a single `put` rather than `patch` + `head`.
continue
}
var createdObj *data.CreatedObjectInfo
if off == 0 && ln >= part.Size {
curLen := part.Size
dkirillov marked this conversation as resolved Outdated

Can we use `io.LimitReader(p.NewBytes, patchLen)`?
if i == len(parts)-1 {
curLen = ln
}
dkirillov marked this conversation as resolved Outdated

Can we move patching a part into a separate method?
prm := PrmObjectCreate{
Container: p.BktInfo.CID,
Payload: io.LimitReader(p.NewBytes, int64(curLen)),
CreationTime: part.Created,
CopiesNumber: p.CopiesNumbers,
}
createdObj, err = n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return nil, fmt.Errorf("put new part object '%s': %w", part.OID.EncodeToString(), err)
}
ln -= curLen
} else {
curLen := ln
if off+curLen > part.Size && i != len(parts)-1 {
curLen = part.Size - off
}
prmPatch.Object = part.OID
prmPatch.ObjectSize = part.Size
prmPatch.Offset = off
prmPatch.Length = curLen
prmPatch.Payload = io.LimitReader(p.NewBytes, int64(prmPatch.Length))
createdObj, err = n.patchObject(ctx, prmPatch)
if err != nil {
return nil, fmt.Errorf("patch part object '%s': %w", part.OID.EncodeToString(), err)
}
ln -= curLen
off = 0
}
parts[i].OID = createdObj.ID
parts[i].Size = createdObj.Size
parts[i].MD5 = ""
parts[i].ETag = hex.EncodeToString(createdObj.HashSum)
multipartObjectSize += createdObj.Size
}
newParts, err := json.Marshal(parts)
if err != nil {
return nil, fmt.Errorf("marshal parts for combined object: %w", err)
}
var headerParts strings.Builder
for i, part := range parts {
headerPart := part.ToHeaderString()
if i != len(parts)-1 {
headerPart += ","
}
headerParts.WriteString(headerPart)
}
prm := PrmObjectCreate{
Container: p.BktInfo.CID,
dkirillov marked this conversation as resolved Outdated

Why do we use patch to update the "combined" object? It seems we can use a regular put.
PayloadSize: multipartObjectSize,
Filepath: p.Object.ObjectInfo.Name,
Payload: bytes.NewReader(newParts),
CreationTime: p.Object.ObjectInfo.Created,
CopiesNumber: p.CopiesNumbers,
}
prm.Attributes = make([][2]string, 0, len(p.Object.ObjectInfo.Headers)+1)
for k, v := range p.Object.ObjectInfo.Headers {
switch k {
case MultipartObjectSize:
prm.Attributes = append(prm.Attributes, [2]string{MultipartObjectSize, strconv.FormatUint(multipartObjectSize, 10)})
case UploadCompletedParts:
prm.Attributes = append(prm.Attributes, [2]string{UploadCompletedParts, headerParts.String()})
case api.ContentType:
default:
prm.Attributes = append(prm.Attributes, [2]string{k, v})
}
}
prm.Attributes = append(prm.Attributes, [2]string{api.ContentType, p.Object.ObjectInfo.ContentType})
dkirillov marked this conversation as resolved Outdated

Perhaps we should rebase `feature/patch` before merging this PR?
createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return nil, fmt.Errorf("put new combined object: %w", err)
}
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: objID,
ETag: hashSum,
FilePath: p.Object.Name,
Size: obj.PayloadSize(),
Created: &p.Object.Created,
Owner: &n.gateOwner,
// TODO: Add creation epoch
OID: createdObj.ID,
ETag: hex.EncodeToString(createdObj.HashSum),
MD5: hex.EncodeToString(createdObj.MD5Sum) + "-" + strconv.Itoa(len(parts)),
FilePath: p.Object.ObjectInfo.Name,
Size: multipartObjectSize,
Created: &p.Object.ObjectInfo.Created,
Owner: &n.gateOwner,
CreationEpoch: p.Object.NodeVersion.CreationEpoch,
},
IsUnversioned: !p.VersioningEnabled,
IsCombined: p.Object.Headers[MultipartObjectSize] != "",
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
}
p.Object.ID = objID
p.Object.Size = obj.PayloadSize()
p.Object.MD5Sum = ""
p.Object.HashSum = hashSum
p.Object.ObjectInfo.ID = createdObj.ID
p.Object.ObjectInfo.Size = createdObj.Size
p.Object.ObjectInfo.MD5Sum = hex.EncodeToString(createdObj.MD5Sum) + "-" + strconv.Itoa(len(parts))
p.Object.ObjectInfo.HashSum = hex.EncodeToString(createdObj.HashSum)
p.Object.ObjectInfo.Headers[MultipartObjectSize] = strconv.FormatUint(multipartObjectSize, 10)
p.Object.ObjectInfo.Headers[UploadCompletedParts] = headerParts.String()
p.Object.NodeVersion = newVersion
return &data.ExtendedObjectInfo{
ObjectInfo: p.Object,
NodeVersion: newVersion,
}, nil
return p.Object, nil
}
func getRandomOID() (oid.ID, error) {


@@ -153,7 +153,7 @@ The request returns the following data in XML format.
- **ETag**
Patched object tag. Always in SHA-256 format.
Patched object tag. For regular objects, it is always in SHA-256 format.
If the bucket is versioned, the **_x-amz-version-id_** header is returned with the version of the created object.
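For patched multipart objects, the tests above (with MD5-based ETags enabled) expect the ETag to keep a `-<number of parts>` suffix, e.g. they check for a `-3` suffix after patching a three-part object. A small hedged sketch of telling the two forms apart; the helper name is illustrative:

```golang
package main

import (
	"fmt"
	"strings"
)

// isMultipartStyleETag is a hypothetical helper: patched multipart objects in
// the tests above keep a "<hash>-<parts>" ETag, while regular objects expose a
// plain hex digest.
func isMultipartStyleETag(etag string) bool {
	return strings.Contains(etag, "-")
}

func main() {
	fmt.Println(isMultipartStyleETag("6bcf86bed8807b8e78f0fc6e0a53079d"))   // false
	fmt.Println(isMultipartStyleETag("6bcf86bed8807b8e78f0fc6e0a53079d-3")) // true
}
```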


@@ -412,10 +412,10 @@ func (x *FrostFS) PatchObject(ctx context.Context, prm layer.PrmObjectPatch) (oi
prmPatch.SetAddress(addr)
var rng object.Range
rng.SetOffset(prm.Range.Start)
rng.SetLength(prm.Range.End - prm.Range.Start + 1)
if prm.Range.End >= prm.ObjectSize {
rng.SetLength(prm.ObjectSize - prm.Range.Start)
rng.SetOffset(prm.Offset)
rng.SetLength(prm.Length)
if prm.Length+prm.Offset > prm.ObjectSize {
rng.SetLength(prm.ObjectSize - prm.Offset)
}
prmPatch.SetRange(&rng)
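A minimal sketch of the clamping above, assuming `Offset <= ObjectSize`: only the part of the range that overlaps the current payload is patched, and the rest of the supplied payload effectively extends the object (names are illustrative):

```golang
package main

import "fmt"

// clampLength mirrors the adjustment above: a patch whose offset+length runs
// past the current payload can only overwrite objectSize-offset bytes; the
// remainder of the supplied payload extends the object.
func clampLength(offset, length, objectSize uint64) uint64 {
	if offset+length > objectSize {
		return objectSize - offset
	}
	return length
}

func main() {
	const objectSize = 10
	fmt.Println(clampLength(4, 3, objectSize))  // 3: range is fully inside the object
	fmt.Println(clampLength(8, 5, objectSize))  // 2: the last 3 bytes are appended
	fmt.Println(clampLength(10, 4, objectSize)) // 0: pure append at the end
}
```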