From ad81b599ddfbdc57acfc03628ae3a797ed5ce9ee Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 27 Jun 2023 15:49:20 +0300 Subject: [PATCH] [#63] Add fast multipart upload Add new flag to object tree meta `isCombined` that means the object payload is list of parts that forms real payload. Set this attribute when complete multipart upload not to do unnecessary copying. Signed-off-by: Denis Kirillov --- api/data/tree.go | 15 ++--- api/handler/encryption_test.go | 99 +++++++++++++++++++++++++++----- api/handler/get.go | 32 ++++++++--- api/layer/layer.go | 11 +++- api/layer/multi_object_reader.go | 80 ++++++++++++++++++++++++++ api/layer/multipart_upload.go | 56 ++++-------------- api/layer/object.go | 58 ++++++++++++++++++- pkg/service/tree/tree.go | 7 +++ 8 files changed, 283 insertions(+), 75 deletions(-) create mode 100644 api/layer/multi_object_reader.go diff --git a/api/data/tree.go b/api/data/tree.go index 02db82cc..3959fc8f 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -18,6 +18,7 @@ type NodeVersion struct { BaseNodeVersion DeleteMarker *DeleteMarkerInfo IsUnversioned bool + IsCombined bool } func (v NodeVersion) IsDeleteMarker() bool { @@ -79,13 +80,13 @@ type MultipartInfo struct { // PartInfo is upload information about part. type PartInfo struct { - Key string - UploadID string - Number int - OID oid.ID - Size uint64 - ETag string - Created time.Time + Key string `json:"key"` + UploadID string `json:"uploadId"` + Number int `json:"number"` + OID oid.ID `json:"oid"` + Size uint64 `json:"size"` + ETag string `json:"etag"` + Created time.Time `json:"created"` } // ToHeaderString form short part representation to use in S3-Completed-Parts header. diff --git a/api/handler/encryption_test.go b/api/handler/encryption_test.go index 5684ea32..5d3d4f5a 100644 --- a/api/handler/encryption_test.go +++ b/api/handler/encryption_test.go @@ -42,7 +42,7 @@ func TestSimpleGetEncrypted(t *testing.T) { require.NoError(t, err) require.NotEqual(t, content, string(encryptedContent)) - response, _ := getEncryptedObject(t, tc, bktName, objName) + response, _ := getEncryptedObject(tc, bktName, objName) require.Equal(t, content, string(response)) } @@ -104,14 +104,40 @@ func TestS3EncryptionSSECMultipartUpload(t *testing.T) { data := multipartUploadEncrypted(tc, bktName, objName, headers, objLen, partSize) require.Equal(t, objLen, len(data)) - resData, resHeader := getEncryptedObject(t, tc, bktName, objName) + resData, resHeader := getEncryptedObject(tc, bktName, objName) equalDataSlices(t, data, resData) require.Equal(t, headers[api.ContentType], resHeader.Get(api.ContentType)) require.Equal(t, headers[headerMetaKey], resHeader[headerMetaKey][0]) require.Equal(t, strconv.Itoa(objLen), resHeader.Get(api.ContentLength)) - checkContentUsingRangeEnc(t, tc, bktName, objName, data, 1000000) - checkContentUsingRangeEnc(t, tc, bktName, objName, data, 10000000) + checkContentUsingRangeEnc(tc, bktName, objName, data, 1000000) + checkContentUsingRangeEnc(tc, bktName, objName, data, 10000000) +} + +func TestMultipartUploadGetRange(t *testing.T) { + hc := prepareHandlerContext(t) + bktName, objName := "bucket-for-multipart-s3-tests", "multipart_obj" + createTestBucket(hc, bktName) + + objLen := 30 * 1024 * 1024 + partSize := objLen / 6 + headerMetaKey := api.MetadataPrefix + "foo" + headers := map[string]string{ + headerMetaKey: "bar", + api.ContentType: "text/plain", + } + + data := multipartUpload(hc, bktName, objName, headers, objLen, partSize) + require.Equal(t, objLen, len(data)) + + resData, resHeader := getObject(hc, bktName, objName) + equalDataSlices(t, data, resData) + require.Equal(t, headers[api.ContentType], resHeader.Get(api.ContentType)) + require.Equal(t, headers[headerMetaKey], resHeader[headerMetaKey][0]) + require.Equal(t, strconv.Itoa(objLen), resHeader.Get(api.ContentLength)) + + checkContentUsingRange(hc, bktName, objName, data, 1000000) + checkContentUsingRange(hc, bktName, objName, data, 10000000) } func equalDataSlices(t *testing.T, expected, actual []byte) { @@ -128,7 +154,15 @@ func equalDataSlices(t *testing.T, expected, actual []byte) { } } -func checkContentUsingRangeEnc(t *testing.T, tc *handlerContext, bktName, objName string, data []byte, step int) { +func checkContentUsingRangeEnc(hc *handlerContext, bktName, objName string, data []byte, step int) { + checkContentUsingRangeBase(hc, bktName, objName, data, step, true) +} + +func checkContentUsingRange(hc *handlerContext, bktName, objName string, data []byte, step int) { + checkContentUsingRangeBase(hc, bktName, objName, data, step, false) +} + +func checkContentUsingRangeBase(hc *handlerContext, bktName, objName string, data []byte, step int, encrypted bool) { var off, toRead, end int for off < len(data) { @@ -138,8 +172,14 @@ func checkContentUsingRangeEnc(t *testing.T, tc *handlerContext, bktName, objNam } end = off + toRead - 1 - rangeData := getEncryptedObjectRange(t, tc, bktName, objName, off, end) - equalDataSlices(t, data[off:end+1], rangeData) + var rangeData []byte + if encrypted { + rangeData = getEncryptedObjectRange(hc.t, hc, bktName, objName, off, end) + } else { + rangeData = getObjectRange(hc.t, hc, bktName, objName, off, end) + } + + equalDataSlices(hc.t, data[off:end+1], rangeData) off += step } @@ -169,6 +209,30 @@ func multipartUploadEncrypted(hc *handlerContext, bktName, objName string, heade return } +func multipartUpload(hc *handlerContext, bktName, objName string, headers map[string]string, objLen, partsSize int) (objData []byte) { + multipartInfo := createMultipartUpload(hc, bktName, objName, headers) + + var sum, currentPart int + var etags []string + adjustedSize := partsSize + + for sum < objLen { + currentPart++ + + sum += partsSize + if sum > objLen { + adjustedSize = objLen - sum + } + + etag, data := uploadPart(hc, bktName, objName, multipartInfo.UploadID, currentPart, adjustedSize) + etags = append(etags, etag) + objData = append(objData, data...) + } + + completeMultipartUpload(hc, bktName, objName, multipartInfo.UploadID, etags) + return +} + func createMultipartUploadEncrypted(hc *handlerContext, bktName, objName string, headers map[string]string) *InitiateMultipartUploadResponse { return createMultipartUploadBase(hc, bktName, objName, true, headers) } @@ -254,7 +318,7 @@ func TestMultipartEncrypted(t *testing.T) { part2ETag, part2 := uploadPartEncrypted(hc, bktName, objName, multipartInitInfo.UploadID, 2, 5) completeMultipartUpload(hc, bktName, objName, multipartInitInfo.UploadID, []string{part1ETag, part2ETag}) - res, _ := getEncryptedObject(t, hc, bktName, objName) + res, _ := getEncryptedObject(hc, bktName, objName) require.Equal(t, len(part1)+len(part2), len(res)) require.Equal(t, append(part1, part2...), res) @@ -270,13 +334,22 @@ func putEncryptedObject(t *testing.T, tc *handlerContext, bktName, objName, cont assertStatus(t, w, http.StatusOK) } -func getEncryptedObject(t *testing.T, tc *handlerContext, bktName, objName string) ([]byte, http.Header) { - w, r := prepareTestRequest(tc, bktName, objName, nil) +func getEncryptedObject(hc *handlerContext, bktName, objName string) ([]byte, http.Header) { + w, r := prepareTestRequest(hc, bktName, objName, nil) setEncryptHeaders(r) - tc.Handler().GetObjectHandler(w, r) - assertStatus(t, w, http.StatusOK) + return getObjectBase(hc, w, r) +} + +func getObject(hc *handlerContext, bktName, objName string) ([]byte, http.Header) { + w, r := prepareTestRequest(hc, bktName, objName, nil) + return getObjectBase(hc, w, r) +} + +func getObjectBase(hc *handlerContext, w *httptest.ResponseRecorder, r *http.Request) ([]byte, http.Header) { + hc.Handler().GetObjectHandler(w, r) + assertStatus(hc.t, w, http.StatusOK) content, err := io.ReadAll(w.Result().Body) - require.NoError(t, err) + require.NoError(hc.t, err) return content, w.Header() } diff --git a/api/handler/get.go b/api/handler/get.go index e4582d1a..a3afa9f3 100644 --- a/api/handler/get.go +++ b/api/handler/get.go @@ -12,6 +12,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" "go.uber.org/zap" ) @@ -88,6 +89,8 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E if len(info.Headers[layer.AttributeEncryptionAlgorithm]) > 0 { h.Set(api.ContentLength, info.Headers[layer.AttributeDecryptedSize]) addSSECHeaders(h, requestHeader) + } else if len(info.Headers[layer.MultipartObjectSize]) > 0 { + h.Set(api.ContentLength, info.Headers[layer.MultipartObjectSize]) } else { h.Set(api.ContentLength, strconv.FormatUint(info.Size, 10)) } @@ -165,12 +168,10 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { return } - fullSize := info.Size - if encryptionParams.Enabled() { - if fullSize, err = strconv.ParseUint(info.Headers[layer.AttributeDecryptedSize], 10, 64); err != nil { - h.logAndSendError(w, "invalid decrypted size header", reqInfo, errors.GetAPIError(errors.ErrBadRequest)) - return - } + fullSize, err := getObjectSize(extendedInfo, encryptionParams) + if err != nil { + h.logAndSendError(w, "invalid size header", reqInfo, errors.GetAPIError(errors.ErrBadRequest)) + return } if params, err = fetchRangeHeader(r.Header, fullSize); err != nil { @@ -221,7 +222,7 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned()) if params != nil { - writeRangeHeaders(w, params, info.Size) + writeRangeHeaders(w, params, fullSize) } else { w.WriteHeader(http.StatusOK) } @@ -232,6 +233,23 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { } } +func getObjectSize(extendedInfo *data.ExtendedObjectInfo, encryptionParams encryption.Params) (uint64, error) { + var err error + fullSize := extendedInfo.ObjectInfo.Size + + if encryptionParams.Enabled() { + if fullSize, err = strconv.ParseUint(extendedInfo.ObjectInfo.Headers[layer.AttributeDecryptedSize], 10, 64); err != nil { + return 0, fmt.Errorf("invalid decrypted size header: %w", err) + } + } else if extendedInfo.NodeVersion.IsCombined { + if fullSize, err = strconv.ParseUint(extendedInfo.ObjectInfo.Headers[layer.MultipartObjectSize], 10, 64); err != nil { + return 0, fmt.Errorf("invalid multipart size header: %w", err) + } + } + + return fullSize, nil +} + func checkPreconditions(info *data.ObjectInfo, args *conditionalArgs) error { if len(args.IfMatch) > 0 && args.IfMatch != info.HashSum { return fmt.Errorf("%w: etag mismatched: '%s', '%s'", errors.GetAPIError(errors.ErrPreconditionFailed), args.IfMatch, info.HashSum) diff --git a/api/layer/layer.go b/api/layer/layer.go index 17cc6fe7..e68f2711 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -111,6 +111,15 @@ type ( CopiesNumbers []uint32 } + PutCombinedObjectParams struct { + BktInfo *data.BucketInfo + Object string + Size uint64 + Header map[string]string + Lock *data.ObjectLock + Encryption encryption.Params + } + DeleteObjectParams struct { BktInfo *data.BucketInfo Objects []*VersionedObject @@ -410,7 +419,7 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) { func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) { var params getParams - params.oid = p.ObjectInfo.ID + params.objInfo = p.ObjectInfo params.bktInfo = p.BucketInfo var decReader *encryption.Decrypter diff --git a/api/layer/multi_object_reader.go b/api/layer/multi_object_reader.go new file mode 100644 index 00000000..aa632492 --- /dev/null +++ b/api/layer/multi_object_reader.go @@ -0,0 +1,80 @@ +package layer + +import ( + "context" + "errors" + "fmt" + "io" + + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type partObj struct { + OID oid.ID + Size uint64 +} + +// implements io.Reader of payloads of the object list stored in the FrostFS network. +type multiObjectReader struct { + ctx context.Context + + layer *layer + + off, ln uint64 + + prm getFrostFSParams + + curReader io.Reader + + parts []partObj +} + +func (x *multiObjectReader) Read(p []byte) (n int, err error) { + if x.curReader != nil { + n, err = x.curReader.Read(p) + if !errors.Is(err, io.EOF) { + return n, err + } + } + + if len(x.parts) == 0 { + return n, io.EOF + } + + for x.off != 0 { + if x.parts[0].Size < x.off { + x.parts = x.parts[1:] + x.off -= x.parts[0].Size + } else { + x.prm.off = x.off + x.off = 0 + } + } + + x.prm.oid = x.parts[0].OID + + if x.ln != 0 { + if x.parts[0].Size < x.prm.off+x.ln { + x.prm.ln = x.parts[0].Size - x.prm.off + x.ln -= x.prm.ln + } else { + x.prm.ln = x.ln + x.ln = 0 + x.parts = x.parts[:1] + } + } + + x.curReader, err = x.layer.initFrostFSObjectPayloadReader(x.ctx, x.prm) + if err != nil { + return n, fmt.Errorf("init payload reader for the next part: %w", err) + } + + x.prm.off = 0 + x.prm.ln = 0 + + x.parts = x.parts[1:] + + next, err := x.Read(p[n:]) + + return n + next, err +} diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 129efb55..81e1f721 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -1,8 +1,10 @@ package layer import ( + "bytes" "context" "encoding/hex" + "encoding/json" "errors" "fmt" "io" @@ -24,6 +26,7 @@ const ( UploadIDAttributeName = "S3-Upload-Id" UploadPartNumberAttributeName = "S3-Upload-Part-Number" UploadCompletedParts = "S3-Completed-Parts" + MultipartObjectSize = "S3-Multipart-Object-Size" metaPrefix = "meta-" aclPrefix = "acl-" @@ -313,45 +316,6 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. return n.uploadPart(ctx, multipartInfo, params) } -// implements io.Reader of payloads of the object list stored in the FrostFS network. -type multiObjectReader struct { - ctx context.Context - - layer *layer - - prm getParams - - curReader io.Reader - - parts []*data.PartInfo -} - -func (x *multiObjectReader) Read(p []byte) (n int, err error) { - if x.curReader != nil { - n, err = x.curReader.Read(p) - if !errors.Is(err, io.EOF) { - return n, err - } - } - - if len(x.parts) == 0 { - return n, io.EOF - } - - x.prm.oid = x.parts[0].OID - - x.curReader, err = x.layer.initObjectPayloadReader(x.ctx, x.prm) - if err != nil { - return n, fmt.Errorf("init payload reader for the next part: %w", err) - } - - x.parts = x.parts[1:] - - next, err := x.Read(p[n:]) - - return n + next, err -} - func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) { for i := 1; i < len(p.Parts); i++ { if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber { @@ -379,6 +343,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar if partInfo == nil || part.ETag != partInfo.ETag { return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber) } + delete(partsInfo, part.PartNumber) + // for the last part we have no minimum size limit if i != len(p.Parts)-1 && partInfo.Size < uploadMinSize { return nil, nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooSmall), partInfo.Size, uploadMinSize) @@ -405,6 +371,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar initMetadata := make(map[string]string, len(multipartInfo.Meta)+1) initMetadata[UploadCompletedParts] = completedPartsHeader.String() + initMetadata[MultipartObjectSize] = strconv.FormatUint(multipartObjetSize, 10) uploadData := &UploadData{ TagSet: make(map[string]string), @@ -428,18 +395,15 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar multipartObjetSize = encMultipartObjectSize } - r := &multiObjectReader{ - ctx: ctx, - layer: n, - parts: parts, + partsData, err := json.Marshal(parts) + if err != nil { + return nil, nil, fmt.Errorf("marshal parst for combined object: %w", err) } - r.prm.bktInfo = p.Info.Bkt - extObjInfo, err := n.PutObject(ctx, &PutObjectParams{ BktInfo: p.Info.Bkt, Object: p.Info.Key, - Reader: r, + Reader: bytes.NewReader(partsData), Header: initMetadata, Size: multipartObjetSize, Encryption: p.Info.Encryption, diff --git a/api/layer/object.go b/api/layer/object.go index 04730063..765ec9de 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/hex" + "encoding/json" "errors" "fmt" "io" @@ -32,6 +33,14 @@ type ( // payload range off, ln uint64 + objInfo *data.ObjectInfo + bktInfo *data.BucketInfo + } + + getFrostFSParams struct { + // payload range + off, ln uint64 + oid oid.ID bktInfo *data.BucketInfo } @@ -98,9 +107,55 @@ func (n *layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj return res.Head, nil } +func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) { + if _, isCombined := p.objInfo.Headers[MultipartObjectSize]; !isCombined { + return n.initFrostFSObjectPayloadReader(ctx, getFrostFSParams{ + off: p.off, + ln: p.ln, + oid: p.objInfo.ID, + bktInfo: p.bktInfo, + }) + } + + combinedObj, err := n.objectGet(ctx, p.bktInfo, p.objInfo.ID) + if err != nil { + return nil, fmt.Errorf("get combined object '%s': %w", p.objInfo.ID.EncodeToString(), err) + } + + var parts []*data.PartInfo + if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil { + return nil, fmt.Errorf("unmarshal combined object parts: %w", err) + } + + isEncrypted := FormEncryptionInfo(p.objInfo.Headers).Enabled + objParts := make([]partObj, len(parts)) + for i, part := range parts { + size := part.Size + if isEncrypted { + if size, err = sio.EncryptedSize(part.Size); err != nil { + return nil, fmt.Errorf("compute encrypted size: %w", err) + } + } + + objParts[i] = partObj{ + OID: part.OID, + Size: size, + } + } + + return &multiObjectReader{ + ctx: ctx, + off: p.off, + ln: p.ln, + layer: n, + parts: objParts, + prm: getFrostFSParams{bktInfo: p.bktInfo}, + }, nil +} + // initializes payload reader of the FrostFS object. // Zero range corresponds to full payload (panics if only offset is set). -func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) { +func (n *layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFSParams) (io.Reader, error) { prm := PrmObjectRead{ Container: p.bktInfo.CID, Object: p.oid, @@ -250,6 +305,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend Size: size, }, IsUnversioned: !bktSettings.VersioningEnabled(), + IsCombined: p.Header[MultipartObjectSize] != "", } if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil { diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index a59e660e..8617bdd3 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -73,6 +73,7 @@ const ( lockConfigurationKV = "LockConfiguration" oidKV = "OID" + isCombinedKV = "IsCombined" isUnversionedKV = "IsUnversioned" isTagKV = "IsTag" uploadIDKV = "UploadId" @@ -181,6 +182,7 @@ func newNodeVersion(filePath string, node NodeResponse) (*data.NodeVersion, erro func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeVersion { _, isUnversioned := treeNode.Get(isUnversionedKV) _, isDeleteMarker := treeNode.Get(isDeleteMarkerKV) + _, isCombined := treeNode.Get(isCombinedKV) eTag, _ := treeNode.Get(etagKV) version := &data.NodeVersion{ @@ -194,6 +196,7 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV FilePath: filePath, }, IsUnversioned: isUnversioned, + IsCombined: isCombined, } if isDeleteMarker { @@ -1073,6 +1076,10 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID meta[createdKV] = strconv.FormatInt(version.DeleteMarker.Created.UTC().UnixMilli(), 10) } + if version.IsCombined { + meta[isCombinedKV] = "true" + } + if version.IsUnversioned { meta[isUnversionedKV] = "true"