b2: Make upload multi-threaded - fixes #531

This commit is contained in:
Nick Craig-Wood 2016-07-01 10:04:52 +01:00
parent 70dc97231e
commit 46f8e50614
3 changed files with 110 additions and 19 deletions

View file

@ -214,3 +214,18 @@ type FinishLargeFileRequest struct {
ID string `json:"fileId"` // The unique identifier of the file being uploaded.
SHA1s []string `json:"partSha1Array"` // A JSON array of hex SHA1 checksums of the parts of the large file. This is a double-check that the right parts were uploaded in the right order, and that none were missed. Note that the part numbers start at 1, and the SHA1 of the part 1 is the first string in the array, at index 0.
}
// CancelLargeFileRequest is passed to b2_finish_large_file
//
// The response is a CancelLargeFileResponse
type CancelLargeFileRequest struct {
ID string `json:"fileId"` // The unique identifier of the file being uploaded.
}
// CancelLargeFileResponse is the response to CancelLargeFileRequest
type CancelLargeFileResponse struct {
ID string `json:"fileId"` // The unique identifier of the file being uploaded.
Name string `json:"fileName"` // The name of this file.
AccountID string `json:"accountId"` // The identifier for the account.
BucketID string `json:"bucketId"` // The unique ID of the bucket.
}

View file

@ -89,6 +89,7 @@ type Fs struct {
uploads []*api.GetUploadURLResponse // result of get upload URL calls
authMu sync.Mutex // lock for authorizing the account
pacer *pacer.Pacer // To pace and retry the API calls
uploadTokens chan struct{} // control concurrency of uploads
}
// Object describes a b2 object
@ -213,14 +214,19 @@ func NewFs(name, root string) (fs.Fs, error) {
}
endpoint := fs.ConfigFile.MustValue(name, "endpoint", defaultEndpoint)
f := &Fs{
name: name,
bucket: bucket,
root: directory,
account: account,
key: key,
endpoint: endpoint,
srv: rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
name: name,
bucket: bucket,
root: directory,
account: account,
key: key,
endpoint: endpoint,
srv: rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler),
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
uploadTokens: make(chan struct{}, fs.Config.Transfers),
}
// Fill up the upload tokens
for i := 0; i < fs.Config.Transfers; i++ {
f.returnUploadToken()
}
err = f.authorizeAccount()
if err != nil {
@ -324,6 +330,16 @@ func (f *Fs) clearUploadURL() {
f.uploadMu.Unlock()
}
// Gets an upload token to control the concurrency
func (f *Fs) getUploadToken() {
<-f.uploadTokens
}
// Return an upload token
func (f *Fs) returnUploadToken() {
f.uploadTokens <- struct{}{}
}
// Return an Object from a path
//
// If it can't be found it returns the error fs.ErrorObjectNotFound.
@ -1092,6 +1108,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
in = fd
}
// Get upload Token
o.fs.getUploadToken()
defer o.fs.returnUploadToken()
// Get upload URL
upload, err := o.fs.getUploadURL()
if err != nil {

View file

@ -188,6 +188,11 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error {
up.returnUploadURL(upload)
return up.f.shouldRetryNoReauth(resp, err)
})
if err != nil {
fs.Debug(up.o, "Error sending chunk %d: %v", part, err)
} else {
fs.Debug(up.o, "Done sending chunk %d", part)
}
return err
}
@ -212,34 +217,85 @@ func (up *largeUpload) finish() error {
return up.o.decodeMetaDataFileInfo(&response)
}
// cancel aborts the large upload
func (up *largeUpload) cancel() error {
opts := rest.Opts{
Method: "POST",
Path: "/b2_cancel_large_file",
}
var request = api.CancelLargeFileRequest{
ID: up.id,
}
var response api.CancelLargeFileResponse
err := up.f.pacer.Call(func() (bool, error) {
resp, err := up.f.srv.CallJSON(&opts, &request, &response)
return up.f.shouldRetry(resp, err)
})
return err
}
// Upload uploads the chunks from the input
func (up *largeUpload) Upload() error {
fs.Debug(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id)
buf := make([]byte, chunkSize)
remaining := up.size
errs := make(chan error, 1)
var wg sync.WaitGroup
var err error
outer:
for part := int64(1); part <= up.parts; part++ {
// Check any errors
select {
case err = <-errs:
break outer
default:
}
reqSize := remaining
if reqSize >= int64(chunkSize) {
reqSize = int64(chunkSize)
} else {
buf = buf[:reqSize]
}
// FIXME could parallelise this
// Read the chunk
_, err := io.ReadFull(up.in, buf)
buf := make([]byte, reqSize)
_, err = io.ReadFull(up.in, buf)
if err != nil {
return err
break outer
}
// Transfer the chunk
err = up.transferChunk(part, buf)
if err != nil {
return err
}
// Get upload Token
up.f.getUploadToken()
wg.Add(1)
go func(part int64, buf []byte) {
defer up.f.returnUploadToken()
defer wg.Done()
err := up.transferChunk(part, buf)
if err != nil {
select {
case errs <- err:
default:
}
}
}(part, buf)
remaining -= reqSize
}
wg.Wait()
if err == nil {
select {
case err = <-errs:
default:
}
}
if err != nil {
fs.Debug(up.o, "Cancelling large file upload due to error: %v", err)
cancelErr := up.cancel()
if cancelErr != nil {
fs.ErrorLog(up.o, "Failed to cancel large file upload: %v", cancelErr)
}
return err
}
// Check any errors
fs.Debug(up.o, "Finishing large file upload")
return up.finish()
}