From 46f8e5061425deaa55421bdd3b7ab6c03fc476f2 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood <nick@craig-wood.com>
Date: Fri, 1 Jul 2016 10:04:52 +0100
Subject: [PATCH] b2: Make upload multi-threaded - fixes #531

---
 b2/api/types.go | 15 ++++++++++
 b2/b2.go        | 36 ++++++++++++++++++-----
 b2/upload.go    | 78 ++++++++++++++++++++++++++++++++++++++++++-------
 3 files changed, 110 insertions(+), 19 deletions(-)

diff --git a/b2/api/types.go b/b2/api/types.go
index b8a8d612c..abe907440 100644
--- a/b2/api/types.go
+++ b/b2/api/types.go
@@ -214,3 +214,18 @@ type FinishLargeFileRequest struct {
 	ID    string   `json:"fileId"`        // The unique identifier of the file being uploaded.
 	SHA1s []string `json:"partSha1Array"` // A JSON array of hex SHA1 checksums of the parts of the large file. This is a double-check that the right parts were uploaded in the right order, and that none were missed. Note that the part numbers start at 1, and the SHA1 of the part 1 is the first string in the array, at index 0.
 }
+
+// CancelLargeFileRequest is passed to b2_cancel_large_file
+//
+// The response is a CancelLargeFileResponse
+type CancelLargeFileRequest struct {
+	ID string `json:"fileId"` // The unique identifier of the file being uploaded.
+}
+
+// CancelLargeFileResponse is the response to CancelLargeFileRequest
+type CancelLargeFileResponse struct {
+	ID        string `json:"fileId"`    // The unique identifier of the file being uploaded.
+	Name      string `json:"fileName"`  // The name of this file.
+	AccountID string `json:"accountId"` // The identifier for the account.
+	BucketID  string `json:"bucketId"`  // The unique ID of the bucket.
+}
diff --git a/b2/b2.go b/b2/b2.go
index 191428aa2..54cdc4f89 100644
--- a/b2/b2.go
+++ b/b2/b2.go
@@ -89,6 +89,7 @@ type Fs struct {
 	uploads      []*api.GetUploadURLResponse // result of get upload URL calls
 	authMu       sync.Mutex                  // lock for authorizing the account
 	pacer        *pacer.Pacer                // To pace and retry the API calls
+	uploadTokens chan struct{}               // control concurrency of uploads
 }
 
 // Object describes a b2 object
@@ -213,14 +214,19 @@ func NewFs(name, root string) (fs.Fs, error) {
 	}
 	endpoint := fs.ConfigFile.MustValue(name, "endpoint", defaultEndpoint)
 	f := &Fs{
-		name:     name,
-		bucket:   bucket,
-		root:     directory,
-		account:  account,
-		key:      key,
-		endpoint: endpoint,
-		srv:      rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler),
-		pacer:    pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+		name:         name,
+		bucket:       bucket,
+		root:         directory,
+		account:      account,
+		key:          key,
+		endpoint:     endpoint,
+		srv:          rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler),
+		pacer:        pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+		uploadTokens: make(chan struct{}, fs.Config.Transfers),
+	}
+	// Fill up the upload tokens
+	for i := 0; i < fs.Config.Transfers; i++ {
+		f.returnUploadToken()
 	}
 	err = f.authorizeAccount()
 	if err != nil {
@@ -324,6 +330,16 @@ func (f *Fs) clearUploadURL() {
 	f.uploadMu.Unlock()
 }
 
+// Gets an upload token to control the concurrency
+func (f *Fs) getUploadToken() {
+	<-f.uploadTokens
+}
+
+// Return an upload token
+func (f *Fs) returnUploadToken() {
+	f.uploadTokens <- struct{}{}
+}
+
 // Return an Object from a path
 //
 // If it can't be found it returns the error fs.ErrorObjectNotFound.
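
Note: the uploadTokens field added above is a counting semaphore built from a
buffered channel. The channel is pre-filled with fs.Config.Transfers tokens;
getUploadToken blocks until a token is free, and returnUploadToken puts one
back, so at most fs.Config.Transfers uploads run at once. A minimal standalone
sketch of the same pattern follows; the names transfers and the Println body
are hypothetical stand-ins, not part of this patch:

	package main

	import (
		"fmt"
		"sync"
	)

	const transfers = 4 // stands in for fs.Config.Transfers

	func main() {
		// Buffered channel used as a counting semaphore.
		tokens := make(chan struct{}, transfers)
		// Fill it so `transfers` goroutines may run at once.
		for i := 0; i < transfers; i++ {
			tokens <- struct{}{}
		}

		var wg sync.WaitGroup
		for job := 1; job <= 10; job++ {
			<-tokens // acquire: blocks while `transfers` jobs are in flight
			wg.Add(1)
			go func(job int) {
				defer wg.Done()
				defer func() { tokens <- struct{}{} }() // release the token
				fmt.Println("uploading chunk", job)     // stand-in for real work
			}(job)
		}
		wg.Wait()
	}
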
@@ -1092,6 +1108,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
 		in = fd
 	}
 
+	// Get upload token
+	o.fs.getUploadToken()
+	defer o.fs.returnUploadToken()
+
 	// Get upload URL
 	upload, err := o.fs.getUploadURL()
 	if err != nil {
diff --git a/b2/upload.go b/b2/upload.go
index 99125764d..e961590b4 100644
--- a/b2/upload.go
+++ b/b2/upload.go
@@ -188,6 +188,11 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error {
 		up.returnUploadURL(upload)
 		return up.f.shouldRetryNoReauth(resp, err)
 	})
+	if err != nil {
+		fs.Debug(up.o, "Error sending chunk %d: %v", part, err)
+	} else {
+		fs.Debug(up.o, "Done sending chunk %d", part)
+	}
 	return err
 }
 
@@ -212,34 +217,85 @@ func (up *largeUpload) finish() error {
 	return up.o.decodeMetaDataFileInfo(&response)
 }
 
+// cancel aborts the large upload
+func (up *largeUpload) cancel() error {
+	opts := rest.Opts{
+		Method: "POST",
+		Path:   "/b2_cancel_large_file",
+	}
+	var request = api.CancelLargeFileRequest{
+		ID: up.id,
+	}
+	var response api.CancelLargeFileResponse
+	err := up.f.pacer.Call(func() (bool, error) {
+		resp, err := up.f.srv.CallJSON(&opts, &request, &response)
+		return up.f.shouldRetry(resp, err)
+	})
+	return err
+}
+
 // Upload uploads the chunks from the input
 func (up *largeUpload) Upload() error {
 	fs.Debug(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id)
-	buf := make([]byte, chunkSize)
 	remaining := up.size
+	errs := make(chan error, 1)
+	var wg sync.WaitGroup
+	var err error
+outer:
 	for part := int64(1); part <= up.parts; part++ {
+		// Check any errors
+		select {
+		case err = <-errs:
+			break outer
+		default:
+		}
+
 		reqSize := remaining
 		if reqSize >= int64(chunkSize) {
 			reqSize = int64(chunkSize)
-		} else {
-			buf = buf[:reqSize]
 		}
-		// FIXME could parallelise this
+
 		// Read the chunk
-		_, err := io.ReadFull(up.in, buf)
+		buf := make([]byte, reqSize)
+		_, err = io.ReadFull(up.in, buf)
 		if err != nil {
-			return err
+			break outer
 		}
+
 		// Transfer the chunk
-		err = up.transferChunk(part, buf)
-		if err != nil {
-			return err
-		}
+		// Get upload token
+		up.f.getUploadToken()
+		wg.Add(1)
+		go func(part int64, buf []byte) {
+			defer up.f.returnUploadToken()
+			defer wg.Done()
+			err := up.transferChunk(part, buf)
+			if err != nil {
+				select {
+				case errs <- err:
+				default:
+				}
+			}
+		}(part, buf)
+
 		remaining -= reqSize
 	}
+	wg.Wait()
+	if err == nil {
+		// Check any errors from the transfer goroutines
+		select {
+		case err = <-errs:
+		default:
+		}
+	}
+	if err != nil {
+		fs.Debug(up.o, "Cancelling large file upload due to error: %v", err)
+		cancelErr := up.cancel()
+		if cancelErr != nil {
+			fs.ErrorLog(up.o, "Failed to cancel large file upload: %v", cancelErr)
+		}
+		return err
+	}
 	fs.Debug(up.o, "Finishing large file upload")
 	return up.finish()
 }
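
Note on the error handling in Upload(): the first worker error is collected
through a one-slot buffered channel. Each goroutine does a non-blocking send,
so the first error is kept, later ones are dropped, and no worker ever blocks
on reporting. After wg.Wait() the main goroutine drains the channel once more,
then either cancels or finishes the large file. A standalone sketch of this
first-error-wins pattern follows; the names work and njobs are hypothetical
stand-ins, not part of this patch:

	package main

	import (
		"errors"
		"fmt"
		"sync"
	)

	const njobs = 5

	// work is a stand-in for transferChunk; job 3 fails.
	func work(i int) error {
		if i == 3 {
			return errors.New("chunk 3 failed")
		}
		return nil
	}

	func main() {
		errs := make(chan error, 1) // room for exactly one error
		var wg sync.WaitGroup
		for i := 0; i < njobs; i++ {
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				if err := work(i); err != nil {
					select {
					case errs <- err: // keep the first error
					default: // drop the rest without blocking
					}
				}
			}(i)
		}
		wg.Wait()
		select {
		case err := <-errs:
			fmt.Println("would call cancel():", err) // mirrors up.cancel()
		default:
			fmt.Println("would call finish()") // mirrors up.finish()
		}
	}
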