From 27f5297e8d14d047c9ecfae532ed7c46f6330772 Mon Sep 17 00:00:00 2001 From: Manoj Ghosh Date: Sun, 30 Jul 2023 13:38:51 -0700 Subject: [PATCH] oracleobjectstorage: Use rclone's rate limiter in mutipart transfers --- backend/oracleobjectstorage/byok.go | 16 - backend/oracleobjectstorage/command.go | 66 ++- backend/oracleobjectstorage/multipart.go | 334 +++++++++++++ backend/oracleobjectstorage/object.go | 250 +++++----- backend/oracleobjectstorage/options.go | 57 ++- .../oracleobjectstorage.go | 21 + docs/content/local.md | 2 +- docs/content/oracleobjectstorage.md | 94 +++- docs/content/oracleobjectstorage/_index.md | 0 .../oracleobjectstorage/tutorial_mount.md | 471 ++++++++++++++++++ 10 files changed, 1152 insertions(+), 159 deletions(-) create mode 100644 backend/oracleobjectstorage/multipart.go create mode 100644 docs/content/oracleobjectstorage/_index.md create mode 100644 docs/content/oracleobjectstorage/tutorial_mount.md diff --git a/backend/oracleobjectstorage/byok.go b/backend/oracleobjectstorage/byok.go index 099476a7d..450705d1d 100644 --- a/backend/oracleobjectstorage/byok.go +++ b/backend/oracleobjectstorage/byok.go @@ -13,7 +13,6 @@ import ( "github.com/oracle/oci-go-sdk/v65/common" "github.com/oracle/oci-go-sdk/v65/objectstorage" - "github.com/oracle/oci-go-sdk/v65/objectstorage/transfer" ) const ( @@ -128,18 +127,3 @@ func useBYOKCopyObject(fs *Fs, request *objectstorage.CopyObjectRequest) { request.OpcSseCustomerKeySha256 = common.String(fs.opt.SSECustomerKeySha256) } } - -func useBYOKUpload(fs *Fs, request *transfer.UploadRequest) { - if fs.opt.SSEKMSKeyID != "" { - request.OpcSseKmsKeyId = common.String(fs.opt.SSEKMSKeyID) - } - if fs.opt.SSECustomerAlgorithm != "" { - request.OpcSseCustomerAlgorithm = common.String(fs.opt.SSECustomerAlgorithm) - } - if fs.opt.SSECustomerKey != "" { - request.OpcSseCustomerKey = common.String(fs.opt.SSECustomerKey) - } - if fs.opt.SSECustomerKeySha256 != "" { - request.OpcSseCustomerKeySha256 = common.String(fs.opt.SSECustomerKeySha256) - } -} diff --git a/backend/oracleobjectstorage/command.go b/backend/oracleobjectstorage/command.go index 529730971..4be61e3e5 100644 --- a/backend/oracleobjectstorage/command.go +++ b/backend/oracleobjectstorage/command.go @@ -6,6 +6,7 @@ package oracleobjectstorage import ( "context" "fmt" + "sort" "strings" "time" @@ -196,6 +197,32 @@ func (f *Fs) listMultipartUploadsAll(ctx context.Context) (uploadsMap map[string // for "dir" and it returns "dirKey" func (f *Fs) listMultipartUploads(ctx context.Context, bucketName, directory string) ( uploads []*objectstorage.MultipartUpload, err error) { + return f.listMultipartUploadsObject(ctx, bucketName, directory, false) +} + +// listMultipartUploads finds first outstanding multipart uploads for (bucket, key) +// +// Note that rather lazily we treat key as a prefix, so it matches +// directories and objects. This could surprise the user if they ask +// for "dir" and it returns "dirKey" +func (f *Fs) findLatestMultipartUpload(ctx context.Context, bucketName, directory string) ( + uploads []*objectstorage.MultipartUpload, err error) { + pastUploads, err := f.listMultipartUploadsObject(ctx, bucketName, directory, true) + if err != nil { + return nil, err + } + + if len(pastUploads) > 0 { + sort.Slice(pastUploads, func(i, j int) bool { + return pastUploads[i].TimeCreated.After(pastUploads[j].TimeCreated.Time) + }) + return pastUploads[:1], nil + } + return nil, err +} + +func (f *Fs) listMultipartUploadsObject(ctx context.Context, bucketName, directory string, exact bool) ( + uploads []*objectstorage.MultipartUpload, err error) { uploads = []*objectstorage.MultipartUpload{} req := objectstorage.ListMultipartUploadsRequest{ @@ -217,7 +244,13 @@ func (f *Fs) listMultipartUploads(ctx context.Context, bucketName, directory str if directory != "" && item.Object != nil && !strings.HasPrefix(*item.Object, directory) { continue } - uploads = append(uploads, &response.Items[index]) + if exact { + if *item.Object == directory { + uploads = append(uploads, &response.Items[index]) + } + } else { + uploads = append(uploads, &response.Items[index]) + } } if response.OpcNextPage == nil { break @@ -226,3 +259,34 @@ func (f *Fs) listMultipartUploads(ctx context.Context, bucketName, directory str } return uploads, nil } + +func (f *Fs) listMultipartUploadParts(ctx context.Context, bucketName, bucketPath string, uploadID string) ( + uploadedParts map[int]objectstorage.MultipartUploadPartSummary, err error) { + uploadedParts = make(map[int]objectstorage.MultipartUploadPartSummary) + req := objectstorage.ListMultipartUploadPartsRequest{ + NamespaceName: common.String(f.opt.Namespace), + BucketName: common.String(bucketName), + ObjectName: common.String(bucketPath), + UploadId: common.String(uploadID), + Limit: common.Int(1000), + } + + var response objectstorage.ListMultipartUploadPartsResponse + for { + err = f.pacer.Call(func() (bool, error) { + response, err = f.srv.ListMultipartUploadParts(ctx, req) + return shouldRetry(ctx, response.HTTPResponse(), err) + }) + if err != nil { + return uploadedParts, err + } + for _, item := range response.Items { + uploadedParts[*item.PartNumber] = item + } + if response.OpcNextPage == nil { + break + } + req.Page = response.OpcNextPage + } + return uploadedParts, nil +} diff --git a/backend/oracleobjectstorage/multipart.go b/backend/oracleobjectstorage/multipart.go new file mode 100644 index 000000000..9dce5ff75 --- /dev/null +++ b/backend/oracleobjectstorage/multipart.go @@ -0,0 +1,334 @@ +//go:build !plan9 && !solaris && !js +// +build !plan9,!solaris,!js + +package oracleobjectstorage + +import ( + "bytes" + "context" + "crypto/md5" + "encoding/base64" + "fmt" + "io" + "sort" + "strconv" + "sync" + + "github.com/oracle/oci-go-sdk/v65/common" + "github.com/oracle/oci-go-sdk/v65/objectstorage" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/chunksize" + "github.com/rclone/rclone/lib/atexit" + "github.com/rclone/rclone/lib/pacer" + "github.com/rclone/rclone/lib/readers" + "golang.org/x/sync/errgroup" +) + +var warnStreamUpload sync.Once + +func (o *Object) uploadMultipart( + ctx context.Context, + putReq *objectstorage.PutObjectRequest, + in io.Reader, + src fs.ObjectInfo) (err error) { + uploadID, uploadedParts, err := o.createMultipartUpload(ctx, putReq) + if err != nil { + fs.Errorf(o, "failed to create multipart upload-id err: %v", err) + return err + } + return o.uploadParts(ctx, putReq, in, src, uploadID, uploadedParts) +} + +func (o *Object) createMultipartUpload(ctx context.Context, putReq *objectstorage.PutObjectRequest) ( + uploadID string, uploadedParts map[int]objectstorage.MultipartUploadPartSummary, err error) { + bucketName, bucketPath := o.split() + f := o.fs + if f.opt.AttemptResumeUpload { + fs.Debugf(o, "attempting to resume upload for %v (if any)", o.remote) + resumeUploads, err := o.fs.findLatestMultipartUpload(ctx, bucketName, bucketPath) + if err == nil && len(resumeUploads) > 0 { + uploadID = *resumeUploads[0].UploadId + uploadedParts, err = f.listMultipartUploadParts(ctx, bucketName, bucketPath, uploadID) + if err == nil { + fs.Debugf(o, "resuming with existing upload id: %v", uploadID) + return uploadID, uploadedParts, err + } + } + } + req := objectstorage.CreateMultipartUploadRequest{ + NamespaceName: common.String(o.fs.opt.Namespace), + BucketName: common.String(bucketName), + } + req.Object = common.String(bucketPath) + if o.fs.opt.StorageTier != "" { + storageTier, ok := objectstorage.GetMappingStorageTierEnum(o.fs.opt.StorageTier) + if !ok { + return "", nil, fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier) + } + req.StorageTier = storageTier + } + o.applyMultipartUploadOptions(putReq, &req) + + var resp objectstorage.CreateMultipartUploadResponse + err = o.fs.pacer.Call(func() (bool, error) { + resp, err = o.fs.srv.CreateMultipartUpload(ctx, req) + return shouldRetry(ctx, resp.HTTPResponse(), err) + }) + if err != nil { + return "", nil, err + } + uploadID = *resp.UploadId + fs.Debugf(o, "created new upload id: %v", uploadID) + return uploadID, nil, err +} + +func (o *Object) uploadParts( + ctx context.Context, + putReq *objectstorage.PutObjectRequest, + in io.Reader, + src fs.ObjectInfo, + uploadID string, + uploadedParts map[int]objectstorage.MultipartUploadPartSummary) (err error) { + bucketName, bucketPath := o.split() + f := o.fs + + // make concurrency machinery + concurrency := f.opt.UploadConcurrency + if concurrency < 1 { + concurrency = 1 + } + + uploadParts := f.opt.MaxUploadParts + if uploadParts < 1 { + uploadParts = 1 + } else if uploadParts > maxUploadParts { + uploadParts = maxUploadParts + } + + // calculate size of parts + partSize := f.opt.ChunkSize + fileSize := src.Size() + + // size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize + // buffers here (default 5 MiB). With a maximum number of parts (10,000) this will be a file of + // 48 GiB which seems like a not too unreasonable limit. + if fileSize == -1 { + warnStreamUpload.Do(func() { + fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v", + f.opt.ChunkSize, fs.SizeSuffix(int64(partSize)*int64(uploadParts))) + }) + } else { + partSize = chunksize.Calculator(o, fileSize, uploadParts, f.opt.ChunkSize) + } + + uploadCtx, cancel := context.WithCancel(ctx) + defer atexit.OnError(&err, func() { + cancel() + if o.fs.opt.LeavePartsOnError { + return + } + fs.Debugf(o, "Cancelling multipart upload") + errCancel := o.fs.abortMultiPartUpload( + context.Background(), + bucketName, + bucketPath, + uploadID) + if errCancel != nil { + fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel) + } else { + fs.Debugf(o, "canceled and aborted multipart upload: %v", uploadID) + } + })() + + var ( + g, gCtx = errgroup.WithContext(uploadCtx) + finished = false + partsMu sync.Mutex // to protect parts + parts []*objectstorage.CommitMultipartUploadPartDetails + off int64 + md5sMu sync.Mutex + md5s []byte + tokens = pacer.NewTokenDispenser(concurrency) + memPool = o.fs.getMemoryPool(int64(partSize)) + ) + + addMd5 := func(md5binary *[md5.Size]byte, partNum int64) { + md5sMu.Lock() + defer md5sMu.Unlock() + start := partNum * md5.Size + end := start + md5.Size + if extend := end - int64(len(md5s)); extend > 0 { + md5s = append(md5s, make([]byte, extend)...) + } + copy(md5s[start:end], (*md5binary)[:]) + } + + for partNum := int64(1); !finished; partNum++ { + // Get a block of memory from the pool and token which limits concurrency. + tokens.Get() + buf := memPool.Get() + + free := func() { + // return the memory and token + memPool.Put(buf) + tokens.Put() + } + + // Fail fast, in case an errgroup managed function returns an error + // gCtx is cancelled. There is no point in uploading all the other parts. + if gCtx.Err() != nil { + free() + break + } + + // Read the chunk + var n int + n, err = readers.ReadFill(in, buf) // this can never return 0, nil + if err == io.EOF { + if n == 0 && partNum != 1 { // end if no data and if not first chunk + free() + break + } + finished = true + } else if err != nil { + free() + return fmt.Errorf("multipart upload failed to read source: %w", err) + } + buf = buf[:n] + + partNum := partNum + fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(fileSize)) + off += int64(n) + g.Go(func() (err error) { + defer free() + partLength := int64(len(buf)) + + // create checksum of buffer for integrity checking + md5sumBinary := md5.Sum(buf) + addMd5(&md5sumBinary, partNum-1) + md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:]) + if uploadedPart, ok := uploadedParts[int(partNum)]; ok { + if md5sum == *uploadedPart.Md5 { + fs.Debugf(o, "matched uploaded part found, part num %d, skipping part, md5=%v", partNum, md5sum) + partsMu.Lock() + parts = append(parts, &objectstorage.CommitMultipartUploadPartDetails{ + PartNum: uploadedPart.PartNumber, + Etag: uploadedPart.Etag, + }) + partsMu.Unlock() + return nil + } + } + + req := objectstorage.UploadPartRequest{ + NamespaceName: common.String(o.fs.opt.Namespace), + BucketName: common.String(bucketName), + ObjectName: common.String(bucketPath), + UploadId: common.String(uploadID), + UploadPartNum: common.Int(int(partNum)), + ContentLength: common.Int64(partLength), + ContentMD5: common.String(md5sum), + } + o.applyPartUploadOptions(putReq, &req) + var resp objectstorage.UploadPartResponse + err = f.pacer.Call(func() (bool, error) { + req.UploadPartBody = io.NopCloser(bytes.NewReader(buf)) + resp, err = f.srv.UploadPart(gCtx, req) + if err != nil { + if partNum <= int64(concurrency) { + return shouldRetry(gCtx, resp.HTTPResponse(), err) + } + // retry all chunks once have done the first batch + return true, err + } + partsMu.Lock() + parts = append(parts, &objectstorage.CommitMultipartUploadPartDetails{ + PartNum: common.Int(int(partNum)), + Etag: resp.ETag, + }) + partsMu.Unlock() + return false, nil + }) + if err != nil { + fs.Errorf(o, "multipart upload failed to upload part:%d err: %v", partNum, err) + return fmt.Errorf("multipart upload failed to upload part: %w", err) + } + return nil + }) + } + err = g.Wait() + if err != nil { + return err + } + + // sort the completed parts by part number + sort.Slice(parts, func(i, j int) bool { + return *parts[i].PartNum < *parts[j].PartNum + }) + + var resp objectstorage.CommitMultipartUploadResponse + resp, err = o.commitMultiPart(ctx, uploadID, parts) + if err != nil { + return err + } + fs.Debugf(o, "multipart upload %v committed.", uploadID) + hashOfHashes := md5.Sum(md5s) + wantMultipartMd5 := base64.StdEncoding.EncodeToString(hashOfHashes[:]) + "-" + strconv.Itoa(len(parts)) + gotMultipartMd5 := *resp.OpcMultipartMd5 + if wantMultipartMd5 != gotMultipartMd5 { + fs.Errorf(o, "multipart upload corrupted: multipart md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5) + return fmt.Errorf("multipart upload corrupted: md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5) + } + fs.Debugf(o, "multipart upload %v md5 matched: expecting %s and got %s", uploadID, wantMultipartMd5, gotMultipartMd5) + return nil +} + +// commits the multipart upload +func (o *Object) commitMultiPart(ctx context.Context, uploadID string, parts []*objectstorage.CommitMultipartUploadPartDetails) (resp objectstorage.CommitMultipartUploadResponse, err error) { + bucketName, bucketPath := o.split() + req := objectstorage.CommitMultipartUploadRequest{ + NamespaceName: common.String(o.fs.opt.Namespace), + BucketName: common.String(bucketName), + ObjectName: common.String(bucketPath), + UploadId: common.String(uploadID), + } + var partsToCommit []objectstorage.CommitMultipartUploadPartDetails + for _, part := range parts { + partsToCommit = append(partsToCommit, *part) + } + req.PartsToCommit = partsToCommit + err = o.fs.pacer.Call(func() (bool, error) { + resp, err = o.fs.srv.CommitMultipartUpload(ctx, req) + // if multipart is corrupted, we will abort the uploadId + if o.isMultiPartUploadCorrupted(err) { + fs.Debugf(o, "multipart uploadId %v is corrupted, aborting...", uploadID) + errCancel := o.fs.abortMultiPartUpload( + context.Background(), + bucketName, + bucketPath, + uploadID) + if errCancel != nil { + fs.Debugf(o, "Failed to abort multipart upload: %v, ignoring.", errCancel) + } else { + fs.Debugf(o, "aborted multipart upload: %v", uploadID) + } + return false, err + } + return shouldRetry(ctx, resp.HTTPResponse(), err) + }) + return resp, err +} + +func (o *Object) isMultiPartUploadCorrupted(err error) bool { + if err == nil { + return false + } + // Check if this ocierr object, and if it is multipart commit error + if ociError, ok := err.(common.ServiceError); ok { + // If it is a timeout then we want to retry that + if ociError.GetCode() == "InvalidUploadPart" { + return true + } + } + return false +} diff --git a/backend/oracleobjectstorage/object.go b/backend/oracleobjectstorage/object.go index e82ac295e..b9158062c 100644 --- a/backend/oracleobjectstorage/object.go +++ b/backend/oracleobjectstorage/object.go @@ -4,24 +4,26 @@ package oracleobjectstorage import ( + "bytes" "context" "encoding/base64" "encoding/hex" "fmt" "io" "net/http" + "os" "regexp" "strconv" "strings" "time" + "golang.org/x/net/http/httpguts" + "github.com/ncw/swift/v2" "github.com/oracle/oci-go-sdk/v65/common" "github.com/oracle/oci-go-sdk/v65/objectstorage" - "github.com/oracle/oci-go-sdk/v65/objectstorage/transfer" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/hash" - "github.com/rclone/rclone/lib/atexit" ) // ------------------------------------------------------------ @@ -367,6 +369,25 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo return resp.HTTPResponse().Body, nil } +func isZeroLength(streamReader io.Reader) bool { + switch v := streamReader.(type) { + case *bytes.Buffer: + return v.Len() == 0 + case *bytes.Reader: + return v.Len() == 0 + case *strings.Reader: + return v.Len() == 0 + case *os.File: + fi, err := v.Stat() + if err != nil { + return false + } + return fi.Size() == 0 + default: + return false + } +} + // Update an object if it has changed func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { bucketName, bucketPath := o.split() @@ -379,11 +400,59 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op size := src.Size() multipart := size >= int64(o.fs.opt.UploadCutoff) + if isZeroLength(in) { + multipart = false + } + + req := objectstorage.PutObjectRequest{ + NamespaceName: common.String(o.fs.opt.Namespace), + BucketName: common.String(bucketName), + ObjectName: common.String(bucketPath), + } + // Set the mtime in the metadata modTime := src.ModTime(ctx) - metadata := map[string]string{ - metaMtime: swift.TimeToFloatString(modTime), + // Fetch metadata if --metadata is in use + meta, err := fs.GetMetadataOptions(ctx, src, options) + if err != nil { + return fmt.Errorf("failed to read metadata from source object: %w", err) } + req.OpcMeta = make(map[string]string, len(meta)+2) + // merge metadata into request and user metadata + for k, v := range meta { + pv := common.String(v) + k = strings.ToLower(k) + switch k { + case "cache-control": + req.CacheControl = pv + case "content-disposition": + req.ContentDisposition = pv + case "content-encoding": + req.ContentEncoding = pv + case "content-language": + req.ContentLanguage = pv + case "content-type": + req.ContentType = pv + case "tier": + // ignore + case "mtime": + // mtime in meta overrides source ModTime + metaModTime, err := time.Parse(time.RFC3339Nano, v) + if err != nil { + fs.Debugf(o, "failed to parse metadata %s: %q: %v", k, v, err) + } else { + modTime = metaModTime + } + case "btime": + // write as metadata since we can't set it + req.OpcMeta[k] = v + default: + req.OpcMeta[k] = v + } + } + + // Set the mtime in the metadata + req.OpcMeta[metaMtime] = swift.TimeToFloatString(modTime) // read the md5sum if available // - for non-multipart @@ -404,114 +473,53 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // - a multipart upload // - the ETag is not an MD5, e.g. when using SSE/SSE-C // provided checksums aren't disabled - metadata[metaMD5Hash] = md5sumBase64 + req.OpcMeta[metaMD5Hash] = md5sumBase64 } } } } - // Guess the content type - mimeType := fs.MimeType(ctx, src) + // Set the content type if it isn't set already + if req.ContentType == nil { + req.ContentType = common.String(fs.MimeType(ctx, src)) + } + if size >= 0 { + req.ContentLength = common.Int64(size) + } + if md5sumBase64 != "" { + req.ContentMD5 = &md5sumBase64 + } + o.applyPutOptions(&req, options...) + useBYOKPutObject(o.fs, &req) + if o.fs.opt.StorageTier != "" { + storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier) + if !ok { + return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier) + } + req.StorageTier = storageTier + } + // Check metadata keys and values are valid + for key, value := range req.OpcMeta { + if !httpguts.ValidHeaderFieldName(key) { + fs.Errorf(o, "Dropping invalid metadata key %q", key) + delete(req.OpcMeta, key) + } else if value == "" { + fs.Errorf(o, "Dropping nil metadata value for key %q", key) + delete(req.OpcMeta, key) + } else if !httpguts.ValidHeaderFieldValue(value) { + fs.Errorf(o, "Dropping invalid metadata value %q for key %q", value, key) + delete(req.OpcMeta, key) + } + } if multipart { - chunkSize := int64(o.fs.opt.ChunkSize) - uploadRequest := transfer.UploadRequest{ - NamespaceName: common.String(o.fs.opt.Namespace), - BucketName: common.String(bucketName), - ObjectName: common.String(bucketPath), - ContentType: common.String(mimeType), - PartSize: common.Int64(chunkSize), - AllowMultipartUploads: common.Bool(true), - AllowParrallelUploads: common.Bool(true), - ObjectStorageClient: o.fs.srv, - EnableMultipartChecksumVerification: common.Bool(!o.fs.opt.DisableChecksum), - NumberOfGoroutines: common.Int(o.fs.opt.UploadConcurrency), - Metadata: metadataWithOpcPrefix(metadata), - } - if o.fs.opt.StorageTier != "" { - storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier) - if !ok { - return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier) - } - uploadRequest.StorageTier = storageTier - } - o.applyMultiPutOptions(&uploadRequest, options...) - useBYOKUpload(o.fs, &uploadRequest) - uploadStreamRequest := transfer.UploadStreamRequest{ - UploadRequest: uploadRequest, - StreamReader: in, - } - uploadMgr := transfer.NewUploadManager() - var uploadID = "" - - defer atexit.OnError(&err, func() { - if uploadID == "" { - return - } - if o.fs.opt.LeavePartsOnError { - return - } - fs.Debugf(o, "Cancelling multipart upload") - errCancel := o.fs.abortMultiPartUpload( - context.Background(), - bucketName, - bucketPath, - uploadID) - if errCancel != nil { - fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel) - } - })() - - err = o.fs.pacer.Call(func() (bool, error) { - uploadResponse, err := uploadMgr.UploadStream(ctx, uploadStreamRequest) - var httpResponse *http.Response - if err == nil { - if uploadResponse.Type == transfer.MultipartUpload { - if uploadResponse.MultipartUploadResponse != nil { - httpResponse = uploadResponse.MultipartUploadResponse.HTTPResponse() - } - } else { - if uploadResponse.SinglepartUploadResponse != nil { - httpResponse = uploadResponse.SinglepartUploadResponse.HTTPResponse() - } - } - } - if err != nil { - uploadID := "" - if uploadResponse.MultipartUploadResponse != nil && uploadResponse.MultipartUploadResponse.UploadID != nil { - uploadID = *uploadResponse.MultipartUploadResponse.UploadID - fs.Debugf(o, "multipart streaming upload failed, aborting uploadID: %v, may retry", uploadID) - _ = o.fs.abortMultiPartUpload(ctx, bucketName, bucketPath, uploadID) - } - } - return shouldRetry(ctx, httpResponse, err) - }) + err = o.uploadMultipart(ctx, &req, in, src) if err != nil { - fs.Errorf(o, "multipart streaming upload failed %v", err) return err } } else { - req := objectstorage.PutObjectRequest{ - NamespaceName: common.String(o.fs.opt.Namespace), - BucketName: common.String(bucketName), - ObjectName: common.String(bucketPath), - ContentType: common.String(mimeType), - PutObjectBody: io.NopCloser(in), - OpcMeta: metadata, - } - if size >= 0 { - req.ContentLength = common.Int64(size) - } - if o.fs.opt.StorageTier != "" { - storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier) - if !ok { - return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier) - } - req.StorageTier = storageTier - } - o.applyPutOptions(&req, options...) - useBYOKPutObject(o.fs, &req) var resp objectstorage.PutObjectResponse - err = o.fs.pacer.Call(func() (bool, error) { + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + req.PutObjectBody = io.NopCloser(in) resp, err = o.fs.srv.PutObject(ctx, req) return shouldRetry(ctx, resp.HTTPResponse(), err) }) @@ -591,28 +599,24 @@ func (o *Object) applyGetObjectOptions(req *objectstorage.GetObjectRequest, opti } } -func (o *Object) applyMultiPutOptions(req *transfer.UploadRequest, options ...fs.OpenOption) { - // Apply upload options - for _, option := range options { - key, value := option.Header() - lowerKey := strings.ToLower(key) - switch lowerKey { - case "": - // ignore - case "content-encoding": - req.ContentEncoding = common.String(value) - case "content-language": - req.ContentLanguage = common.String(value) - case "content-type": - req.ContentType = common.String(value) - default: - if strings.HasPrefix(lowerKey, ociMetaPrefix) { - req.Metadata[lowerKey] = value - } else { - fs.Errorf(o, "Don't know how to set key %q on upload", key) - } - } - } +func (o *Object) applyMultipartUploadOptions(putReq *objectstorage.PutObjectRequest, req *objectstorage.CreateMultipartUploadRequest) { + req.ContentType = putReq.ContentType + req.ContentLanguage = putReq.ContentLanguage + req.ContentEncoding = putReq.ContentEncoding + req.ContentDisposition = putReq.ContentDisposition + req.CacheControl = putReq.CacheControl + req.Metadata = metadataWithOpcPrefix(putReq.OpcMeta) + req.OpcSseCustomerAlgorithm = putReq.OpcSseCustomerAlgorithm + req.OpcSseCustomerKey = putReq.OpcSseCustomerKey + req.OpcSseCustomerKeySha256 = putReq.OpcSseCustomerKeySha256 + req.OpcSseKmsKeyId = putReq.OpcSseKmsKeyId +} + +func (o *Object) applyPartUploadOptions(putReq *objectstorage.PutObjectRequest, req *objectstorage.UploadPartRequest) { + req.OpcSseCustomerAlgorithm = putReq.OpcSseCustomerAlgorithm + req.OpcSseCustomerKey = putReq.OpcSseCustomerKey + req.OpcSseCustomerKeySha256 = putReq.OpcSseCustomerKeySha256 + req.OpcSseKmsKeyId = putReq.OpcSseKmsKeyId } func metadataWithOpcPrefix(src map[string]string) map[string]string { diff --git a/backend/oracleobjectstorage/options.go b/backend/oracleobjectstorage/options.go index 3f89badaf..6442e1d8c 100644 --- a/backend/oracleobjectstorage/options.go +++ b/backend/oracleobjectstorage/options.go @@ -13,12 +13,15 @@ import ( const ( maxSizeForCopy = 4768 * 1024 * 1024 - minChunkSize = fs.SizeSuffix(1024 * 1024 * 5) - defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024) + maxUploadParts = 10000 defaultUploadConcurrency = 10 + minChunkSize = fs.SizeSuffix(5 * 1024 * 1024) + defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024) maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024) minSleep = 10 * time.Millisecond defaultCopyTimeoutDuration = fs.Duration(time.Minute) + memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long + memoryPoolUseMmap = false ) const ( @@ -55,12 +58,16 @@ type Options struct { ConfigProfile string `config:"config_profile"` UploadCutoff fs.SizeSuffix `config:"upload_cutoff"` ChunkSize fs.SizeSuffix `config:"chunk_size"` + MaxUploadParts int `config:"max_upload_parts"` UploadConcurrency int `config:"upload_concurrency"` DisableChecksum bool `config:"disable_checksum"` + MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"` + MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"` CopyCutoff fs.SizeSuffix `config:"copy_cutoff"` CopyTimeout fs.Duration `config:"copy_timeout"` StorageTier string `config:"storage_tier"` LeavePartsOnError bool `config:"leave_parts_on_error"` + AttemptResumeUpload bool `config:"attempt_resume_upload"` NoCheckBucket bool `config:"no_check_bucket"` SSEKMSKeyID string `config:"sse_kms_key_id"` SSECustomerAlgorithm string `config:"sse_customer_algorithm"` @@ -157,9 +164,8 @@ The minimum is 0 and the maximum is 5 GiB.`, Help: `Chunk size to use for uploading. When uploading files larger than upload_cutoff or files with unknown -size (e.g. from "rclone rcat" or uploaded with "rclone mount" or google -photos or google docs) they will be uploaded as multipart uploads -using this chunk size. +size (e.g. from "rclone rcat" or uploaded with "rclone mount" they will be uploaded +as multipart uploads using this chunk size. Note that "upload_concurrency" chunks of this size are buffered in memory per transfer. @@ -181,6 +187,20 @@ statistics displayed with "-P" flag. `, Default: minChunkSize, Advanced: true, + }, { + Name: "max_upload_parts", + Help: `Maximum number of parts in a multipart upload. + +This option defines the maximum number of multipart chunks to use +when doing a multipart upload. + +OCI has max parts limit of 10,000 chunks. + +Rclone will automatically increase the chunk size when uploading a +large file of a known size to stay below this number of chunks limit. +`, + Default: maxUploadParts, + Advanced: true, }, { Name: "upload_concurrency", Help: `Concurrency for multipart uploads. @@ -203,6 +223,19 @@ copied in chunks of this size. The minimum is 0 and the maximum is 5 GiB.`, Default: fs.SizeSuffix(maxSizeForCopy), Advanced: true, + }, { + Name: "memory_pool_flush_time", + Default: memoryPoolFlushTime, + Advanced: true, + Help: `How often internal memory buffer pools will be flushed. + +Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations. +This option controls how often unused buffers will be removed from the pool.`, + }, { + Name: "memory_pool_use_mmap", + Default: memoryPoolUseMmap, + Advanced: true, + Help: `Whether to use mmap buffers in internal memory pool.`, }, { Name: "copy_timeout", Help: `Timeout for copy. @@ -238,12 +271,24 @@ to start uploading.`, encoder.EncodeDot, }, { Name: "leave_parts_on_error", - Help: `If true avoid calling abort upload on a failure, leaving all successfully uploaded parts on S3 for manual recovery. + Help: `If true avoid calling abort upload on a failure, leaving all successfully uploaded parts for manual recovery. It should be set to true for resuming uploads across different sessions. WARNING: Storing parts of an incomplete multipart upload counts towards space usage on object storage and will add additional costs if not cleaned up. +`, + Default: false, + Advanced: true, + }, { + Name: "attempt_resume_upload", + Help: `If true attempt to resume previously started multipart upload for the object. +This will be helpful to speed up multipart transfers by resuming uploads from past session. + +WARNING: If chunk size differs in resumed session from past incomplete session, then the resumed multipart upload is +aborted and a new multipart upload is started with the new chunk size. + +The flag leave_parts_on_error must be true to resume and optimize to skip parts that were already uploaded successfully. `, Default: false, Advanced: true, diff --git a/backend/oracleobjectstorage/oracleobjectstorage.go b/backend/oracleobjectstorage/oracleobjectstorage.go index e3ad782d4..4ff321519 100644 --- a/backend/oracleobjectstorage/oracleobjectstorage.go +++ b/backend/oracleobjectstorage/oracleobjectstorage.go @@ -23,6 +23,7 @@ import ( "github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/pacer" + "github.com/rclone/rclone/lib/pool" ) // Register with Fs @@ -49,6 +50,7 @@ type Fs struct { rootDirectory string // directory part of root (if any) cache *bucket.Cache // cache for bucket creation status pacer *fs.Pacer // To pace the API calls + pool *pool.Pool // memory pool } // NewFs Initialize backend @@ -80,6 +82,12 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e srv: objectStorageClient, cache: bucket.NewCache(), pacer: pc, + pool: pool.New( + time.Duration(opt.MemoryPoolFlushTime), + int(opt.ChunkSize), + opt.UploadConcurrency*ci.Transfers, + opt.MemoryPoolUseMmap, + ), } f.setRoot(root) f.features = (&fs.Features{ @@ -179,6 +187,19 @@ func (f *Fs) Hashes() hash.Set { return hash.Set(hash.MD5) } +func (f *Fs) getMemoryPool(size int64) *pool.Pool { + if size == int64(f.opt.ChunkSize) { + return f.pool + } + + return pool.New( + time.Duration(f.opt.MemoryPoolFlushTime), + int(size), + f.opt.UploadConcurrency*f.ci.Transfers, + f.opt.MemoryPoolUseMmap, + ) +} + // setRoot changes the root of the Fs func (f *Fs) setRoot(root string) { f.root = parsePath(root) diff --git a/docs/content/local.md b/docs/content/local.md index 42327a940..5fa379038 100644 --- a/docs/content/local.md +++ b/docs/content/local.md @@ -562,7 +562,7 @@ Properties: - Config: encoding - Env Var: RCLONE_LOCAL_ENCODING - Type: MultiEncoder -- Default: Slash,Dot +- Default: Slash,InvalidUtf8,Dot ### Metadata diff --git a/docs/content/oracleobjectstorage.md b/docs/content/oracleobjectstorage.md index 0e41ee788..002ab9781 100644 --- a/docs/content/oracleobjectstorage.md +++ b/docs/content/oracleobjectstorage.md @@ -1,17 +1,22 @@ --- title: "Oracle Object Storage" description: "Rclone docs for Oracle Object Storage" +type: page versionIntroduced: "v1.60" --- # {{< icon "fa fa-cloud" >}} Oracle Object Storage -[Oracle Object Storage Overview](https://docs.oracle.com/en-us/iaas/Content/Object/Concepts/objectstorageoverview.htm) - -[Oracle Object Storage FAQ](https://www.oracle.com/cloud/storage/object-storage/faq/) +- [Oracle Object Storage Overview](https://docs.oracle.com/en-us/iaas/Content/Object/Concepts/objectstorageoverview.htm) +- [Oracle Object Storage FAQ](https://www.oracle.com/cloud/storage/object-storage/faq/) +- [Oracle Object Storage Limits](https://docs.oracle.com/en-us/iaas/Content/Resources/Assets/whitepapers/oci-object-storage-best-practices.pdf) Paths are specified as `remote:bucket` (or `remote:` for the `lsd` command.) You may put subdirectories in too, e.g. `remote:bucket/path/to/dir`. +Sample command to transfer local artifacts to remote:bucket in oracle object storage: + +`rclone -vvv --progress --stats-one-line --max-stats-groups 10 --log-format date,time,UTC,longfile --fast-list --buffer-size 256Mi --oos-no-check-bucket --oos-upload-cutoff 10Mi --multi-thread-cutoff 16Mi --multi-thread-streams 3000 --transfers 3000 --checkers 64 --retries 2 --oos-chunk-size 10Mi --oos-upload-concurrency 10000 --oos-attempt-resume-upload --oos-leave-parts-on-error sync ./artifacts remote:bucket -vv` + ## Configuration Here is an example of making an oracle object storage configuration. `rclone config` walks you @@ -135,7 +140,7 @@ List the contents of a bucket rclone ls remote:bucket rclone ls remote:bucket --max-depth 1 -### OCI Authentication Provider +## Authentication Providers OCI has various authentication methods. To learn more about authentication methods please refer [oci authentication methods](https://docs.oracle.com/en-us/iaas/Content/API/Concepts/sdk_authentication_methods.htm) @@ -148,7 +153,7 @@ Rclone supports the following OCI authentication provider. Resource Principal No authentication -#### Authentication provider choice: User Principal +### User Principal Sample rclone config file for Authentication Provider User Principal: [oos] @@ -168,7 +173,7 @@ Considerations: - Overhead of managing users and keys. - If the user is deleted, the config file will no longer work and may cause automation regressions that use the user's credentials. -#### Authentication provider choice: Instance Principal +### Instance Principal An OCI compute instance can be authorized to use rclone by using it's identity and certificates as an instance principal. With this approach no credentials have to be stored and managed. @@ -197,7 +202,7 @@ Considerations: - Everyone who has access to this machine can execute the CLI commands. - It is applicable for oci compute instances only. It cannot be used on external instance or resources. -#### Authentication provider choice: Resource Principal +### Resource Principal Resource principal auth is very similar to instance principal auth but used for resources that are not compute instances such as [serverless functions](https://docs.oracle.com/en-us/iaas/Content/Functions/Concepts/functionsoverview.htm). To use resource principal ensure Rclone process is started with these environment variables set in its process. @@ -216,7 +221,7 @@ Sample rclone configuration file for Authentication Provider Resource Principal: region = us-ashburn-1 provider = resource_principal_auth -#### Authentication provider choice: No authentication +### No authentication Public buckets do not require any authentication mechanism to read objects. Sample rclone configuration file for No authentication: @@ -419,9 +424,8 @@ Properties: Chunk size to use for uploading. When uploading files larger than upload_cutoff or files with unknown -size (e.g. from "rclone rcat" or uploaded with "rclone mount" or google -photos or google docs) they will be uploaded as multipart uploads -using this chunk size. +size (e.g. from "rclone rcat" or uploaded with "rclone mount" they will be uploaded +as multipart uploads using this chunk size. Note that "upload_concurrency" chunks of this size are buffered in memory per transfer. @@ -449,6 +453,26 @@ Properties: - Type: SizeSuffix - Default: 5Mi +#### --oos-max-upload-parts + +Maximum number of parts in a multipart upload. + +This option defines the maximum number of multipart chunks to use +when doing a multipart upload. + +OCI has max parts limit of 10,000 chunks. + +Rclone will automatically increase the chunk size when uploading a +large file of a known size to stay below this number of chunks limit. + + +Properties: + +- Config: max_upload_parts +- Env Var: RCLONE_OOS_MAX_UPLOAD_PARTS +- Type: int +- Default: 10000 + #### --oos-upload-concurrency Concurrency for multipart uploads. @@ -483,6 +507,31 @@ Properties: - Type: SizeSuffix - Default: 4.656Gi +#### --oos-memory-pool-flush-time + +How often internal memory buffer pools will be flushed. + +Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations. +This option controls how often unused buffers will be removed from the pool. + +Properties: + +- Config: memory_pool_flush_time +- Env Var: RCLONE_OOS_MEMORY_POOL_FLUSH_TIME +- Type: Duration +- Default: 1m0s + +#### --oos-memory-pool-use-mmap + +Whether to use mmap buffers in internal memory pool. + +Properties: + +- Config: memory_pool_use_mmap +- Env Var: RCLONE_OOS_MEMORY_POOL_USE_MMAP +- Type: bool +- Default: false + #### --oos-copy-timeout Timeout for copy. @@ -528,7 +577,7 @@ Properties: #### --oos-leave-parts-on-error -If true avoid calling abort upload on a failure, leaving all successfully uploaded parts on S3 for manual recovery. +If true avoid calling abort upload on a failure, leaving all successfully uploaded parts for manual recovery. It should be set to true for resuming uploads across different sessions. @@ -543,6 +592,24 @@ Properties: - Type: bool - Default: false +#### --oos-attempt-resume-upload + +If true attempt to resume previously started multipart upload for the object. +This will be helpful to speed up multipart transfers by resuming uploads from past session. + +WARNING: If chunk size differs in resumed session from past incomplete session, then the resumed multipart upload is +aborted and a new multipart upload is started with the new chunk size. + +The flag leave_parts_on_error must be true to resume and optimize to skip parts that were already uploaded successfully. + + +Properties: + +- Config: attempt_resume_upload +- Env Var: RCLONE_OOS_ATTEMPT_RESUME_UPLOAD +- Type: bool +- Default: false + #### --oos-no-check-bucket If set, don't attempt to check the bucket exists or create it. @@ -725,3 +792,6 @@ Options: - "max-age": Max age of upload to delete {{< rem autogenerated options stop >}} + +## Tutorials +### [Mounting Buckets](/oracleobjectstorage/tutorial_mount/) diff --git a/docs/content/oracleobjectstorage/_index.md b/docs/content/oracleobjectstorage/_index.md new file mode 100644 index 000000000..e69de29bb diff --git a/docs/content/oracleobjectstorage/tutorial_mount.md b/docs/content/oracleobjectstorage/tutorial_mount.md new file mode 100644 index 000000000..924fc4344 --- /dev/null +++ b/docs/content/oracleobjectstorage/tutorial_mount.md @@ -0,0 +1,471 @@ +--- +title: "Oracle Object Storage Mount" +description: "Oracle Object Storage mounting tutorial" +slug: tutorial_mount +url: /oracleobjectstorage/tutorial_mount/ +--- +# {{< icon "fa fa-cloud" >}} Mount Buckets and Expose via NFS Tutorial +This runbook shows how to [mount](/commands/rclone_mount/) *Oracle Object Storage* buckets as local file system in +OCI compute Instance using rclone tool. + +You will also learn how to export the rclone mounts as NFS mount, so that other NFS client can access them. + +Usage Pattern : + +NFS Client --> NFS Server --> RClone Mount --> OCI Object Storage + +## Step 1 : Install Rclone + +In oracle linux 8, Rclone can be installed from +[OL8_Developer](https://yum.oracle.com/repo/OracleLinux/OL8/developer/x86_64/index.html) Yum Repo, Please enable the +repo if not enabled already. + +```shell +[opc@base-inst-boot ~]$ sudo yum-config-manager --enable ol8_developer +[opc@base-inst-boot ~]$ sudo yum install -y rclone +[opc@base-inst-boot ~]$ sudo yum install -y fuse +# rclone will prefer fuse3 if available +[opc@base-inst-boot ~]$ sudo yum install -y fuse3 +[opc@base-inst-boot ~]$ yum info rclone +Last metadata expiration check: 0:01:58 ago on Fri 07 Apr 2023 05:53:43 PM GMT. +Installed Packages +Name : rclone +Version : 1.62.2 +Release : 1.0.1.el8 +Architecture : x86_64 +Size : 67 M +Source : rclone-1.62.2-1.0.1.el8.src.rpm +Repository : @System +From repo : ol8_developer +Summary : rsync for cloud storage +URL : http://rclone.org/ +License : MIT +Description : Rclone is a command line program to sync files and directories to and from various cloud services. +``` + +To run it as a mount helper you should symlink rclone binary to /sbin/mount.rclone and optionally /usr/bin/rclonefs, +e.g. ln -s /usr/bin/rclone /sbin/mount.rclone. rclone will detect it and translate command-line arguments appropriately. + +```shell +ln -s /usr/bin/rclone /sbin/mount.rclone +``` + +## Step 2: Setup Rclone Configuration file + +Let's assume you want to access 3 buckets from the oci compute instance using instance principal provider as means of +authenticating with object storage service. + +- namespace-a, bucket-a, +- namespace-b, bucket-b, +- namespace-c, bucket-c + +Rclone configuration file needs to have 3 remote sections, one section of each of above 3 buckets. Create a +configuration file in a accessible location that rclone program can read. + +```shell + +[opc@base-inst-boot ~]$ mkdir -p /etc/rclone +[opc@base-inst-boot ~]$ sudo touch /etc/rclone/rclone.conf + + +# add below contents to /etc/rclone/rclone.conf +[opc@base-inst-boot ~]$ cat /etc/rclone/rclone.conf + + +[ossa] +type = oracleobjectstorage +provider = instance_principal_auth +namespace = namespace-a +compartment = ocid1.compartment.oc1..aaaaaaaa...compartment-a +region = us-ashburn-1 + +[ossb] +type = oracleobjectstorage +provider = instance_principal_auth +namespace = namespace-b +compartment = ocid1.compartment.oc1..aaaaaaaa...compartment-b +region = us-ashburn-1 + + +[ossc] +type = oracleobjectstorage +provider = instance_principal_auth +namespace = namespace-c +compartment = ocid1.compartment.oc1..aaaaaaaa...compartment-c +region = us-ashburn-1 + +# List remotes +[opc@base-inst-boot ~]$ rclone --config /etc/rclone/rclone.conf listremotes +ossa: +ossb: +ossc: + +# Now please ensure you do not see below errors while listing the bucket, +# i.e you should fix the settings to see if namespace, compartment, bucket name are all correct. +# and you must have a dynamic group policy to allow the instance to use object-family in compartment. + +[opc@base-inst-boot ~]$ rclone --config /etc/rclone/rclone.conf ls ossa: +2023/04/07 19:09:21 Failed to ls: Error returned by ObjectStorage Service. Http Status Code: 404. Error Code: NamespaceNotFound. Opc request id: iad-1:kVVAb0knsVXDvu9aHUGHRs3gSNBOFO2_334B6co82LrPMWo2lM5PuBKNxJOTmZsS. Message: You do not have authorization to perform this request, or the requested resource could not be found. +Operation Name: ListBuckets +Timestamp: 2023-04-07 19:09:21 +0000 GMT +Client Version: Oracle-GoSDK/65.32.0 +Request Endpoint: GET https://objectstorage.us-ashburn-1.oraclecloud.com/n/namespace-a/b?compartmentId=ocid1.compartment.oc1..aaaaaaaa...compartment-a +Troubleshooting Tips: See https://docs.oracle.com/iaas/Content/API/References/apierrors.htm#apierrors_404__404_namespacenotfound for more information about resolving this error. +Also see https://docs.oracle.com/iaas/api/#/en/objectstorage/20160918/Bucket/ListBuckets for details on this operation's requirements. +To get more info on the failing request, you can set OCI_GO_SDK_DEBUG env var to info or higher level to log the request/response details. +If you are unable to resolve this ObjectStorage issue, please contact Oracle support and provide them this full error message. +[opc@base-inst-boot ~]$ + +``` + +## Step 3: Setup Dynamic Group and Add IAM Policy. +Just like a human user has an identity identified by its USER-PRINCIPAL, every OCI compute instance is also a robotic +user identified by its INSTANCE-PRINCIPAL. The instance principal key is automatically fetched by rclone/with-oci-sdk +from instance-metadata to make calls to object storage. + +Similar to [user-group](https://docs.oracle.com/en-us/iaas/Content/Identity/Tasks/managinggroups.htm), +[instance groups](https://docs.oracle.com/en-us/iaas/Content/Identity/Tasks/managingdynamicgroups.htm) +is known as dynamic-group in IAM. + +Create a dynamic group say rclone-dynamic-group that the oci compute instance becomes a member of the below group +says all instances belonging to compartment a...c is member of this dynamic-group. + +```shell +any {instance.compartment.id = '', + instance.compartment.id = '', + instance.compartment.id = '' + } +``` + +Now that you have a dynamic group, you need to add a policy allowing what permissions this dynamic-group has. +In our case, we want this dynamic-group to access object-storage. So create a policy now. + +```shell +allow dynamic-group rclone-dynamic-group to manage object-family in compartment compartment-a +allow dynamic-group rclone-dynamic-group to manage object-family in compartment compartment-b +allow dynamic-group rclone-dynamic-group to manage object-family in compartment compartment-c +``` + +After you add the policy, now ensure the rclone can list files in your bucket, if not please troubleshoot any mistakes +you did so far. Please note, identity can take upto a minute to ensure policy gets reflected. + +## Step 4: Setup Mount Folders +Let's assume you have to mount 3 buckets, bucket-a, bucket-b, bucket-c at path /opt/mnt/bucket-a, /opt/mnt/bucket-b, +/opt/mnt/bucket-c respectively. + +Create the mount folder and set its ownership to desired user, group. +```shell +[opc@base-inst-boot ~]$ sudo mkdir /opt/mnt +[opc@base-inst-boot ~]$ sudo chown -R opc:adm /opt/mnt +``` + +Set chmod permissions to user, group, others as desired for each mount path +```shell +[opc@base-inst-boot ~]$ sudo chmod 764 /opt/mnt +[opc@base-inst-boot ~]$ ls -al /opt/mnt/ +total 0 +drwxrw-r--. 2 opc adm 6 Apr 7 18:01 . +drwxr-xr-x. 10 root root 179 Apr 7 18:01 .. + +[opc@base-inst-boot ~]$ mkdir -p /opt/mnt/bucket-a +[opc@base-inst-boot ~]$ mkdir -p /opt/mnt/bucket-b +[opc@base-inst-boot ~]$ mkdir -p /opt/mnt/bucket-c + +[opc@base-inst-boot ~]$ ls -al /opt/mnt +total 0 +drwxrw-r--. 5 opc adm 54 Apr 7 18:17 . +drwxr-xr-x. 10 root root 179 Apr 7 18:01 .. +drwxrwxr-x. 2 opc opc 6 Apr 7 18:17 bucket-a +drwxrwxr-x. 2 opc opc 6 Apr 7 18:17 bucket-b +drwxrwxr-x. 2 opc opc 6 Apr 7 18:17 bucket-c +``` + +## Step 5: Identify Rclone mount CLI configuration settings to use. +Please read through this [rclone mount](https://rclone.org/commands/rclone_mount/) page completely to really +understand the mount and its flags, what is rclone +[virtual file system](https://rclone.org/commands/rclone_mount/#vfs-virtual-file-system) mode settings and +how to effectively use them for desired Read/Write consistencies. + +Local File systems expect things to be 100% reliable, whereas cloud storage systems are a long way from 100% reliable. +Object storage can throw several errors like 429, 503, 404 etc. The rclone sync/copy commands cope with this with +lots of retries. However rclone mount can't use retries in the same way without making local copies of the uploads. +Please Look at the VFS File Caching for solutions to make mount more reliable. + +First lets understand the rclone mount flags and some global flags for troubleshooting. + +```shell + +rclone mount \ + ossa:bucket-a \ # Remote:bucket-name + /opt/mnt/bucket-a \ # Local mount folder + --config /etc/rclone/rclone.conf \ # Path to rclone config file + --allow-non-empty \ # Allow mounting over a non-empty directory + --dir-perms 0770 \ # Directory permissions (default 0777) + --file-perms 0660 \ # File permissions (default 0666) + --allow-other \ # Allow access to other users + --umask 0117 \ # sets (660) rw-rw---- as permissions for the mount using the umask + --transfers 8 \ # default 4, can be set to adjust the number of parallel uploads of modified files to remote from the cache + --tpslimit 50 \ # Limit HTTP transactions per second to this. A transaction is roughly defined as an API call; + # its exact meaning will depend on the backend. For HTTP based backends it is an HTTP PUT/GET/POST/etc and its response + --cache-dir /tmp/rclone/cache # Directory rclone will use for caching. + --dir-cache-time 5m \ # Time to cache directory entries for (default 5m0s) + --vfs-cache-mode writes \ # Cache mode off|minimal|writes|full (default off), writes gives the maximum compatiblity like a local disk + --vfs-cache-max-age 20m \ # Max age of objects in the cache (default 1h0m0s) + --vfs-cache-max-size 10G \ # Max total size of objects in the cache (default off) + --vfs-cache-poll-interval 1m \ # Interval to poll the cache for stale objects (default 1m0s) + --vfs-write-back 5s \ # Time to writeback files after last use when using cache (default 5s). + # Note that files are written back to the remote only when they are closed and + # if they haven't been accessed for --vfs-write-back seconds. If rclone is quit or + # dies with files that haven't been uploaded, these will be uploaded next time rclone is run with the same flags. + --vfs-fast-fingerprint # Use fast (less accurate) fingerprints for change detection. + --log-level ERROR \ # log level, can be DEBUG, INFO, ERROR + --log-file /var/log/rclone/oosa-bucket-a.log # rclone application log + +``` + +### --vfs-cache-mode writes + +In this mode files opened for read only are still read directly from the remote, write only and read/write files are +buffered to disk first. This mode should support all normal file system operations. If an upload fails it will be +retried at exponentially increasing intervals up to 1 minute. + +VFS cache mode of writes is recommended, so that application can have maximum compatibility of using remote storage +as a local disk, when write is finished, file is closed, it is uploaded to backend remote after vfs-write-back duration +has elapsed. If rclone is quit or dies with files that haven't been uploaded, these will be uploaded next time rclone +is run with the same flags. + +### --tpslimit float + +Limit transactions per second to this number. Default is 0 which is used to mean unlimited transactions per second. + +A transaction is roughly defined as an API call; its exact meaning will depend on the backend. For HTTP based backends +it is an HTTP PUT/GET/POST/etc and its response. For FTP/SFTP it is a round trip transaction over TCP. + +For example, to limit rclone to 10 transactions per second use --tpslimit 10, or to 1 transaction every 2 seconds +use --tpslimit 0.5. + +Use this when the number of transactions per second from rclone is causing a problem with the cloud storage +provider (e.g. getting you banned or rate limited or throttled). + +This can be very useful for rclone mount to control the behaviour of applications using it. Let's guess and say Object +storage allows roughly 100 tps per tenant, so to be on safe side, it will be wise to set this at 50. (tune it to actuals per +region) + +### --vfs-fast-fingerprint + +If you use the --vfs-fast-fingerprint flag then rclone will not include the slow operations in the fingerprint. This +makes the fingerprinting less accurate but much faster and will improve the opening time of cached files. If you are +running a vfs cache over local, s3, object storage or swift backends then using this flag is recommended. + +Various parts of the VFS use fingerprinting to see if a local file copy has changed relative to a remote file. +Fingerprints are made from: +- size +- modification time +- hash + where available on an object. + + +## Step 6: Mounting Options, Use Any one option + +### Step 6a: Run as a Service Daemon: Configure FSTAB entry for Rclone mount +Add this entry in /etc/fstab : +```shell +ossa:bucket-a /opt/mnt/bucket-a rclone rw,umask=0117,nofail,_netdev,args2env,config=/etc/rclone/rclone.conf,uid=1000,gid=4, +file_perms=0760,dir_perms=0760,allow_other,vfs_cache_mode=writes,cache_dir=/tmp/rclone/cache 0 0 +``` +IMPORTANT: Please note in fstab entry arguments are specified as underscore instead of dash, +example: vfs_cache_mode=writes instead of vfs-cache-mode=writes +Rclone in the mount helper mode will split -o argument(s) by comma, replace _ by - and prepend -- to +get the command-line flags. Options containing commas or spaces can be wrapped in single or double quotes. +Any inner quotes inside outer quotes of the same type should be doubled. + + +then run sudo mount -av +```shell + +[opc@base-inst-boot ~]$ sudo mount -av +/ : ignored +/boot : already mounted +/boot/efi : already mounted +/var/oled : already mounted +/dev/shm : already mounted +none : ignored +/opt/mnt/bucket-a : already mounted # This is the bucket mounted information, running mount -av again and again is idempotent. + +``` + +## Step 6b: Run as a Service Daemon: Configure systemd entry for Rclone mount + +If you are familiar with configuring systemd unit files, you can also configure the each rclone mount into a +systemd units file. +various examples in git search: https://github.com/search?l=Shell&q=rclone+unit&type=Code +```shell +tee "/etc/systemd/system/rclonebucketa.service" > /dev/null <&1 | grep -i 'Transport endpoint is not connected' | awk '{print ""$2"" }' | tr -d \:) +rclone_list=$(findmnt -t fuse.rclone -n 2>&1 | awk '{print ""$1"" }' | tr -d \:) +IFS=$'\n'; set -f +intersection=$(comm -12 <(printf '%s\n' "$erroneous_list" | sort) <(printf '%s\n' "$rclone_list" | sort)) +for directory in $intersection +do + echo "$directory is being fixed." + sudo fusermount -uz "$directory" +done +sudo mount -av + +``` +Script to idempotently add a Cron job to babysit the mount paths every 5 minutes +```shell +echo "Creating rclone nanny cron job." +croncmd="/etc/rclone/scripts/rclone_nanny_script.sh" +cronjob="*/5 * * * * $croncmd" +# idempotency - adds rclone_nanny cronjob only if absent. +( crontab -l | grep -v -F "$croncmd" || : ; echo "$cronjob" ) | crontab - +echo "Finished creating rclone nanny cron job." +``` + +Ensure the crontab is added, so that above nanny script runs every 5 minutes. +```shell +[opc@base-inst-boot ~]$ sudo crontab -l +*/5 * * * * /etc/rclone/scripts/rclone_nanny_script.sh +[opc@base-inst-boot ~]$ +``` + +## Step 8: Optional: Setup NFS server to access the mount points of rclone + +Let's say you want to make the rclone mount path /opt/mnt/bucket-a available as a NFS server export so that other +clients can access it by using a NFS client. + +### Step 8a : Setup NFS server + +Install NFS Utils +```shell +sudo yum install -y nfs-utils +``` + +Export the desired directory via NFS Server in the same machine where rclone has mounted to, ensure NFS serivce has +desired permissions to read the directory. If it runs as root, then it will have permissions for sure, but if it runs +as separate user then ensure that user has necessary desired privileges. +```shell +# this gives opc user and adm (administrators group) ownership to the path, so any user belonging to adm group will be able to access the files. +[opc@tools ~]$ sudo chown -R opc:adm /opt/mnt/bucket-a/ +[opc@tools ~]$ sudo chmod 764 /opt/mnt/bucket-a/ + +# Not export the mount path of rclone for exposing via nfs server +# There are various nfs export options that you should keep per desired usage. +# Syntax is +# (