[#466] Implement PATCH for multipart objects
All checks were successful
/ DCO (pull_request) Successful in 1m15s
/ Builds (1.22) (pull_request) Successful in 1m18s
/ Builds (1.23) (pull_request) Successful in 1m22s
/ Vulncheck (pull_request) Successful in 1m25s
/ Lint (pull_request) Successful in 2m37s
/ Tests (1.22) (pull_request) Successful in 1m56s
/ Tests (1.23) (pull_request) Successful in 1m54s

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2024-08-20 12:35:07 +03:00
parent 25deae5d67
commit 9f9ef38fdc
6 changed files with 358 additions and 39 deletions

View file

@ -95,13 +95,19 @@ func (h *handler) PatchObjectHandler(w http.ResponseWriter, r *http.Request) {
}
params := &layer.PatchObjectParams{
Object: srcObjInfo,
Object: extendedSrcObjInfo,
BktInfo: bktInfo,
NewBytes: r.Body,
Range: byteRange,
VersioningEnabled: settings.VersioningEnabled(),
}
params.CopiesNumbers, err = h.pickCopiesNumbers(nil, reqInfo.Namespace, bktInfo.LocationConstraint)
if err != nil {
h.logAndSendError(w, "invalid copies number", reqInfo, err)
return
}
extendedObjInfo, err := h.obj.PatchObject(ctx, params)
if err != nil {
if isErrObjectLocked(err) {
@ -112,7 +118,10 @@ func (h *handler) PatchObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID())
if settings.VersioningEnabled() {
w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID())
}
w.Header().Set(api.ETag, data.Quote(extendedObjInfo.ObjectInfo.ETag(h.cfg.MD5Enabled())))
resp := PatchObjectResult{

View file

@ -3,6 +3,7 @@ package handler
import (
"bytes"
"crypto/md5"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/xml"
@ -107,6 +108,148 @@ func TestPatch(t *testing.T) {
}
}
func TestPatchMultipartObject(t *testing.T) {
tc := prepareHandlerContextWithMinCache(t)
tc.config.md5Enabled = true
bktName, objName, partSize := "bucket-for-multipart-patch", "object-for-multipart-patch", 5*1024*1024
createTestBucket(tc, bktName)
// patch beginning of the first part
multipartInfo := createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 := uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchSize := partSize / 2
patchBody := make([]byte, patchSize)
_, err := rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes 0-"+strconv.Itoa(patchSize-1)+"/*", patchBody, nil)
object, header := getObject(tc, bktName, objName)
contentLen, err := strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{patchBody, data1[patchSize:], data2, data3}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
// patch middle of the first part
multipartInfo = createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/4)+"-"+strconv.Itoa(partSize*3/4-1)+"/*", patchBody, nil)
object, header = getObject(tc, bktName, objName)
contentLen, err = strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/4], patchBody, data1[partSize*3/4:], data2, data3}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
// patch first and second parts
multipartInfo = createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*3/4)+"-"+strconv.Itoa(partSize*5/4-1)+"/*", patchBody, nil)
object, header = getObject(tc, bktName, objName)
contentLen, err = strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize*3/4], patchBody, data2[partSize/4:], data3}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
// patch all parts
multipartInfo = createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, _ = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchSize = partSize * 2
patchBody = make([]byte, patchSize)
_, err = rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize/2-1)+"-"+strconv.Itoa(partSize/2+patchSize-2)+"/*", patchBody, nil)
object, header = getObject(tc, bktName, objName)
contentLen, err = strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1[:partSize/2-1], patchBody, data3[partSize/2-1:]}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
// patch second part
multipartInfo = createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, _ = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchBody = make([]byte, partSize)
_, err = rand.Read(patchBody)
require.NoError(t, err)
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize)+"-"+strconv.Itoa(partSize*2-1)+"/*", patchBody, nil)
object, header = getObject(tc, bktName, objName)
contentLen, err = strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1, patchBody, data3}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
// patch last part
multipartInfo = createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, _ = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2)+"-"+strconv.Itoa(partSize*3-1)+"/*", patchBody, nil)
object, header = getObject(tc, bktName, objName)
contentLen, err = strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1, data2, patchBody}, []byte("")), object)
require.Equal(t, partSize*3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
// patch last part and append bytes
multipartInfo = createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*2+3)+"-"+strconv.Itoa(partSize*3+2)+"/*", patchBody, nil)
object, header = getObject(tc, bktName, objName)
contentLen, err = strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1, data2, data3[:3], patchBody}, []byte("")), object)
require.Equal(t, partSize*3+3, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
// append bytes
multipartInfo = createMultipartUpload(tc, bktName, objName, map[string]string{})
etag1, data1 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 1, partSize)
etag2, data2 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 2, partSize)
etag3, data3 = uploadPart(tc, bktName, objName, multipartInfo.UploadID, 3, partSize)
completeMultipartUpload(tc, bktName, objName, multipartInfo.UploadID, []string{etag1, etag2, etag3})
patchObject(t, tc, bktName, objName, "bytes "+strconv.Itoa(partSize*3)+"-"+strconv.Itoa(partSize*4-1)+"/*", patchBody, nil)
object, header = getObject(tc, bktName, objName)
contentLen, err = strconv.Atoi(header.Get(api.ContentLength))
require.NoError(t, err)
equalDataSlices(t, bytes.Join([][]byte{data1, data2, data3, patchBody}, []byte("")), object)
require.Equal(t, partSize*4, contentLen)
require.True(t, strings.HasSuffix(data.UnQuote(header.Get(api.ETag)), "-3"))
}
func TestPatchWithVersion(t *testing.T) {
hc := prepareHandlerContextWithMinCache(t)
bktName, objName := "bucket", "obj"

View file

@ -215,7 +215,7 @@ type PrmObjectPatch struct {
Payload io.Reader
// Object range to patch.
Range *RangeParams
Offset, Length uint64
// Size of original object payload.
ObjectSize uint64

View file

@ -430,12 +430,12 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm PrmObjectPatch) (oid.
}
var newPayload []byte
if prm.Range.Start > 0 {
newPayload = append(newPayload, obj.Payload()[:prm.Range.Start]...)
if prm.Offset > 0 {
newPayload = append(newPayload, obj.Payload()[:prm.Offset]...)
}
newPayload = append(newPayload, patchBytes...)
if prm.Range.End < obj.PayloadSize()-1 {
newPayload = append(newPayload, obj.Payload()[prm.Range.End+1:]...)
if prm.Offset+prm.Length < obj.PayloadSize() {
newPayload = append(newPayload, obj.Payload()[prm.Offset+prm.Length:]...)
}
newObj.SetPayload(newPayload)
newObj.SetPayloadSize(uint64(len(newPayload)))

View file

@ -1,6 +1,7 @@
package layer
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/rand"
@ -163,11 +164,12 @@ type (
}
PatchObjectParams struct {
Object *data.ObjectInfo
Object *data.ExtendedObjectInfo
BktInfo *data.BucketInfo
NewBytes io.Reader
Range *RangeParams
VersioningEnabled bool
CopiesNumbers []uint32
}
// CreateBucketParams stores bucket create request parameters.
@ -546,63 +548,228 @@ func (n *Layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Exte
}
func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
if p.Object.Headers[AttributeDecryptedSize] != "" {
if p.Object.ObjectInfo.Headers[AttributeDecryptedSize] != "" {
return nil, fmt.Errorf("patch encrypted object")
}
if p.Object.Headers[MultipartObjectSize] != "" {
// TODO: support multipart object patch
return nil, fmt.Errorf("patch multipart object")
if p.Object.ObjectInfo.Headers[MultipartObjectSize] != "" {
return n.patchMultipartObject(ctx, p)
}
prmPatch := PrmObjectPatch{
Container: p.BktInfo.CID,
Object: p.Object.ID,
Object: p.Object.ObjectInfo.ID,
Payload: p.NewBytes,
Range: p.Range,
ObjectSize: p.Object.Size,
Offset: p.Range.Start,
Length: p.Range.End - p.Range.Start + 1,
ObjectSize: p.Object.ObjectInfo.Size,
}
n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
objID, err := n.frostFS.PatchObject(ctx, prmPatch)
createdObj, err := n.patchObject(ctx, prmPatch)
if err != nil {
return nil, fmt.Errorf("patch object: %w", err)
}
obj, err := n.objectHead(ctx, p.BktInfo, objID)
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: createdObj.ID,
ETag: hex.EncodeToString(createdObj.HashSum),
FilePath: p.Object.ObjectInfo.Name,
Size: createdObj.Size,
Created: &p.Object.ObjectInfo.Created,
Owner: &n.gateOwner,
CreationEpoch: p.Object.NodeVersion.CreationEpoch,
},
IsUnversioned: !p.VersioningEnabled,
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
}
p.Object.ObjectInfo.ID = createdObj.ID
p.Object.ObjectInfo.Size = createdObj.Size
p.Object.ObjectInfo.MD5Sum = ""
p.Object.ObjectInfo.HashSum = hex.EncodeToString(createdObj.HashSum)
p.Object.NodeVersion = newVersion
return p.Object, nil
}
func (n *Layer) patchObject(ctx context.Context, p PrmObjectPatch) (*data.CreatedObjectInfo, error) {
objID, err := n.frostFS.PatchObject(ctx, p)
if err != nil {
return nil, fmt.Errorf("patch object: %w", err)
}
prmHead := PrmObjectHead{
PrmAuth: p.PrmAuth,
Container: p.Container,
Object: objID,
}
obj, err := n.frostFS.HeadObject(ctx, prmHead)
if err != nil {
return nil, fmt.Errorf("head object: %w", err)
}
payloadChecksum, _ := obj.PayloadChecksum()
hashSum := hex.EncodeToString(payloadChecksum.Value())
return &data.CreatedObjectInfo{
ID: objID,
Size: obj.PayloadSize(),
HashSum: payloadChecksum.Value(),
}, nil
}
func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
combinedObj, err := n.objectGet(ctx, p.BktInfo, p.Object.ObjectInfo.ID)
if err != nil {
return nil, fmt.Errorf("get combined object '%s': %w", p.Object.ObjectInfo.ID.EncodeToString(), err)
}
var parts []*data.PartInfo
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
}
prmPatch := PrmObjectPatch{
Container: p.BktInfo.CID,
}
n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)
allPatchLen := p.Range.End - p.Range.Start + 1
var multipartObjectSize, patchedLen uint64
for i, part := range parts {
if patchedLen == allPatchLen {
multipartObjectSize += part.Size
continue
}
if p.Range.Start > part.Size || (p.Range.Start == part.Size && i != len(parts)-1) {
multipartObjectSize += part.Size
p.Range.Start -= part.Size
p.Range.End -= part.Size
continue
}
prmPatch.Object = part.OID
prmPatch.ObjectSize = part.Size
prmPatch.Offset = p.Range.Start
prmPatch.Length = p.Range.End - p.Range.Start + 1
if i != len(parts)-1 && prmPatch.Length+prmPatch.Offset > part.Size {
prmPatch.Length = part.Size - prmPatch.Offset
}
patchedLen += prmPatch.Length
var createdObj *data.CreatedObjectInfo
if prmPatch.Offset == 0 && prmPatch.Length >= part.Size {
prm := PrmObjectCreate{
Container: p.BktInfo.CID,
Payload: io.LimitReader(p.NewBytes, int64(prmPatch.Length)),
CreationTime: part.Created,
CopiesNumber: p.CopiesNumbers,
}
createdObj, err = n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return nil, fmt.Errorf("put new part object '%s': %w", part.OID.EncodeToString(), err)
}
} else {
prmPatch.Payload = io.LimitReader(p.NewBytes, int64(prmPatch.Length))
createdObj, err = n.patchObject(ctx, prmPatch)
if err != nil {
return nil, fmt.Errorf("patch part object '%s': %w", part.OID.EncodeToString(), err)
}
}
parts[i].OID = createdObj.ID
parts[i].Size = createdObj.Size
parts[i].MD5 = ""
parts[i].ETag = hex.EncodeToString(createdObj.HashSum)
multipartObjectSize += createdObj.Size
if p.Range.Start > 0 {
p.Range.Start = 0
}
p.Range.End -= part.Size
}
newParts, err := json.Marshal(parts)
if err != nil {
return nil, fmt.Errorf("marshal parts for combined object: %w", err)
}
var headerParts strings.Builder
for i, part := range parts {
headerPart := part.ToHeaderString()
if i != len(parts)-1 {
headerPart += ","
}
headerParts.WriteString(headerPart)
}
prm := PrmObjectCreate{
Container: p.BktInfo.CID,
PayloadSize: multipartObjectSize,
Filepath: p.Object.ObjectInfo.Name,
Payload: bytes.NewReader(newParts),
CreationTime: p.Object.ObjectInfo.Created,
CopiesNumber: p.CopiesNumbers,
}
prm.Attributes = make([][2]string, 0, len(p.Object.ObjectInfo.Headers)+1)
for k, v := range p.Object.ObjectInfo.Headers {
switch k {
case MultipartObjectSize:
prm.Attributes = append(prm.Attributes, [2]string{MultipartObjectSize, strconv.FormatUint(multipartObjectSize, 10)})
case UploadCompletedParts:
prm.Attributes = append(prm.Attributes, [2]string{UploadCompletedParts, headerParts.String()})
case api.ContentType:
default:
prm.Attributes = append(prm.Attributes, [2]string{k, v})
}
}
prm.Attributes = append(prm.Attributes, [2]string{api.ContentType, p.Object.ObjectInfo.ContentType})
createdObj, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return nil, fmt.Errorf("put new combined object: %w", err)
}
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: objID,
ETag: hashSum,
FilePath: p.Object.Name,
Size: obj.PayloadSize(),
Created: &p.Object.Created,
Owner: &n.gateOwner,
// TODO: Add creation epoch
OID: createdObj.ID,
ETag: hex.EncodeToString(createdObj.HashSum),
MD5: hex.EncodeToString(createdObj.MD5Sum) + "-" + strconv.Itoa(len(parts)),
FilePath: p.Object.ObjectInfo.Name,
Size: multipartObjectSize,
Created: &p.Object.ObjectInfo.Created,
Owner: &n.gateOwner,
CreationEpoch: p.Object.NodeVersion.CreationEpoch,
},
IsUnversioned: !p.VersioningEnabled,
IsCombined: p.Object.Headers[MultipartObjectSize] != "",
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
}
p.Object.ID = objID
p.Object.Size = obj.PayloadSize()
p.Object.MD5Sum = ""
p.Object.HashSum = hashSum
p.Object.ObjectInfo.ID = createdObj.ID
p.Object.ObjectInfo.Size = createdObj.Size
p.Object.ObjectInfo.MD5Sum = hex.EncodeToString(createdObj.MD5Sum) + "-" + strconv.Itoa(len(parts))
p.Object.ObjectInfo.HashSum = hex.EncodeToString(createdObj.HashSum)
p.Object.ObjectInfo.Headers[MultipartObjectSize] = strconv.FormatUint(multipartObjectSize, 10)
p.Object.ObjectInfo.Headers[UploadCompletedParts] = headerParts.String()
p.Object.NodeVersion = newVersion
return &data.ExtendedObjectInfo{
ObjectInfo: p.Object,
NodeVersion: newVersion,
}, nil
return p.Object, nil
}
func getRandomOID() (oid.ID, error) {

View file

@ -412,10 +412,10 @@ func (x *FrostFS) PatchObject(ctx context.Context, prm layer.PrmObjectPatch) (oi
prmPatch.SetAddress(addr)
var rng object.Range
rng.SetOffset(prm.Range.Start)
rng.SetLength(prm.Range.End - prm.Range.Start + 1)
if prm.Range.End >= prm.ObjectSize {
rng.SetLength(prm.ObjectSize - prm.Range.Start)
rng.SetOffset(prm.Offset)
rng.SetLength(prm.Length)
if prm.Length+prm.Offset > prm.ObjectSize {
rng.SetLength(prm.ObjectSize - prm.Offset)
}
prmPatch.SetRange(&rng)