diff --git a/b2/b2.go b/b2/b2.go index f28630b83..a4991a1b5 100644 --- a/b2/b2.go +++ b/b2/b2.go @@ -94,6 +94,7 @@ type Fs struct { authMu sync.Mutex // lock for authorizing the account pacer *pacer.Pacer // To pace and retry the API calls uploadTokens chan struct{} // control concurrency of uploads + extraTokens chan struct{} // extra tokens for multipart uploads } // Object describes a b2 object @@ -248,6 +249,7 @@ func NewFs(name, root string) (fs.Fs, error) { srv: rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler), pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), uploadTokens: make(chan struct{}, fs.Config.Transfers), + extraTokens: make(chan struct{}, fs.Config.Transfers), } // Set the test flag if required if *b2TestMode != "" { @@ -255,9 +257,10 @@ func NewFs(name, root string) (fs.Fs, error) { f.srv.SetHeader(testModeHeader, testMode) fs.Debug(f, "Setting test header \"%s: %s\"", testModeHeader, testMode) } - // Fill up the upload tokens + // Fill up the upload and extra tokens for i := 0; i < fs.Config.Transfers; i++ { f.returnUploadToken() + f.extraTokens <- struct{}{} } err = f.authorizeAccount() if err != nil { @@ -371,6 +374,69 @@ func (f *Fs) returnUploadToken() { f.uploadTokens <- struct{}{} } +// Help count the multipart uploads +type multipartUploadCounter struct { + f *Fs + uploadToken chan struct{} +} + +// Create a new upload counter. This gets an upload token for +// exclusive use by this multipart upload - the other tokens are +// shared between all the multipart uploads. +// +// Call .finished() when done to return the upload token. +func (f *Fs) newMultipartUploadCounter() *multipartUploadCounter { + m := &multipartUploadCounter{ + f: f, + uploadToken: make(chan struct{}, 1), + } + f.getUploadToken() + m.uploadToken <- struct{}{} + return m +} + +// Gets an upload token for a multipart upload +// +// This gets one token only from the first class tokens. This means +// that the multiplart upload is guaranteed at least one token as +// there is one first class token per possible upload. +// +// Pass the return value to returnMultipartUploadToken +func (m *multipartUploadCounter) getMultipartUploadToken() bool { + // get the upload token by preference + select { + case <-m.uploadToken: + return true + default: + } + // ...otherwise wait for the first one to appear. + // + // If both uploadToken and extraTokens are ready at this point + // (unlikely but possible) and we get an extraToken instead of + // an uploadToken this will not cause any harm - this + // multipart upload will get an extra upload slot temporarily. + select { + case <-m.uploadToken: + return true + case <-m.f.extraTokens: + return false + } +} + +// Return a multipart upload token retreived from getMultipartUploadToken +func (m *multipartUploadCounter) returnMultipartUploadToken(firstClass bool) { + if firstClass { + m.uploadToken <- struct{}{} + } else { + m.f.extraTokens <- struct{}{} + } +} + +// Mark us finished with this upload counter +func (m *multipartUploadCounter) finished() { + m.f.returnUploadToken() +} + // Return an Object from a path // // If it can't be found it returns the error fs.ErrorObjectNotFound. diff --git a/b2/upload.go b/b2/upload.go index cdb7dd31f..cb43adc70 100644 --- a/b2/upload.go +++ b/b2/upload.go @@ -240,6 +240,8 @@ func (up *largeUpload) Upload() error { errs := make(chan error, 1) var wg sync.WaitGroup var err error + uploadCounter := up.f.newMultipartUploadCounter() + defer uploadCounter.finished() fs.AccountByPart(up.o) // Cancel whole file accounting before reading outer: for part := int64(1); part <= up.parts; part++ { @@ -264,10 +266,10 @@ outer: // Transfer the chunk // Get upload Token - up.f.getUploadToken() + token := uploadCounter.getMultipartUploadToken() wg.Add(1) - go func(part int64, buf []byte) { - defer up.f.returnUploadToken() + go func(part int64, buf []byte, token bool) { + defer uploadCounter.returnMultipartUploadToken(token) defer wg.Done() err := up.transferChunk(part, buf) if err != nil { @@ -276,7 +278,7 @@ outer: default: } } - }(part, buf) + }(part, buf, token) remaining -= reqSize }