[#63] Add fast multipart upload

Add new flag to object tree meta `isCombined` that means
the object payload is list of parts that forms real payload.
Set this attribute when complete multipart upload not to do unnecessary copying.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-06-27 15:49:20 +03:00
parent 361d10cc78
commit ad81b599dd
8 changed files with 283 additions and 75 deletions

View file

@ -18,6 +18,7 @@ type NodeVersion struct {
BaseNodeVersion
DeleteMarker *DeleteMarkerInfo
IsUnversioned bool
IsCombined bool
}
func (v NodeVersion) IsDeleteMarker() bool {
@ -79,13 +80,13 @@ type MultipartInfo struct {
// PartInfo is upload information about part.
type PartInfo struct {
Key string
UploadID string
Number int
OID oid.ID
Size uint64
ETag string
Created time.Time
Key string `json:"key"`
UploadID string `json:"uploadId"`
Number int `json:"number"`
OID oid.ID `json:"oid"`
Size uint64 `json:"size"`
ETag string `json:"etag"`
Created time.Time `json:"created"`
}
// ToHeaderString form short part representation to use in S3-Completed-Parts header.

View file

@ -42,7 +42,7 @@ func TestSimpleGetEncrypted(t *testing.T) {
require.NoError(t, err)
require.NotEqual(t, content, string(encryptedContent))
response, _ := getEncryptedObject(t, tc, bktName, objName)
response, _ := getEncryptedObject(tc, bktName, objName)
require.Equal(t, content, string(response))
}
@ -104,14 +104,40 @@ func TestS3EncryptionSSECMultipartUpload(t *testing.T) {
data := multipartUploadEncrypted(tc, bktName, objName, headers, objLen, partSize)
require.Equal(t, objLen, len(data))
resData, resHeader := getEncryptedObject(t, tc, bktName, objName)
resData, resHeader := getEncryptedObject(tc, bktName, objName)
equalDataSlices(t, data, resData)
require.Equal(t, headers[api.ContentType], resHeader.Get(api.ContentType))
require.Equal(t, headers[headerMetaKey], resHeader[headerMetaKey][0])
require.Equal(t, strconv.Itoa(objLen), resHeader.Get(api.ContentLength))
checkContentUsingRangeEnc(t, tc, bktName, objName, data, 1000000)
checkContentUsingRangeEnc(t, tc, bktName, objName, data, 10000000)
checkContentUsingRangeEnc(tc, bktName, objName, data, 1000000)
checkContentUsingRangeEnc(tc, bktName, objName, data, 10000000)
}
func TestMultipartUploadGetRange(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "bucket-for-multipart-s3-tests", "multipart_obj"
createTestBucket(hc, bktName)
objLen := 30 * 1024 * 1024
partSize := objLen / 6
headerMetaKey := api.MetadataPrefix + "foo"
headers := map[string]string{
headerMetaKey: "bar",
api.ContentType: "text/plain",
}
data := multipartUpload(hc, bktName, objName, headers, objLen, partSize)
require.Equal(t, objLen, len(data))
resData, resHeader := getObject(hc, bktName, objName)
equalDataSlices(t, data, resData)
require.Equal(t, headers[api.ContentType], resHeader.Get(api.ContentType))
require.Equal(t, headers[headerMetaKey], resHeader[headerMetaKey][0])
require.Equal(t, strconv.Itoa(objLen), resHeader.Get(api.ContentLength))
checkContentUsingRange(hc, bktName, objName, data, 1000000)
checkContentUsingRange(hc, bktName, objName, data, 10000000)
}
func equalDataSlices(t *testing.T, expected, actual []byte) {
@ -128,7 +154,15 @@ func equalDataSlices(t *testing.T, expected, actual []byte) {
}
}
func checkContentUsingRangeEnc(t *testing.T, tc *handlerContext, bktName, objName string, data []byte, step int) {
func checkContentUsingRangeEnc(hc *handlerContext, bktName, objName string, data []byte, step int) {
checkContentUsingRangeBase(hc, bktName, objName, data, step, true)
}
func checkContentUsingRange(hc *handlerContext, bktName, objName string, data []byte, step int) {
checkContentUsingRangeBase(hc, bktName, objName, data, step, false)
}
func checkContentUsingRangeBase(hc *handlerContext, bktName, objName string, data []byte, step int, encrypted bool) {
var off, toRead, end int
for off < len(data) {
@ -138,8 +172,14 @@ func checkContentUsingRangeEnc(t *testing.T, tc *handlerContext, bktName, objNam
}
end = off + toRead - 1
rangeData := getEncryptedObjectRange(t, tc, bktName, objName, off, end)
equalDataSlices(t, data[off:end+1], rangeData)
var rangeData []byte
if encrypted {
rangeData = getEncryptedObjectRange(hc.t, hc, bktName, objName, off, end)
} else {
rangeData = getObjectRange(hc.t, hc, bktName, objName, off, end)
}
equalDataSlices(hc.t, data[off:end+1], rangeData)
off += step
}
@ -169,6 +209,30 @@ func multipartUploadEncrypted(hc *handlerContext, bktName, objName string, heade
return
}
func multipartUpload(hc *handlerContext, bktName, objName string, headers map[string]string, objLen, partsSize int) (objData []byte) {
multipartInfo := createMultipartUpload(hc, bktName, objName, headers)
var sum, currentPart int
var etags []string
adjustedSize := partsSize
for sum < objLen {
currentPart++
sum += partsSize
if sum > objLen {
adjustedSize = objLen - sum
}
etag, data := uploadPart(hc, bktName, objName, multipartInfo.UploadID, currentPart, adjustedSize)
etags = append(etags, etag)
objData = append(objData, data...)
}
completeMultipartUpload(hc, bktName, objName, multipartInfo.UploadID, etags)
return
}
func createMultipartUploadEncrypted(hc *handlerContext, bktName, objName string, headers map[string]string) *InitiateMultipartUploadResponse {
return createMultipartUploadBase(hc, bktName, objName, true, headers)
}
@ -254,7 +318,7 @@ func TestMultipartEncrypted(t *testing.T) {
part2ETag, part2 := uploadPartEncrypted(hc, bktName, objName, multipartInitInfo.UploadID, 2, 5)
completeMultipartUpload(hc, bktName, objName, multipartInitInfo.UploadID, []string{part1ETag, part2ETag})
res, _ := getEncryptedObject(t, hc, bktName, objName)
res, _ := getEncryptedObject(hc, bktName, objName)
require.Equal(t, len(part1)+len(part2), len(res))
require.Equal(t, append(part1, part2...), res)
@ -270,13 +334,22 @@ func putEncryptedObject(t *testing.T, tc *handlerContext, bktName, objName, cont
assertStatus(t, w, http.StatusOK)
}
func getEncryptedObject(t *testing.T, tc *handlerContext, bktName, objName string) ([]byte, http.Header) {
w, r := prepareTestRequest(tc, bktName, objName, nil)
func getEncryptedObject(hc *handlerContext, bktName, objName string) ([]byte, http.Header) {
w, r := prepareTestRequest(hc, bktName, objName, nil)
setEncryptHeaders(r)
tc.Handler().GetObjectHandler(w, r)
assertStatus(t, w, http.StatusOK)
return getObjectBase(hc, w, r)
}
func getObject(hc *handlerContext, bktName, objName string) ([]byte, http.Header) {
w, r := prepareTestRequest(hc, bktName, objName, nil)
return getObjectBase(hc, w, r)
}
func getObjectBase(hc *handlerContext, w *httptest.ResponseRecorder, r *http.Request) ([]byte, http.Header) {
hc.Handler().GetObjectHandler(w, r)
assertStatus(hc.t, w, http.StatusOK)
content, err := io.ReadAll(w.Result().Body)
require.NoError(t, err)
require.NoError(hc.t, err)
return content, w.Header()
}

View file

@ -12,6 +12,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"go.uber.org/zap"
)
@ -88,6 +89,8 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E
if len(info.Headers[layer.AttributeEncryptionAlgorithm]) > 0 {
h.Set(api.ContentLength, info.Headers[layer.AttributeDecryptedSize])
addSSECHeaders(h, requestHeader)
} else if len(info.Headers[layer.MultipartObjectSize]) > 0 {
h.Set(api.ContentLength, info.Headers[layer.MultipartObjectSize])
} else {
h.Set(api.ContentLength, strconv.FormatUint(info.Size, 10))
}
@ -165,12 +168,10 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
fullSize := info.Size
if encryptionParams.Enabled() {
if fullSize, err = strconv.ParseUint(info.Headers[layer.AttributeDecryptedSize], 10, 64); err != nil {
h.logAndSendError(w, "invalid decrypted size header", reqInfo, errors.GetAPIError(errors.ErrBadRequest))
return
}
fullSize, err := getObjectSize(extendedInfo, encryptionParams)
if err != nil {
h.logAndSendError(w, "invalid size header", reqInfo, errors.GetAPIError(errors.ErrBadRequest))
return
}
if params, err = fetchRangeHeader(r.Header, fullSize); err != nil {
@ -221,7 +222,7 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
if params != nil {
writeRangeHeaders(w, params, info.Size)
writeRangeHeaders(w, params, fullSize)
} else {
w.WriteHeader(http.StatusOK)
}
@ -232,6 +233,23 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
}
}
func getObjectSize(extendedInfo *data.ExtendedObjectInfo, encryptionParams encryption.Params) (uint64, error) {
var err error
fullSize := extendedInfo.ObjectInfo.Size
if encryptionParams.Enabled() {
if fullSize, err = strconv.ParseUint(extendedInfo.ObjectInfo.Headers[layer.AttributeDecryptedSize], 10, 64); err != nil {
return 0, fmt.Errorf("invalid decrypted size header: %w", err)
}
} else if extendedInfo.NodeVersion.IsCombined {
if fullSize, err = strconv.ParseUint(extendedInfo.ObjectInfo.Headers[layer.MultipartObjectSize], 10, 64); err != nil {
return 0, fmt.Errorf("invalid multipart size header: %w", err)
}
}
return fullSize, nil
}
func checkPreconditions(info *data.ObjectInfo, args *conditionalArgs) error {
if len(args.IfMatch) > 0 && args.IfMatch != info.HashSum {
return fmt.Errorf("%w: etag mismatched: '%s', '%s'", errors.GetAPIError(errors.ErrPreconditionFailed), args.IfMatch, info.HashSum)

View file

@ -111,6 +111,15 @@ type (
CopiesNumbers []uint32
}
PutCombinedObjectParams struct {
BktInfo *data.BucketInfo
Object string
Size uint64
Header map[string]string
Lock *data.ObjectLock
Encryption encryption.Params
}
DeleteObjectParams struct {
BktInfo *data.BucketInfo
Objects []*VersionedObject
@ -410,7 +419,7 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) {
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) {
var params getParams
params.oid = p.ObjectInfo.ID
params.objInfo = p.ObjectInfo
params.bktInfo = p.BucketInfo
var decReader *encryption.Decrypter

View file

@ -0,0 +1,80 @@
package layer
import (
"context"
"errors"
"fmt"
"io"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type partObj struct {
OID oid.ID
Size uint64
}
// implements io.Reader of payloads of the object list stored in the FrostFS network.
type multiObjectReader struct {
ctx context.Context
layer *layer
off, ln uint64
prm getFrostFSParams
curReader io.Reader
parts []partObj
}
func (x *multiObjectReader) Read(p []byte) (n int, err error) {
if x.curReader != nil {
n, err = x.curReader.Read(p)
if !errors.Is(err, io.EOF) {
return n, err
}
}
if len(x.parts) == 0 {
return n, io.EOF
}
for x.off != 0 {
if x.parts[0].Size < x.off {
x.parts = x.parts[1:]
x.off -= x.parts[0].Size
} else {
x.prm.off = x.off
x.off = 0
}
}
x.prm.oid = x.parts[0].OID
if x.ln != 0 {
if x.parts[0].Size < x.prm.off+x.ln {
x.prm.ln = x.parts[0].Size - x.prm.off
x.ln -= x.prm.ln
} else {
x.prm.ln = x.ln
x.ln = 0
x.parts = x.parts[:1]
}
}
x.curReader, err = x.layer.initFrostFSObjectPayloadReader(x.ctx, x.prm)
if err != nil {
return n, fmt.Errorf("init payload reader for the next part: %w", err)
}
x.prm.off = 0
x.prm.ln = 0
x.parts = x.parts[1:]
next, err := x.Read(p[n:])
return n + next, err
}

View file

@ -1,8 +1,10 @@
package layer
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
@ -24,6 +26,7 @@ const (
UploadIDAttributeName = "S3-Upload-Id"
UploadPartNumberAttributeName = "S3-Upload-Part-Number"
UploadCompletedParts = "S3-Completed-Parts"
MultipartObjectSize = "S3-Multipart-Object-Size"
metaPrefix = "meta-"
aclPrefix = "acl-"
@ -313,45 +316,6 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
return n.uploadPart(ctx, multipartInfo, params)
}
// implements io.Reader of payloads of the object list stored in the FrostFS network.
type multiObjectReader struct {
ctx context.Context
layer *layer
prm getParams
curReader io.Reader
parts []*data.PartInfo
}
func (x *multiObjectReader) Read(p []byte) (n int, err error) {
if x.curReader != nil {
n, err = x.curReader.Read(p)
if !errors.Is(err, io.EOF) {
return n, err
}
}
if len(x.parts) == 0 {
return n, io.EOF
}
x.prm.oid = x.parts[0].OID
x.curReader, err = x.layer.initObjectPayloadReader(x.ctx, x.prm)
if err != nil {
return n, fmt.Errorf("init payload reader for the next part: %w", err)
}
x.parts = x.parts[1:]
next, err := x.Read(p[n:])
return n + next, err
}
func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) {
for i := 1; i < len(p.Parts); i++ {
if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber {
@ -379,6 +343,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
if partInfo == nil || part.ETag != partInfo.ETag {
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
}
delete(partsInfo, part.PartNumber)
// for the last part we have no minimum size limit
if i != len(p.Parts)-1 && partInfo.Size < uploadMinSize {
return nil, nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooSmall), partInfo.Size, uploadMinSize)
@ -405,6 +371,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
initMetadata[UploadCompletedParts] = completedPartsHeader.String()
initMetadata[MultipartObjectSize] = strconv.FormatUint(multipartObjetSize, 10)
uploadData := &UploadData{
TagSet: make(map[string]string),
@ -428,18 +395,15 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
multipartObjetSize = encMultipartObjectSize
}
r := &multiObjectReader{
ctx: ctx,
layer: n,
parts: parts,
partsData, err := json.Marshal(parts)
if err != nil {
return nil, nil, fmt.Errorf("marshal parst for combined object: %w", err)
}
r.prm.bktInfo = p.Info.Bkt
extObjInfo, err := n.PutObject(ctx, &PutObjectParams{
BktInfo: p.Info.Bkt,
Object: p.Info.Key,
Reader: r,
Reader: bytes.NewReader(partsData),
Header: initMetadata,
Size: multipartObjetSize,
Encryption: p.Info.Encryption,

View file

@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
@ -32,6 +33,14 @@ type (
// payload range
off, ln uint64
objInfo *data.ObjectInfo
bktInfo *data.BucketInfo
}
getFrostFSParams struct {
// payload range
off, ln uint64
oid oid.ID
bktInfo *data.BucketInfo
}
@ -98,9 +107,55 @@ func (n *layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj
return res.Head, nil
}
func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) {
if _, isCombined := p.objInfo.Headers[MultipartObjectSize]; !isCombined {
return n.initFrostFSObjectPayloadReader(ctx, getFrostFSParams{
off: p.off,
ln: p.ln,
oid: p.objInfo.ID,
bktInfo: p.bktInfo,
})
}
combinedObj, err := n.objectGet(ctx, p.bktInfo, p.objInfo.ID)
if err != nil {
return nil, fmt.Errorf("get combined object '%s': %w", p.objInfo.ID.EncodeToString(), err)
}
var parts []*data.PartInfo
if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil {
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
}
isEncrypted := FormEncryptionInfo(p.objInfo.Headers).Enabled
objParts := make([]partObj, len(parts))
for i, part := range parts {
size := part.Size
if isEncrypted {
if size, err = sio.EncryptedSize(part.Size); err != nil {
return nil, fmt.Errorf("compute encrypted size: %w", err)
}
}
objParts[i] = partObj{
OID: part.OID,
Size: size,
}
}
return &multiObjectReader{
ctx: ctx,
off: p.off,
ln: p.ln,
layer: n,
parts: objParts,
prm: getFrostFSParams{bktInfo: p.bktInfo},
}, nil
}
// initializes payload reader of the FrostFS object.
// Zero range corresponds to full payload (panics if only offset is set).
func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) {
func (n *layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFSParams) (io.Reader, error) {
prm := PrmObjectRead{
Container: p.bktInfo.CID,
Object: p.oid,
@ -250,6 +305,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
Size: size,
},
IsUnversioned: !bktSettings.VersioningEnabled(),
IsCombined: p.Header[MultipartObjectSize] != "",
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {

View file

@ -73,6 +73,7 @@ const (
lockConfigurationKV = "LockConfiguration"
oidKV = "OID"
isCombinedKV = "IsCombined"
isUnversionedKV = "IsUnversioned"
isTagKV = "IsTag"
uploadIDKV = "UploadId"
@ -181,6 +182,7 @@ func newNodeVersion(filePath string, node NodeResponse) (*data.NodeVersion, erro
func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeVersion {
_, isUnversioned := treeNode.Get(isUnversionedKV)
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
_, isCombined := treeNode.Get(isCombinedKV)
eTag, _ := treeNode.Get(etagKV)
version := &data.NodeVersion{
@ -194,6 +196,7 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
FilePath: filePath,
},
IsUnversioned: isUnversioned,
IsCombined: isCombined,
}
if isDeleteMarker {
@ -1073,6 +1076,10 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
meta[createdKV] = strconv.FormatInt(version.DeleteMarker.Created.UTC().UnixMilli(), 10)
}
if version.IsCombined {
meta[isCombinedKV] = "true"
}
if version.IsUnversioned {
meta[isUnversionedKV] = "true"