oracleobjectstorage: Use rclone's rate limiter in mutipart transfers

This commit is contained in:
Manoj Ghosh 2023-07-30 13:38:51 -07:00 committed by Nick Craig-Wood
parent de185de215
commit 27f5297e8d
10 changed files with 1152 additions and 159 deletions

View file

@ -4,24 +4,26 @@
package oracleobjectstorage
import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"net/http"
"os"
"regexp"
"strconv"
"strings"
"time"
"golang.org/x/net/http/httpguts"
"github.com/ncw/swift/v2"
"github.com/oracle/oci-go-sdk/v65/common"
"github.com/oracle/oci-go-sdk/v65/objectstorage"
"github.com/oracle/oci-go-sdk/v65/objectstorage/transfer"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/atexit"
)
// ------------------------------------------------------------
@ -367,6 +369,25 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo
return resp.HTTPResponse().Body, nil
}
func isZeroLength(streamReader io.Reader) bool {
switch v := streamReader.(type) {
case *bytes.Buffer:
return v.Len() == 0
case *bytes.Reader:
return v.Len() == 0
case *strings.Reader:
return v.Len() == 0
case *os.File:
fi, err := v.Stat()
if err != nil {
return false
}
return fi.Size() == 0
default:
return false
}
}
// Update an object if it has changed
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
bucketName, bucketPath := o.split()
@ -379,11 +400,59 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
size := src.Size()
multipart := size >= int64(o.fs.opt.UploadCutoff)
if isZeroLength(in) {
multipart = false
}
req := objectstorage.PutObjectRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(bucketPath),
}
// Set the mtime in the metadata
modTime := src.ModTime(ctx)
metadata := map[string]string{
metaMtime: swift.TimeToFloatString(modTime),
// Fetch metadata if --metadata is in use
meta, err := fs.GetMetadataOptions(ctx, src, options)
if err != nil {
return fmt.Errorf("failed to read metadata from source object: %w", err)
}
req.OpcMeta = make(map[string]string, len(meta)+2)
// merge metadata into request and user metadata
for k, v := range meta {
pv := common.String(v)
k = strings.ToLower(k)
switch k {
case "cache-control":
req.CacheControl = pv
case "content-disposition":
req.ContentDisposition = pv
case "content-encoding":
req.ContentEncoding = pv
case "content-language":
req.ContentLanguage = pv
case "content-type":
req.ContentType = pv
case "tier":
// ignore
case "mtime":
// mtime in meta overrides source ModTime
metaModTime, err := time.Parse(time.RFC3339Nano, v)
if err != nil {
fs.Debugf(o, "failed to parse metadata %s: %q: %v", k, v, err)
} else {
modTime = metaModTime
}
case "btime":
// write as metadata since we can't set it
req.OpcMeta[k] = v
default:
req.OpcMeta[k] = v
}
}
// Set the mtime in the metadata
req.OpcMeta[metaMtime] = swift.TimeToFloatString(modTime)
// read the md5sum if available
// - for non-multipart
@ -404,114 +473,53 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// - a multipart upload
// - the ETag is not an MD5, e.g. when using SSE/SSE-C
// provided checksums aren't disabled
metadata[metaMD5Hash] = md5sumBase64
req.OpcMeta[metaMD5Hash] = md5sumBase64
}
}
}
}
// Guess the content type
mimeType := fs.MimeType(ctx, src)
// Set the content type if it isn't set already
if req.ContentType == nil {
req.ContentType = common.String(fs.MimeType(ctx, src))
}
if size >= 0 {
req.ContentLength = common.Int64(size)
}
if md5sumBase64 != "" {
req.ContentMD5 = &md5sumBase64
}
o.applyPutOptions(&req, options...)
useBYOKPutObject(o.fs, &req)
if o.fs.opt.StorageTier != "" {
storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier)
if !ok {
return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier)
}
req.StorageTier = storageTier
}
// Check metadata keys and values are valid
for key, value := range req.OpcMeta {
if !httpguts.ValidHeaderFieldName(key) {
fs.Errorf(o, "Dropping invalid metadata key %q", key)
delete(req.OpcMeta, key)
} else if value == "" {
fs.Errorf(o, "Dropping nil metadata value for key %q", key)
delete(req.OpcMeta, key)
} else if !httpguts.ValidHeaderFieldValue(value) {
fs.Errorf(o, "Dropping invalid metadata value %q for key %q", value, key)
delete(req.OpcMeta, key)
}
}
if multipart {
chunkSize := int64(o.fs.opt.ChunkSize)
uploadRequest := transfer.UploadRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(bucketPath),
ContentType: common.String(mimeType),
PartSize: common.Int64(chunkSize),
AllowMultipartUploads: common.Bool(true),
AllowParrallelUploads: common.Bool(true),
ObjectStorageClient: o.fs.srv,
EnableMultipartChecksumVerification: common.Bool(!o.fs.opt.DisableChecksum),
NumberOfGoroutines: common.Int(o.fs.opt.UploadConcurrency),
Metadata: metadataWithOpcPrefix(metadata),
}
if o.fs.opt.StorageTier != "" {
storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier)
if !ok {
return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier)
}
uploadRequest.StorageTier = storageTier
}
o.applyMultiPutOptions(&uploadRequest, options...)
useBYOKUpload(o.fs, &uploadRequest)
uploadStreamRequest := transfer.UploadStreamRequest{
UploadRequest: uploadRequest,
StreamReader: in,
}
uploadMgr := transfer.NewUploadManager()
var uploadID = ""
defer atexit.OnError(&err, func() {
if uploadID == "" {
return
}
if o.fs.opt.LeavePartsOnError {
return
}
fs.Debugf(o, "Cancelling multipart upload")
errCancel := o.fs.abortMultiPartUpload(
context.Background(),
bucketName,
bucketPath,
uploadID)
if errCancel != nil {
fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel)
}
})()
err = o.fs.pacer.Call(func() (bool, error) {
uploadResponse, err := uploadMgr.UploadStream(ctx, uploadStreamRequest)
var httpResponse *http.Response
if err == nil {
if uploadResponse.Type == transfer.MultipartUpload {
if uploadResponse.MultipartUploadResponse != nil {
httpResponse = uploadResponse.MultipartUploadResponse.HTTPResponse()
}
} else {
if uploadResponse.SinglepartUploadResponse != nil {
httpResponse = uploadResponse.SinglepartUploadResponse.HTTPResponse()
}
}
}
if err != nil {
uploadID := ""
if uploadResponse.MultipartUploadResponse != nil && uploadResponse.MultipartUploadResponse.UploadID != nil {
uploadID = *uploadResponse.MultipartUploadResponse.UploadID
fs.Debugf(o, "multipart streaming upload failed, aborting uploadID: %v, may retry", uploadID)
_ = o.fs.abortMultiPartUpload(ctx, bucketName, bucketPath, uploadID)
}
}
return shouldRetry(ctx, httpResponse, err)
})
err = o.uploadMultipart(ctx, &req, in, src)
if err != nil {
fs.Errorf(o, "multipart streaming upload failed %v", err)
return err
}
} else {
req := objectstorage.PutObjectRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(bucketPath),
ContentType: common.String(mimeType),
PutObjectBody: io.NopCloser(in),
OpcMeta: metadata,
}
if size >= 0 {
req.ContentLength = common.Int64(size)
}
if o.fs.opt.StorageTier != "" {
storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier)
if !ok {
return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier)
}
req.StorageTier = storageTier
}
o.applyPutOptions(&req, options...)
useBYOKPutObject(o.fs, &req)
var resp objectstorage.PutObjectResponse
err = o.fs.pacer.Call(func() (bool, error) {
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
req.PutObjectBody = io.NopCloser(in)
resp, err = o.fs.srv.PutObject(ctx, req)
return shouldRetry(ctx, resp.HTTPResponse(), err)
})
@ -591,28 +599,24 @@ func (o *Object) applyGetObjectOptions(req *objectstorage.GetObjectRequest, opti
}
}
func (o *Object) applyMultiPutOptions(req *transfer.UploadRequest, options ...fs.OpenOption) {
// Apply upload options
for _, option := range options {
key, value := option.Header()
lowerKey := strings.ToLower(key)
switch lowerKey {
case "":
// ignore
case "content-encoding":
req.ContentEncoding = common.String(value)
case "content-language":
req.ContentLanguage = common.String(value)
case "content-type":
req.ContentType = common.String(value)
default:
if strings.HasPrefix(lowerKey, ociMetaPrefix) {
req.Metadata[lowerKey] = value
} else {
fs.Errorf(o, "Don't know how to set key %q on upload", key)
}
}
}
func (o *Object) applyMultipartUploadOptions(putReq *objectstorage.PutObjectRequest, req *objectstorage.CreateMultipartUploadRequest) {
req.ContentType = putReq.ContentType
req.ContentLanguage = putReq.ContentLanguage
req.ContentEncoding = putReq.ContentEncoding
req.ContentDisposition = putReq.ContentDisposition
req.CacheControl = putReq.CacheControl
req.Metadata = metadataWithOpcPrefix(putReq.OpcMeta)
req.OpcSseCustomerAlgorithm = putReq.OpcSseCustomerAlgorithm
req.OpcSseCustomerKey = putReq.OpcSseCustomerKey
req.OpcSseCustomerKeySha256 = putReq.OpcSseCustomerKeySha256
req.OpcSseKmsKeyId = putReq.OpcSseKmsKeyId
}
func (o *Object) applyPartUploadOptions(putReq *objectstorage.PutObjectRequest, req *objectstorage.UploadPartRequest) {
req.OpcSseCustomerAlgorithm = putReq.OpcSseCustomerAlgorithm
req.OpcSseCustomerKey = putReq.OpcSseCustomerKey
req.OpcSseCustomerKeySha256 = putReq.OpcSseCustomerKeySha256
req.OpcSseKmsKeyId = putReq.OpcSseKmsKeyId
}
func metadataWithOpcPrefix(src map[string]string) map[string]string {