Consider the extra options for rclone http operations

Add some documentation bits for functions and structs
This commit is contained in:
Klaas Freitag 2024-11-11 15:46:53 +01:00 committed by Christian Richter
parent f9f810ccfc
commit e07847e3ea
4 changed files with 17 additions and 10 deletions

View file

@ -18,11 +18,13 @@ var (
ErrFingerprintNotSet = errors.New("tus fingerprint not set") ErrFingerprintNotSet = errors.New("tus fingerprint not set")
) )
// ClientError represents an error state of a client
type ClientError struct { type ClientError struct {
Code int Code int
Body []byte Body []byte
} }
// Error returns an error string containing the client error code
func (c ClientError) Error() string { func (c ClientError) Error() string {
return fmt.Sprintf("unexpected status code: %d", c.Code) return fmt.Sprintf("unexpected status code: %d", c.Code)
} }

View file

@ -8,8 +8,10 @@ import (
"strings" "strings"
) )
// Metadata is a typedef for a string to string map to hold metadata
type Metadata map[string]string type Metadata map[string]string
// Upload is a struct containing the file status during upload
type Upload struct { type Upload struct {
stream io.ReadSeeker stream io.ReadSeeker
size int64 size int64

View file

@ -13,6 +13,7 @@ import (
"github.com/rclone/rclone/lib/rest" "github.com/rclone/rclone/lib/rest"
) )
// Uploader holds all information about a currently running upload
type Uploader struct { type Uploader struct {
fs *Fs fs *Fs
url string url string
@ -50,7 +51,7 @@ func (f *Fs) shouldRetryChunk(ctx context.Context, resp *http.Response, err erro
return f.shouldRetry(ctx, resp, err) return f.shouldRetry(ctx, resp, err)
} }
func (u *Uploader) uploadChunck(ctx context.Context, body io.Reader, size int64, offset int64) (int64, error) { func (u *Uploader) uploadChunck(ctx context.Context, body io.Reader, size int64, offset int64, options ...fs.OpenOption) (int64, error) {
var method string var method string
if !u.overridePatchMethod { if !u.overridePatchMethod {
@ -82,6 +83,7 @@ func (u *Uploader) uploadChunck(ctx context.Context, body io.Reader, size int64,
Body: body, Body: body,
ContentType: "application/offset+octet-stream", ContentType: "application/offset+octet-stream",
ExtraHeaders: extraHeaders, ExtraHeaders: extraHeaders,
Options: options,
} }
var newOffset int64 var newOffset int64
@ -100,12 +102,12 @@ func (u *Uploader) uploadChunck(ctx context.Context, body io.Reader, size int64,
} }
// Upload uploads the entire body to the server. // Upload uploads the entire body to the server.
func (u *Uploader) Upload(ctx context.Context) error { func (u *Uploader) Upload(ctx context.Context, options ...fs.OpenOption) error {
var cnt int = 1 var cnt int = 1
fs.Debug(u.fs, "Uploaded starts") fs.Debug(u.fs, "Uploaded starts")
for u.offset < u.upload.size && !u.aborted { for u.offset < u.upload.size && !u.aborted {
err := u.UploadChunck(ctx, cnt) err := u.UploadChunck(ctx, cnt, options...)
cnt++ cnt++
if err != nil { if err != nil {
return err return err
@ -117,7 +119,7 @@ func (u *Uploader) Upload(ctx context.Context) error {
} }
// UploadChunck uploads a single chunck. // UploadChunck uploads a single chunck.
func (u *Uploader) UploadChunck(ctx context.Context, cnt int) error { func (u *Uploader) UploadChunck(ctx context.Context, cnt int, options ...fs.OpenOption) error {
chunkSize := u.fs.opt.ChunkSize chunkSize := u.fs.opt.ChunkSize
data := make([]byte, chunkSize) data := make([]byte, chunkSize)
@ -137,7 +139,7 @@ func (u *Uploader) UploadChunck(ctx context.Context, cnt int) error {
body := bytes.NewBuffer(data[:size]) body := bytes.NewBuffer(data[:size])
newOffset, err := u.uploadChunck(ctx, body, int64(size), u.offset) newOffset, err := u.uploadChunck(ctx, body, int64(size), u.offset, options...)
if err == nil { if err == nil {
fs.Debugf(u.fs, "Uploaded chunk no %d ok, range %d -> %d", cnt, u.offset, newOffset) fs.Debugf(u.fs, "Uploaded chunk no %d ok, range %d -> %d", cnt, u.offset, newOffset)
@ -158,7 +160,7 @@ func (u *Uploader) UploadChunck(ctx context.Context, cnt int) error {
// Waits for a signal to broadcast to all subscribers // Waits for a signal to broadcast to all subscribers
func (u *Uploader) broadcastProgress() { func (u *Uploader) broadcastProgress() {
for _ = range u.notifyChan { for range u.notifyChan {
for _, c := range u.uploadSubs { for _, c := range u.uploadSubs {
c <- *u.upload c <- *u.upload
} }

View file

@ -39,10 +39,10 @@ func (o *Object) updateViaTus(ctx context.Context, in io.Reader, contentType str
upload := NewUpload(in, src.Size(), metadata, fingerprint) upload := NewUpload(in, src.Size(), metadata, fingerprint)
// create the uploader. // create the uploader.
uploader, err := o.CreateUploader(ctx, upload) uploader, err := o.CreateUploader(ctx, upload, options...)
if err == nil { if err == nil {
// start the uploading process. // start the uploading process.
err = uploader.Upload(ctx) err = uploader.Upload(ctx, options...)
} }
return err return err
@ -64,8 +64,8 @@ func (f *Fs) shouldRetryCreateUpload(ctx context.Context, resp *http.Response, e
return f.shouldRetry(ctx, resp, err) return f.shouldRetry(ctx, resp, err)
} }
// CreateUpload creates a new upload in the server. // CreateUpload creates a new upload to the server.
func (o *Object) CreateUploader(ctx context.Context, u *Upload) (*Uploader, error) { func (o *Object) CreateUploader(ctx context.Context, u *Upload, options ...fs.OpenOption) (*Uploader, error) {
if u == nil { if u == nil {
return nil, ErrNilUpload return nil, ErrNilUpload
} }
@ -89,6 +89,7 @@ func (o *Object) CreateUploader(ctx context.Context, u *Upload) (*Uploader, erro
RootURL: o.fs.endpointURL, RootURL: o.fs.endpointURL,
ContentLength: &l, ContentLength: &l,
ExtraHeaders: o.extraHeaders(ctx, o), ExtraHeaders: o.extraHeaders(ctx, o),
Options: options,
} }
opts.ExtraHeaders["Upload-Length"] = strconv.FormatInt(u.size, 10) opts.ExtraHeaders["Upload-Length"] = strconv.FormatInt(u.size, 10)
opts.ExtraHeaders["Upload-Metadata"] = u.EncodedMetadata() opts.ExtraHeaders["Upload-Metadata"] = u.EncodedMetadata()