[#466] Implement PATCH for multipart objects
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
parent
a2e0b92575
commit
d6b506f6d9
8 changed files with 468 additions and 40 deletions
|
@ -7,6 +7,7 @@ This document outlines major changes between releases.
|
||||||
### Added
|
### Added
|
||||||
- Add support for virtual hosted style addressing (#446, #449)
|
- Add support for virtual hosted style addressing (#446, #449)
|
||||||
- Support new param `frostfs.graceful_close_on_switch_timeout` (#475)
|
- Support new param `frostfs.graceful_close_on_switch_timeout` (#475)
|
||||||
|
- Support patch object method (#479)
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- Update go version to go1.19 (#470)
|
- Update go version to go1.19 (#470)
|
||||||
|
|
|
@ -95,13 +95,19 @@ func (h *handler) PatchObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
params := &layer.PatchObjectParams{
|
params := &layer.PatchObjectParams{
|
||||||
Object: srcObjInfo,
|
Object: extendedSrcObjInfo,
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
NewBytes: r.Body,
|
NewBytes: r.Body,
|
||||||
Range: byteRange,
|
Range: byteRange,
|
||||||
VersioningEnabled: settings.VersioningEnabled(),
|
VersioningEnabled: settings.VersioningEnabled(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
params.CopiesNumbers, err = h.pickCopiesNumbers(nil, reqInfo.Namespace, bktInfo.LocationConstraint)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
extendedObjInfo, err := h.obj.PatchObject(ctx, params)
|
extendedObjInfo, err := h.obj.PatchObject(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isErrObjectLocked(err) {
|
if isErrObjectLocked(err) {
|
||||||
|
@ -112,7 +118,10 @@ func (h *handler) PatchObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID())
|
if settings.VersioningEnabled() {
|
||||||
|
w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID())
|
||||||
|
}
|
||||||
|
|
||||||
w.Header().Set(api.ETag, data.Quote(extendedObjInfo.ObjectInfo.ETag(h.cfg.MD5Enabled())))
|
w.Header().Set(api.ETag, data.Quote(extendedObjInfo.ObjectInfo.ETag(h.cfg.MD5Enabled())))
|
||||||
|
|
||||||
resp := PatchObjectResult{
|
resp := PatchObjectResult{
|
||||||
|
|
|
@ -3,6 +3,7 @@ package handler
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
|
"crypto/rand"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
|
@ -107,6 +108,237 @@ func TestPatch(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPatchMultipartObject(t *testing.T) {
|
||||||
|
tc := prepareHandlerContextWithMinCache(t)
|
||||||
|
tc.config.md5Enabled = true
|
||||||
|
|
||||||
|
bktName, objName, partSize := "bucket-for-multipart-patch", "object-for-multipart-patch", 5*1024*1024
|
||||||
|
createTestBucket(tc, bktName)
|
||||||
|
|
||||||
|
t.Run("patch beginning of the first part", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchSize := partSize / 2
|
||||||
|
patchBody := make([]byte, patchSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes 0-"+strconv.Itoa(patchSize-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{patchBody, data1[patchSize:], data2, data3}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch middle of the first part", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchSize := partSize / 2
|
||||||
|
patchBody := make([]byte, patchSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/4)+"-"+strconv.Itoa(partSize*3/4-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/4], patchBody, data1[partSize*3/4:], data2, data3}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch first and second parts", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchSize := partSize / 2
|
||||||
|
patchBody := make([]byte, patchSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*3/4)+"-"+strconv.Itoa(partSize*5/4-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize*3/4], patchBody, data2[partSize/4:], data3}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch all parts", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchSize := partSize * 2
|
||||||
|
patchBody := make([]byte, patchSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/2-1)+"-"+strconv.Itoa(partSize/2+patchSize-2)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/2-1], patchBody, data3[partSize/2-1:]}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch all parts and append bytes", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchSize := partSize * 3
|
||||||
|
patchBody := make([]byte, patchSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/2)+"-"+strconv.Itoa(partSize/2+patchSize-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/2], patchBody}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*7/2, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch second part", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize)+"-"+strconv.Itoa(partSize*2-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1, patchBody, data3}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch last part, equal size", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2)+"-"+strconv.Itoa(partSize*3-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1, data2, patchBody}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch last part, increase size", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize+1)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2)+"-"+strconv.Itoa(partSize*3)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1, data2, patchBody}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3+1, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch last part with offset and append bytes", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2+3)+"-"+strconv.Itoa(partSize*3+2)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1, data2, data3[:3], patchBody}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*3+3, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("append bytes", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
|
||||||
|
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
|
||||||
|
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*3)+"-"+strconv.Itoa(partSize*4-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, bytes.Join([][]byte{data1, data2, data3, patchBody}, []byte("")), object)
|
||||||
|
require.Equal(t, partSize*4, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("patch empty multipart", func(t *testing.T) {
|
||||||
|
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
|
||||||
|
etag, _ := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, 0)
|
||||||
|
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag})
|
||||||
|
|
||||||
|
patchBody := make([]byte, partSize)
|
||||||
|
_, err := rand.Read(patchBody)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
patchObject(t, tc, bktName, objName, "bytes 0-"+strconv.Itoa(partSize-1)+"/*", patchBody, nil)
|
||||||
|
object, header := getObject(tc, bktName, objName)
|
||||||
|
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
|
||||||
|
require.NoError(t, err)
|
||||||
|
equalDataSlices(t, patchBody, object)
|
||||||
|
require.Equal(t, partSize, contentLen)
|
||||||
|
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-1"))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestPatchWithVersion(t *testing.T) {
|
func TestPatchWithVersion(t *testing.T) {
|
||||||
hc := prepareHandlerContextWithMinCache(t)
|
hc := prepareHandlerContextWithMinCache(t)
|
||||||
bktName, objName := "bucket", "obj"
|
bktName, objName := "bucket", "obj"
|
||||||
|
|
|
@ -215,7 +215,7 @@ type PrmObjectPatch struct {
|
||||||
Payload io.Reader
|
Payload io.Reader
|
||||||
|
|
||||||
// Object range to patch.
|
// Object range to patch.
|
||||||
Range *RangeParams
|
Offset, Length uint64
|
||||||
|
|
||||||
// Size of original object payload.
|
// Size of original object payload.
|
||||||
ObjectSize uint64
|
ObjectSize uint64
|
||||||
|
|
|
@ -430,12 +430,12 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm PrmObjectPatch) (oid.
|
||||||
}
|
}
|
||||||
|
|
||||||
var newPayload []byte
|
var newPayload []byte
|
||||||
if prm.Range.Start > 0 {
|
if prm.Offset > 0 {
|
||||||
newPayload = append(newPayload, obj.Payload()[:prm.Range.Start]...)
|
newPayload = append(newPayload, obj.Payload()[:prm.Offset]...)
|
||||||
}
|
}
|
||||||
newPayload = append(newPayload, patchBytes...)
|
newPayload = append(newPayload, patchBytes...)
|
||||||
if prm.Range.End < obj.PayloadSize()-1 {
|
if prm.Offset+prm.Length < obj.PayloadSize() {
|
||||||
newPayload = append(newPayload, obj.Payload()[prm.Range.End+1:]...)
|
newPayload = append(newPayload, obj.Payload()[prm.Offset+prm.Length:]...)
|
||||||
}
|
}
|
||||||
newObj.SetPayload(newPayload)
|
newObj.SetPayload(newPayload)
|
||||||
newObj.SetPayloadSize(uint64(len(newPayload)))
|
newObj.SetPayloadSize(uint64(len(newPayload)))
|
||||||
|
|
|
@ -1,78 +1,264 @@
|
||||||
package layer
|
package layer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PatchObjectParams struct {
|
type PatchObjectParams struct {
|
||||||
Object *data.ObjectInfo
|
Object *data.ExtendedObjectInfo
|
||||||
BktInfo *data.BucketInfo
|
BktInfo *data.BucketInfo
|
||||||
NewBytes io.Reader
|
NewBytes io.Reader
|
||||||
Range *RangeParams
|
Range *RangeParams
|
||||||
VersioningEnabled bool
|
VersioningEnabled bool
|
||||||
|
CopiesNumbers []uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
|
func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||||
if p.Object.Headers[AttributeDecryptedSize] != "" {
|
if p.Object.ObjectInfo.Headers[AttributeDecryptedSize] != "" {
|
||||||
return nil, fmt.Errorf("patch encrypted object")
|
return nil, fmt.Errorf("patch encrypted object")
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Object.Headers[MultipartObjectSize] != "" {
|
if p.Object.ObjectInfo.Headers[MultipartObjectSize] != "" {
|
||||||
// TODO: support multipart object patch
|
return n.patchMultipartObject(ctx, p)
|
||||||
return nil, fmt.Errorf("patch multipart object")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
prmPatch := PrmObjectPatch{
|
prmPatch := PrmObjectPatch{
|
||||||
Container: p.BktInfo.CID,
|
Container: p.BktInfo.CID,
|
||||||
Object: p.Object.ID,
|
Object: p.Object.ObjectInfo.ID,
|
||||||
Payload: p.NewBytes,
|
Payload: p.NewBytes,
|
||||||
Range: p.Range,
|
Offset: p.Range.Start,
|
||||||
ObjectSize: p.Object.Size,
|
Length: p.Range.End - p.Range.Start + 1,
|
||||||
|
ObjectSize: p.Object.ObjectInfo.Size,
|
||||||
}
|
}
|
||||||
n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
|
n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
|
||||||
|
|
||||||
objID, err := n.frostFS.PatchObject(ctx, prmPatch)
|
createdObj, err := n.patchObject(ctx, prmPatch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("patch object: %w", err)
|
return nil, fmt.Errorf("patch object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := n.objectHead(ctx, p.BktInfo, objID)
|
newVersion := &data.NodeVersion{
|
||||||
|
BaseNodeVersion: data.BaseNodeVersion{
|
||||||
|
OID: createdObj.ID,
|
||||||
|
ETag: hex.EncodeToString(createdObj.HashSum),
|
||||||
|
FilePath: p.Object.ObjectInfo.Name,
|
||||||
|
Size: createdObj.Size,
|
||||||
|
Created: &p.Object.ObjectInfo.Created,
|
||||||
|
Owner: &n.gateOwner,
|
||||||
|
CreationEpoch: p.Object.NodeVersion.CreationEpoch,
|
||||||
|
},
|
||||||
|
IsUnversioned: !p.VersioningEnabled,
|
||||||
|
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
||||||
|
}
|
||||||
|
|
||||||
|
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.Object.ObjectInfo.ID = createdObj.ID
|
||||||
|
p.Object.ObjectInfo.Size = createdObj.Size
|
||||||
|
p.Object.ObjectInfo.MD5Sum = ""
|
||||||
|
p.Object.ObjectInfo.HashSum = hex.EncodeToString(createdObj.HashSum)
|
||||||
|
p.Object.NodeVersion = newVersion
|
||||||
|
|
||||||
|
return p.Object, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) patchObject(ctx context.Context, p PrmObjectPatch) (*data.CreatedObjectInfo, error) {
|
||||||
|
objID, err := n.frostFS.PatchObject(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("patch object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
prmHead := PrmObjectHead{
|
||||||
|
PrmAuth: p.PrmAuth,
|
||||||
|
Container: p.Container,
|
||||||
|
Object: objID,
|
||||||
|
}
|
||||||
|
obj, err := n.frostFS.HeadObject(ctx, prmHead)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("head object: %w", err)
|
return nil, fmt.Errorf("head object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
payloadChecksum, _ := obj.PayloadChecksum()
|
payloadChecksum, _ := obj.PayloadChecksum()
|
||||||
hashSum := hex.EncodeToString(payloadChecksum.Value())
|
|
||||||
|
return &data.CreatedObjectInfo{
|
||||||
|
ID: objID,
|
||||||
|
Size: obj.PayloadSize(),
|
||||||
|
HashSum: payloadChecksum.Value(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||||
|
combinedObj, err := n.objectGet(ctx, p.BktInfo, p.Object.ObjectInfo.ID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get combined object '%s': %w", p.Object.ObjectInfo.ID.EncodeToString(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var parts []*data.PartInfo
|
||||||
|
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
prmPatch := PrmObjectPatch{
|
||||||
|
Container: p.BktInfo.CID,
|
||||||
|
}
|
||||||
|
n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
|
||||||
|
|
||||||
|
off, ln := p.Range.Start, p.Range.End-p.Range.Start+1
|
||||||
|
var multipartObjectSize uint64
|
||||||
|
for i, part := range parts {
|
||||||
|
if off > part.Size || (off == part.Size && i != len(parts)-1) || ln == 0 {
|
||||||
|
multipartObjectSize += part.Size
|
||||||
|
if ln != 0 {
|
||||||
|
off -= part.Size
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var createdObj *data.CreatedObjectInfo
|
||||||
|
createdObj, off, ln, err = n.patchPart(ctx, part, p, &prmPatch, off, ln, i == len(parts)-1)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("patch part: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
parts[i].OID = createdObj.ID
|
||||||
|
parts[i].Size = createdObj.Size
|
||||||
|
parts[i].MD5 = ""
|
||||||
|
parts[i].ETag = hex.EncodeToString(createdObj.HashSum)
|
||||||
|
|
||||||
|
multipartObjectSize += createdObj.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
return n.updateCombinedObject(ctx, parts, multipartObjectSize, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns patched part info, updated offset and length.
|
||||||
|
func (n *Layer) patchPart(ctx context.Context, part *data.PartInfo, p *PatchObjectParams, prmPatch *PrmObjectPatch, off, ln uint64, lastPart bool) (*data.CreatedObjectInfo, uint64, uint64, error) {
|
||||||
|
if off == 0 && ln >= part.Size {
|
||||||
|
curLen := part.Size
|
||||||
|
if lastPart {
|
||||||
|
curLen = ln
|
||||||
|
}
|
||||||
|
prm := PrmObjectCreate{
|
||||||
|
Container: p.BktInfo.CID,
|
||||||
|
Payload: io.LimitReader(p.NewBytes, int64(curLen)),
|
||||||
|
CreationTime: part.Created,
|
||||||
|
CopiesNumber: p.CopiesNumbers,
|
||||||
|
}
|
||||||
|
|
||||||
|
createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, 0, fmt.Errorf("put new part object '%s': %w", part.OID.EncodeToString(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ln -= curLen
|
||||||
|
|
||||||
|
return createdObj, off, ln, err
|
||||||
|
}
|
||||||
|
|
||||||
|
curLen := ln
|
||||||
|
if off+curLen > part.Size && !lastPart {
|
||||||
|
curLen = part.Size - off
|
||||||
|
}
|
||||||
|
prmPatch.Object = part.OID
|
||||||
|
prmPatch.ObjectSize = part.Size
|
||||||
|
prmPatch.Offset = off
|
||||||
|
prmPatch.Length = curLen
|
||||||
|
|
||||||
|
prmPatch.Payload = io.LimitReader(p.NewBytes, int64(prmPatch.Length))
|
||||||
|
|
||||||
|
createdObj, err := n.patchObject(ctx, *prmPatch)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, 0, fmt.Errorf("patch part object '%s': %w", part.OID.EncodeToString(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ln -= curLen
|
||||||
|
off = 0
|
||||||
|
|
||||||
|
return createdObj, off, ln, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) updateCombinedObject(ctx context.Context, parts []*data.PartInfo, fullObjSize uint64, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||||
|
newParts, err := json.Marshal(parts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("marshal parts for combined object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var headerParts strings.Builder
|
||||||
|
for i, part := range parts {
|
||||||
|
headerPart := part.ToHeaderString()
|
||||||
|
if i != len(parts)-1 {
|
||||||
|
headerPart += ","
|
||||||
|
}
|
||||||
|
headerParts.WriteString(headerPart)
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := PrmObjectCreate{
|
||||||
|
Container: p.BktInfo.CID,
|
||||||
|
PayloadSize: fullObjSize,
|
||||||
|
Filepath: p.Object.ObjectInfo.Name,
|
||||||
|
Payload: bytes.NewReader(newParts),
|
||||||
|
CreationTime: p.Object.ObjectInfo.Created,
|
||||||
|
CopiesNumber: p.CopiesNumbers,
|
||||||
|
}
|
||||||
|
|
||||||
|
prm.Attributes = make([][2]string, 0, len(p.Object.ObjectInfo.Headers)+1)
|
||||||
|
|
||||||
|
for k, v := range p.Object.ObjectInfo.Headers {
|
||||||
|
switch k {
|
||||||
|
case MultipartObjectSize:
|
||||||
|
prm.Attributes = append(prm.Attributes, [2]string{MultipartObjectSize, strconv.FormatUint(fullObjSize, 10)})
|
||||||
|
case UploadCompletedParts:
|
||||||
|
prm.Attributes = append(prm.Attributes, [2]string{UploadCompletedParts, headerParts.String()})
|
||||||
|
case api.ContentType:
|
||||||
|
default:
|
||||||
|
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
prm.Attributes = append(prm.Attributes, [2]string{api.ContentType, p.Object.ObjectInfo.ContentType})
|
||||||
|
|
||||||
|
createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("put new combined object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
newVersion := &data.NodeVersion{
|
newVersion := &data.NodeVersion{
|
||||||
BaseNodeVersion: data.BaseNodeVersion{
|
BaseNodeVersion: data.BaseNodeVersion{
|
||||||
OID: objID,
|
OID: createdObj.ID,
|
||||||
ETag: hashSum,
|
ETag: hex.EncodeToString(createdObj.HashSum),
|
||||||
FilePath: p.Object.Name,
|
MD5: hex.EncodeToString(createdObj.MD5Sum) + "-" + strconv.Itoa(len(parts)),
|
||||||
Size: obj.PayloadSize(),
|
FilePath: p.Object.ObjectInfo.Name,
|
||||||
Created: &p.Object.Created,
|
Size: fullObjSize,
|
||||||
Owner: &n.gateOwner,
|
Created: &p.Object.ObjectInfo.Created,
|
||||||
// TODO: Add creation epoch
|
Owner: &n.gateOwner,
|
||||||
|
CreationEpoch: p.Object.NodeVersion.CreationEpoch,
|
||||||
},
|
},
|
||||||
IsUnversioned: !p.VersioningEnabled,
|
IsUnversioned: !p.VersioningEnabled,
|
||||||
IsCombined: p.Object.Headers[MultipartObjectSize] != "",
|
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
||||||
}
|
}
|
||||||
|
|
||||||
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Object.ID = objID
|
p.Object.ObjectInfo.ID = createdObj.ID
|
||||||
p.Object.Size = obj.PayloadSize()
|
p.Object.ObjectInfo.Size = createdObj.Size
|
||||||
p.Object.MD5Sum = ""
|
p.Object.ObjectInfo.MD5Sum = hex.EncodeToString(createdObj.MD5Sum) + "-" + strconv.Itoa(len(parts))
|
||||||
p.Object.HashSum = hashSum
|
p.Object.ObjectInfo.HashSum = hex.EncodeToString(createdObj.HashSum)
|
||||||
|
p.Object.ObjectInfo.Headers[MultipartObjectSize] = strconv.FormatUint(fullObjSize, 10)
|
||||||
|
p.Object.ObjectInfo.Headers[UploadCompletedParts] = headerParts.String()
|
||||||
|
p.Object.NodeVersion = newVersion
|
||||||
|
|
||||||
return &data.ExtendedObjectInfo{
|
return p.Object, nil
|
||||||
ObjectInfo: p.Object,
|
|
||||||
NodeVersion: newVersion,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -153,7 +153,7 @@ The request returns the following data in XML format.
|
||||||
|
|
||||||
- **ETag**
|
- **ETag**
|
||||||
|
|
||||||
Patched object tag. Always in SHA-256 format.
|
Patched object tag. For regular objects always in SHA-256 format.
|
||||||
|
|
||||||
If the bucket is versioned, the **_x-amz-version-id_** header is returned with the version of the created object.
|
If the bucket is versioned, the **_x-amz-version-id_** header is returned with the version of the created object.
|
||||||
|
|
||||||
|
|
|
@ -412,10 +412,10 @@ func (x *FrostFS) PatchObject(ctx context.Context, prm layer.PrmObjectPatch) (oi
|
||||||
prmPatch.SetAddress(addr)
|
prmPatch.SetAddress(addr)
|
||||||
|
|
||||||
var rng object.Range
|
var rng object.Range
|
||||||
rng.SetOffset(prm.Range.Start)
|
rng.SetOffset(prm.Offset)
|
||||||
rng.SetLength(prm.Range.End - prm.Range.Start + 1)
|
rng.SetLength(prm.Length)
|
||||||
if prm.Range.End >= prm.ObjectSize {
|
if prm.Length+prm.Offset > prm.ObjectSize {
|
||||||
rng.SetLength(prm.ObjectSize - prm.Range.Start)
|
rng.SetLength(prm.ObjectSize - prm.Offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
prmPatch.SetRange(&rng)
|
prmPatch.SetRange(&rng)
|
||||||
|
|
Loading…
Reference in a new issue