[#466] Implement PATCH for multipart objects #466
|
@ -95,13 +95,19 @@ func (h *handler) PatchObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
params := &layer.PatchObjectParams{
|
params := &layer.PatchObjectParams{
|
||||||
Object: srcObjInfo,
|
Object: extendedSrcObjInfo,
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
NewBytes: r.Body,
|
NewBytes: r.Body,
|
||||||
Range: byteRange,
|
Range: byteRange,
|
||||||
VersioningEnabled: settings.VersioningEnabled(),
|
VersioningEnabled: settings.VersioningEnabled(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
params.CopiesNumbers, err = h.pickCopiesNumbers(nil, reqInfo.Namespace, bktInfo.LocationConstraint)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
extendedObjInfo, err := h.obj.PatchObject(ctx, params)
|
extendedObjInfo, err := h.obj.PatchObject(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isErrObjectLocked(err) {
|
if isErrObjectLocked(err) {
|
||||||
|
@ -112,7 +118,10 @@ func (h *handler) PatchObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID())
|
if settings.VersioningEnabled() {
|
||||||
|
w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID())
|
||||||
|
}
|
||||||
|
|
||||||
w.Header().Set(api.ETag, data.Quote(extendedObjInfo.ObjectInfo.ETag(h.cfg.MD5Enabled())))
|
w.Header().Set(api.ETag, data.Quote(extendedObjInfo.ObjectInfo.ETag(h.cfg.MD5Enabled())))
|
||||||
|
|
||||||
resp := PatchObjectResult{
|
resp := PatchObjectResult{
|
||||||
|
|
|
@ -3,6 +3,7 @@ package handler
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
|
"crypto/rand"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
|
@ -107,6 +108,237 @@ func TestPatch(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPatchMultipartObject(t *testing.T) {
|
||||||
|
tc := prepareHandlerContextWithMinCache(t)
|
||||||
|
tc.config.md5Enabled = true
|
||||||
|
|
||||||
|
bktName, objName, partSize := "bucket-for-multipart-patch", "object-for-multipart-patch", 5*1024*1024
|
||||||
|
createTestBucket(tc, bktName)
|
||||||
|
|
||||||
|
t.Run("patch beginning of the first part", func(t *testing.T) {
|
||||||
dkirillov marked this conversation as resolved
Outdated
|
|||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchSize := partSize / 2
|
||||||
|
patchBody := make([]byte, patchSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes 0-"+strconv.Itoa(patchSize-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{patchBody, data1[patchSize:], data2, data3}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch middle of the first part", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchSize := partSize / 2
|
||||||
|
patchBody := make([]byte, patchSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/4)+"-"+strconv.Itoa(partSize*3/4-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/4], patchBody, data1[partSize*3/4:], data2, data3}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch first and second parts", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchSize := partSize / 2
|
||||||
|
patchBody := make([]byte, patchSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*3/4)+"-"+strconv.Itoa(partSize*5/4-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize*3/4], patchBody, data2[partSize/4:], data3}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch all parts", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchSize := partSize * 2
|
||||||
|
patchBody := make([]byte, patchSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/2-1)+"-"+strconv.Itoa(partSize/2+patchSize-2)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/2-1], patchBody, data3[partSize/2-1:]}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch all parts and append bytes", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchSize := partSize * 3
|
||||||
|
patchBody := make([]byte, patchSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/2)+"-"+strconv.Itoa(partSize/2+patchSize-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/2], patchBody}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*7/2, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch second part", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize)+"-"+strconv.Itoa(partSize*2-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1, patchBody, data3}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch last part, equal size", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2)+"-"+strconv.Itoa(partSize*3-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1, data2, patchBody}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch last part, increase size", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize+1)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2)+"-"+strconv.Itoa(partSize*3)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1, data2, patchBody}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3+1, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch last part with offset and append bytes", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2+3)+"-"+strconv.Itoa(partSize*3+2)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1, data2, data3[:3], patchBody}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3+3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("append bytes", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*3)+"-"+strconv.Itoa(partSize*4-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1, data2, data3, patchBody}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*4, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch empty multipart", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, 0)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes 0-"+strconv.Itoa(partSize-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, patchBody, object)
|
||||||
|
require.Equal(t, partSize, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-1"))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestPatchWithVersion(t *testing.T) {
|
func TestPatchWithVersion(t *testing.T) {
|
||||||
hc := prepareHandlerContextWithMinCache(t)
|
hc := prepareHandlerContextWithMinCache(t)
|
||||||
bktName, objName := "bucket", "obj"
|
bktName, objName := "bucket", "obj"
|
||||||
|
|
|
@ -215,7 +215,7 @@ type PrmObjectPatch struct {
|
||||||
Payload io.Reader
|
Payload io.Reader
|
||||||
|
|
||||||
// Object range to patch.
|
// Object range to patch.
|
||||||
Range *RangeParams
|
Offset, Length uint64
|
||||||
|
|
||||||
// Size of original object payload.
|
// Size of original object payload.
|
||||||
ObjectSize uint64
|
ObjectSize uint64
|
||||||
|
|
|
@ -430,12 +430,12 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm PrmObjectPatch) (oid.
|
||||||
}
|
}
|
||||||
|
|
||||||
var newPayload []byte
|
var newPayload []byte
|
||||||
if prm.Range.Start > 0 {
|
if prm.Offset > 0 {
|
||||||
newPayload = append(newPayload, obj.Payload()[:prm.Range.Start]...)
|
newPayload = append(newPayload, obj.Payload()[:prm.Offset]...)
|
||||||
}
|
}
|
||||||
newPayload = append(newPayload, patchBytes...)
|
newPayload = append(newPayload, patchBytes...)
|
||||||
if prm.Range.End < obj.PayloadSize()-1 {
|
if prm.Offset+prm.Length < obj.PayloadSize() {
|
||||||
newPayload = append(newPayload, obj.Payload()[prm.Range.End+1:]...)
|
newPayload = append(newPayload, obj.Payload()[prm.Offset+prm.Length:]...)
|
||||||
}
|
}
|
||||||
newObj.SetPayload(newPayload)
|
newObj.SetPayload(newPayload)
|
||||||
newObj.SetPayloadSize(uint64(len(newPayload)))
|
newObj.SetPayloadSize(uint64(len(newPayload)))
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package layer
|
package layer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
|
@ -163,11 +164,12 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
PatchObjectParams struct {
|
PatchObjectParams struct {
|
||||||
Object *data.ObjectInfo
|
Object *data.ExtendedObjectInfo
|
||||||
BktInfo *data.BucketInfo
|
BktInfo *data.BucketInfo
|
||||||
NewBytes io.Reader
|
NewBytes io.Reader
|
||||||
Range *RangeParams
|
Range *RangeParams
|
||||||
VersioningEnabled bool
|
VersioningEnabled bool
|
||||||
|
CopiesNumbers []uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateBucketParams stores bucket create request parameters.
|
// CreateBucketParams stores bucket create request parameters.
|
||||||
|
@ -546,63 +548,226 @@ func (n *Layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Exte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
|
func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||||
if p.Object.Headers[AttributeDecryptedSize] != "" {
|
if p.Object.ObjectInfo.Headers[AttributeDecryptedSize] != "" {
|
||||||
return nil, fmt.Errorf("patch encrypted object")
|
return nil, fmt.Errorf("patch encrypted object")
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Object.Headers[MultipartObjectSize] != "" {
|
if p.Object.ObjectInfo.Headers[MultipartObjectSize] != "" {
|
||||||
// TODO: support multipart object patch
|
return n.patchMultipartObject(ctx, p)
|
||||||
return nil, fmt.Errorf("patch multipart object")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
prmPatch := PrmObjectPatch{
|
prmPatch := PrmObjectPatch{
|
||||||
Container: p.BktInfo.CID,
|
Container: p.BktInfo.CID,
|
||||||
Object: p.Object.ID,
|
Object: p.Object.ObjectInfo.ID,
|
||||||
Payload: p.NewBytes,
|
Payload: p.NewBytes,
|
||||||
Range: p.Range,
|
Offset: p.Range.Start,
|
||||||
ObjectSize: p.Object.Size,
|
Length: p.Range.End - p.Range.Start + 1,
|
||||||
|
ObjectSize: p.Object.ObjectInfo.Size,
|
||||||
}
|
}
|
||||||
n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
|
n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
|
||||||
|
|
||||||
objID, err := n.frostFS.PatchObject(ctx, prmPatch)
|
createdObj, err := n.patchObject(ctx, prmPatch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("patch object: %w", err)
|
return nil, fmt.Errorf("patch object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := n.objectHead(ctx, p.BktInfo, objID)
|
newVersion := &data.NodeVersion{
|
||||||
|
BaseNodeVersion: data.BaseNodeVersion{
|
||||||
|
OID: createdObj.ID,
|
||||||
|
ETag: hex.EncodeToString(createdObj.HashSum),
|
||||||
|
FilePath: p.Object.ObjectInfo.Name,
|
||||||
|
Size: createdObj.Size,
|
||||||
|
Created: &p.Object.ObjectInfo.Created,
|
||||||
|
Owner: &n.gateOwner,
|
||||||
|
CreationEpoch: p.Object.NodeVersion.CreationEpoch,
|
||||||
|
},
|
||||||
|
IsUnversioned: !p.VersioningEnabled,
|
||||||
|
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
||||||
|
}
|
||||||
|
|
||||||
|
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.Object.ObjectInfo.ID = createdObj.ID
|
||||||
|
p.Object.ObjectInfo.Size = createdObj.Size
|
||||||
|
p.Object.ObjectInfo.MD5Sum = ""
|
||||||
|
p.Object.ObjectInfo.HashSum = hex.EncodeToString(createdObj.HashSum)
|
||||||
|
p.Object.NodeVersion = newVersion
|
||||||
|
|
||||||
|
return p.Object, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) patchObject(ctx context.Context, p PrmObjectPatch) (*data.CreatedObjectInfo, error) {
|
||||||
|
objID, err := n.frostFS.PatchObject(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("patch object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
prmHead := PrmObjectHead{
|
||||||
|
PrmAuth: p.PrmAuth,
|
||||||
|
Container: p.Container,
|
||||||
|
Object: objID,
|
||||||
|
}
|
||||||
|
obj, err := n.frostFS.HeadObject(ctx, prmHead)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("head object: %w", err)
|
return nil, fmt.Errorf("head object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
payloadChecksum, _ := obj.PayloadChecksum()
|
payloadChecksum, _ := obj.PayloadChecksum()
|
||||||
hashSum := hex.EncodeToString(payloadChecksum.Value())
|
|
||||||
|
return &data.CreatedObjectInfo{
|
||||||
|
ID: objID,
|
||||||
|
Size: obj.PayloadSize(),
|
||||||
|
HashSum: payloadChecksum.Value(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||||
|
combinedObj, err := n.objectGet(ctx, p.BktInfo, p.Object.ObjectInfo.ID)
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Should we configure buffer size? Should we configure buffer size?
|
|||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get combined object '%s': %w", p.Object.ObjectInfo.ID.EncodeToString(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var parts []*data.PartInfo
|
||||||
|
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
|
||||||
|
}
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
I don't understand why do we need I don't understand why do we need `|| (p.Range.Start == part.Size && i != len(parts)-1)`. If we really need this, let's add test for this condition. Currently without this condition tests pass
mbiryukova
commented
If range starts after the last part - we have to patch the last part. Otherwise, we move on to the next part. I will add test for this condition If range starts after the last part - we have to patch the last part. Otherwise, we move on to the next part. I will add test for this condition
dkirillov
commented
I still can freely delete this condition I still can freely delete this condition
mbiryukova
commented
Seems that it doesn't affect the result, but reduces number of patch requests. If it's ok, I can count their number and compare it with expected one Seems that it doesn't affect the result, but reduces number of patch requests. If it's ok, I can count their number and compare it with expected one
|
|||||||
|
|
||||||
|
prmPatch := PrmObjectPatch{
|
||||||
|
Container: p.BktInfo.CID,
|
||||||
|
}
|
||||||
|
n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
|
||||||
|
|
||||||
|
off, ln := p.Range.Start, p.Range.End-p.Range.Start+1
|
||||||
|
var multipartObjectSize uint64
|
||||||
|
for i, part := range parts {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Can we use brand new range param. It's not easy to catch idea with Can we use brand new range param. It's not easy to catch idea with `p.Range.Start -= part.Size` etc.
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Probably it's matter of taste but what do you think about writing something like this
Probably it's matter of taste but what do you think about writing something like this
```diff
diff --git a/api/layer/layer.go b/api/layer/layer.go
index b8c2923f..ead72703 100644
--- a/api/layer/layer.go
+++ b/api/layer/layer.go
@@ -639,36 +639,22 @@ func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams)
}
n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
- allPatchLen := p.Range.End - p.Range.Start + 1
- var multipartObjectSize, patchedLen uint64
+ off, ln := p.Range.Start, p.Range.End-p.Range.Start+1
+ var multipartObjectSize uint64
for i, part := range parts {
- if patchedLen == allPatchLen {
+ if off > part.Size || (off == part.Size && i != len(parts)-1) || ln == 0 {
multipartObjectSize += part.Size
+ if ln != 0 {
+ off -= part.Size
+ }
continue
}
- if p.Range.Start > part.Size || (p.Range.Start == part.Size && i != len(parts)-1) {
- multipartObjectSize += part.Size
- p.Range.Start -= part.Size
- p.Range.End -= part.Size
- continue
- }
-
- prmPatch.Object = part.OID
- prmPatch.ObjectSize = part.Size
- prmPatch.Offset = p.Range.Start
- prmPatch.Length = p.Range.End - p.Range.Start + 1
-
- if i != len(parts)-1 && prmPatch.Length+prmPatch.Offset > part.Size {
- prmPatch.Length = part.Size - prmPatch.Offset
- }
- patchedLen += prmPatch.Length
-
var createdObj *data.CreatedObjectInfo
- if prmPatch.Offset == 0 && prmPatch.Length >= part.Size {
+ if off == 0 && ln > part.Size && part.Size != 0 {
prm := PrmObjectCreate{
Container: p.BktInfo.CID,
- Payload: io.LimitReader(p.NewBytes, int64(prmPatch.Length)),
+ Payload: io.LimitReader(p.NewBytes, int64(part.Size)),
CreationTime: part.Created,
CopiesNumber: p.CopiesNumbers,
}
@@ -677,13 +663,25 @@ func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams)
if err != nil {
return nil, fmt.Errorf("put new part object '%s': %w", part.OID.EncodeToString(), err)
}
+ ln -= part.Size
} else {
+ curLen := ln
+ if off+curLen > part.Size && i != len(parts)-1 {
+ curLen = part.Size - off
+ }
+ prmPatch.Object = part.OID
+ prmPatch.ObjectSize = part.Size
+ prmPatch.Offset = off
+ prmPatch.Length = curLen
prmPatch.Payload = io.LimitReader(p.NewBytes, int64(prmPatch.Length))
createdObj, err = n.patchObject(ctx, prmPatch)
if err != nil {
return nil, fmt.Errorf("patch part object '%s': %w", part.OID.EncodeToString(), err)
}
+
+ ln -= curLen
+ off = 0
}
parts[i].OID = createdObj.ID
@@ -692,11 +690,6 @@ func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams)
parts[i].ETag = hex.EncodeToString(createdObj.HashSum)
multipartObjectSize += createdObj.Size
-
- if p.Range.Start > 0 {
- p.Range.Start = 0
- }
- p.Range.End -= part.Size
}
newParts, err := json.Marshal(parts)
```
mbiryukova
commented
Looks good, but in some cases we will make unnecessary requests (which are not made with the condition discussed in the comment above) Looks good, but in some cases we will make unnecessary requests (which are not made with the condition discussed in the comment above)
dkirillov
commented
I've updated diff in previous comment UPD: again I've updated diff in previous comment
**UPD**: again
|
|||||||
|
if off > part.Size || (off == part.Size && i != len(parts)-1) || ln == 0 {
|
||||||
|
multipartObjectSize += part.Size
|
||||||
|
if ln != 0 {
|
||||||
|
off -= part.Size
|
||||||
|
}
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
If we have to patch all part it's better to use single If we have to patch all part it's better to use single `put` method rather than `patch + head`
|
|||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var createdObj *data.CreatedObjectInfo
|
||||||
|
if off == 0 && ln >= part.Size {
|
||||||
|
curLen := part.Size
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Can we use Can we use `io.LimitReader(p.NewBytes, patchLen)` ?
|
|||||||
|
if i == len(parts)-1 {
|
||||||
|
curLen = ln
|
||||||
|
}
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Can we move to separate method patching part? Can we move to separate method patching part?
|
|||||||
|
prm := PrmObjectCreate{
|
||||||
|
Container: p.BktInfo.CID,
|
||||||
|
Payload: io.LimitReader(p.NewBytes, int64(curLen)),
|
||||||
|
CreationTime: part.Created,
|
||||||
|
CopiesNumber: p.CopiesNumbers,
|
||||||
|
}
|
||||||
|
|
||||||
|
createdObj, err = n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("put new part object '%s': %w", part.OID.EncodeToString(), err)
|
||||||
|
}
|
||||||
|
ln -= curLen
|
||||||
|
} else {
|
||||||
|
curLen := ln
|
||||||
|
if off+curLen > part.Size && i != len(parts)-1 {
|
||||||
|
curLen = part.Size - off
|
||||||
|
}
|
||||||
|
prmPatch.Object = part.OID
|
||||||
|
prmPatch.ObjectSize = part.Size
|
||||||
|
prmPatch.Offset = off
|
||||||
|
prmPatch.Length = curLen
|
||||||
|
|
||||||
|
prmPatch.Payload = io.LimitReader(p.NewBytes, int64(prmPatch.Length))
|
||||||
|
|
||||||
|
createdObj, err = n.patchObject(ctx, prmPatch)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("patch part object '%s': %w", part.OID.EncodeToString(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ln -= curLen
|
||||||
|
off = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
parts[i].OID = createdObj.ID
|
||||||
|
parts[i].Size = createdObj.Size
|
||||||
|
parts[i].MD5 = ""
|
||||||
|
parts[i].ETag = hex.EncodeToString(createdObj.HashSum)
|
||||||
|
|
||||||
|
multipartObjectSize += createdObj.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
newParts, err := json.Marshal(parts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("marshal parts for combined object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var headerParts strings.Builder
|
||||||
|
for i, part := range parts {
|
||||||
|
headerPart := part.ToHeaderString()
|
||||||
|
if i != len(parts)-1 {
|
||||||
|
headerPart += ","
|
||||||
|
}
|
||||||
|
headerParts.WriteString(headerPart)
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := PrmObjectCreate{
|
||||||
|
Container: p.BktInfo.CID,
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why do we use patch to update "combined" object? It seems we can use regular put Why do we use patch to update "combined" object? It seems we can use regular put
|
|||||||
|
PayloadSize: multipartObjectSize,
|
||||||
|
Filepath: p.Object.ObjectInfo.Name,
|
||||||
|
Payload: bytes.NewReader(newParts),
|
||||||
|
CreationTime: p.Object.ObjectInfo.Created,
|
||||||
|
CopiesNumber: p.CopiesNumbers,
|
||||||
|
}
|
||||||
|
|
||||||
|
prm.Attributes = make([][2]string, 0, len(p.Object.ObjectInfo.Headers)+1)
|
||||||
|
|
||||||
|
for k, v := range p.Object.ObjectInfo.Headers {
|
||||||
|
switch k {
|
||||||
|
case MultipartObjectSize:
|
||||||
|
prm.Attributes = append(prm.Attributes, [2]string{MultipartObjectSize, strconv.FormatUint(multipartObjectSize, 10)})
|
||||||
|
case UploadCompletedParts:
|
||||||
|
prm.Attributes = append(prm.Attributes, [2]string{UploadCompletedParts, headerParts.String()})
|
||||||
|
case api.ContentType:
|
||||||
|
default:
|
||||||
|
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
prm.Attributes = append(prm.Attributes, [2]string{api.ContentType, p.Object.ObjectInfo.ContentType})
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Probably we should rebase Probably we should rebase `feature/patch` before merge this PR?
|
|||||||
|
|
||||||
|
createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("put new combined object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
newVersion := &data.NodeVersion{
|
newVersion := &data.NodeVersion{
|
||||||
BaseNodeVersion: data.BaseNodeVersion{
|
BaseNodeVersion: data.BaseNodeVersion{
|
||||||
OID: objID,
|
OID: createdObj.ID,
|
||||||
ETag: hashSum,
|
ETag: hex.EncodeToString(createdObj.HashSum),
|
||||||
FilePath: p.Object.Name,
|
MD5: hex.EncodeToString(createdObj.MD5Sum) + "-" + strconv.Itoa(len(parts)),
|
||||||
Size: obj.PayloadSize(),
|
FilePath: p.Object.ObjectInfo.Name,
|
||||||
Created: &p.Object.Created,
|
Size: multipartObjectSize,
|
||||||
Owner: &n.gateOwner,
|
Created: &p.Object.ObjectInfo.Created,
|
||||||
// TODO: Add creation epoch
|
Owner: &n.gateOwner,
|
||||||
|
CreationEpoch: p.Object.NodeVersion.CreationEpoch,
|
||||||
},
|
},
|
||||||
IsUnversioned: !p.VersioningEnabled,
|
IsUnversioned: !p.VersioningEnabled,
|
||||||
IsCombined: p.Object.Headers[MultipartObjectSize] != "",
|
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
||||||
}
|
}
|
||||||
|
|
||||||
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Object.ID = objID
|
p.Object.ObjectInfo.ID = createdObj.ID
|
||||||
p.Object.Size = obj.PayloadSize()
|
p.Object.ObjectInfo.Size = createdObj.Size
|
||||||
p.Object.MD5Sum = ""
|
p.Object.ObjectInfo.MD5Sum = hex.EncodeToString(createdObj.MD5Sum) + "-" + strconv.Itoa(len(parts))
|
||||||
p.Object.HashSum = hashSum
|
p.Object.ObjectInfo.HashSum = hex.EncodeToString(createdObj.HashSum)
|
||||||
|
p.Object.ObjectInfo.Headers[MultipartObjectSize] = strconv.FormatUint(multipartObjectSize, 10)
|
||||||
|
p.Object.ObjectInfo.Headers[UploadCompletedParts] = headerParts.String()
|
||||||
|
p.Object.NodeVersion = newVersion
|
||||||
|
|
||||||
return &data.ExtendedObjectInfo{
|
return p.Object, nil
|
||||||
ObjectInfo: p.Object,
|
|
||||||
NodeVersion: newVersion,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRandomOID() (oid.ID, error) {
|
func getRandomOID() (oid.ID, error) {
|
||||||
|
|
|
@ -153,7 +153,7 @@ The request returns the following data in XML format.
|
||||||
|
|
||||||
- **ETag**
|
- **ETag**
|
||||||
|
|
||||||
Patched object tag. Always in SHA-256 format.
|
Patched object tag. For regular objects always in SHA-256 format.
|
||||||
|
|
||||||
If the bucket is versioned, the **_x-amz-version-id_** header is returned with the version of the created object.
|
If the bucket is versioned, the **_x-amz-version-id_** header is returned with the version of the created object.
|
||||||
|
|
||||||
|
|
|
@ -412,10 +412,10 @@ func (x *FrostFS) PatchObject(ctx context.Context, prm layer.PrmObjectPatch) (oi
|
||||||
prmPatch.SetAddress(addr)
|
prmPatch.SetAddress(addr)
|
||||||
|
|
||||||
var rng object.Range
|
var rng object.Range
|
||||||
rng.SetOffset(prm.Range.Start)
|
rng.SetOffset(prm.Offset)
|
||||||
rng.SetLength(prm.Range.End - prm.Range.Start + 1)
|
rng.SetLength(prm.Length)
|
||||||
if prm.Range.End >= prm.ObjectSize {
|
if prm.Length+prm.Offset > prm.ObjectSize {
|
||||||
rng.SetLength(prm.ObjectSize - prm.Range.Start)
|
rng.SetLength(prm.ObjectSize - prm.Offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
prmPatch.SetRange(&rng)
|
prmPatch.SetRange(&rng)
|
||||||
|
|
Maybe it's better to use
t.Run