diff --git a/b2/b2.go b/b2/b2.go index 280a34958..bd225593f 100644 --- a/b2/b2.go +++ b/b2/b2.go @@ -93,8 +93,7 @@ type Fs struct { uploads []*api.GetUploadURLResponse // result of get upload URL calls authMu sync.Mutex // lock for authorizing the account pacer *pacer.Pacer // To pace and retry the API calls - uploadTokens chan struct{} // control concurrency of uploads - extraTokens chan struct{} // extra tokens for multipart uploads + bufferTokens chan []byte // control concurrency of multipart uploads } // Object describes a b2 object @@ -253,8 +252,7 @@ func NewFs(name, root string) (fs.Fs, error) { endpoint: endpoint, srv: rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler), pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), - uploadTokens: make(chan struct{}, fs.Config.Transfers), - extraTokens: make(chan struct{}, fs.Config.Transfers), + bufferTokens: make(chan []byte, fs.Config.Transfers), } f.features = (&fs.Features{ReadMimeType: true, WriteMimeType: true}).Fill(f) // Set the test flag if required @@ -263,10 +261,9 @@ func NewFs(name, root string) (fs.Fs, error) { f.srv.SetHeader(testModeHeader, testMode) fs.Debug(f, "Setting test header \"%s: %s\"", testModeHeader, testMode) } - // Fill up the upload and extra tokens + // Fill up the buffer tokens for i := 0; i < fs.Config.Transfers; i++ { - f.returnUploadToken() - f.extraTokens <- struct{}{} + f.bufferTokens <- nil } err = f.authorizeAccount() if err != nil { @@ -370,77 +367,24 @@ func (f *Fs) clearUploadURL() { f.uploadMu.Unlock() } -// Gets an upload token to control the concurrency -func (f *Fs) getUploadToken() { - <-f.uploadTokens -} - -// Return an upload token -func (f *Fs) returnUploadToken() { - f.uploadTokens <- struct{}{} -} - -// Help count the multipart uploads -type multipartUploadCounter struct { - f *Fs - uploadToken chan struct{} -} - -// Create a new upload counter. This gets an upload token for -// exclusive use by this multipart upload - the other tokens are -// shared between all the multipart uploads. -// -// Call .finished() when done to return the upload token. -func (f *Fs) newMultipartUploadCounter() *multipartUploadCounter { - m := &multipartUploadCounter{ - f: f, - uploadToken: make(chan struct{}, 1), +// getUploadBlock gets a block from the pool of size chunkSize +func (f *Fs) getUploadBlock() []byte { + buf := <-f.bufferTokens + if buf == nil { + buf = make([]byte, chunkSize) } - f.getUploadToken() - m.uploadToken <- struct{}{} - return m + // fs.Debug(f, "Getting upload block %p", buf) + return buf } -// Gets an upload token for a multipart upload -// -// This gets one token only from the first class tokens. This means -// that the multiplart upload is guaranteed at least one token as -// there is one first class token per possible upload. -// -// Pass the return value to returnMultipartUploadToken -func (m *multipartUploadCounter) getMultipartUploadToken() bool { - // get the upload token by preference - select { - case <-m.uploadToken: - return true - default: +// putUploadBlock returns a block to the pool of size chunkSize +func (f *Fs) putUploadBlock(buf []byte) { + buf = buf[:cap(buf)] + if len(buf) != int(chunkSize) { + panic("bad blocksize returned to pool") } - // ...otherwise wait for the first one to appear. - // - // If both uploadToken and extraTokens are ready at this point - // (unlikely but possible) and we get an extraToken instead of - // an uploadToken this will not cause any harm - this - // multipart upload will get an extra upload slot temporarily. - select { - case <-m.uploadToken: - return true - case <-m.f.extraTokens: - return false - } -} - -// Return a multipart upload token retreived from getMultipartUploadToken -func (m *multipartUploadCounter) returnMultipartUploadToken(firstClass bool) { - if firstClass { - m.uploadToken <- struct{}{} - } else { - m.f.extraTokens <- struct{}{} - } -} - -// Mark us finished with this upload counter -func (m *multipartUploadCounter) finished() { - m.f.returnUploadToken() + // fs.Debug(f, "Returning upload block %p", buf) + f.bufferTokens <- buf } // Return an Object from a path @@ -1271,10 +1215,6 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) { in = fd } - // Get upload Token - o.fs.getUploadToken() - defer o.fs.returnUploadToken() - // Get upload URL upload, err := o.fs.getUploadURL() if err != nil { diff --git a/b2/upload.go b/b2/upload.go index b9a9ff1fd..9690efeb9 100644 --- a/b2/upload.go +++ b/b2/upload.go @@ -240,8 +240,6 @@ func (up *largeUpload) Upload() error { errs := make(chan error, 1) var wg sync.WaitGroup var err error - uploadCounter := up.f.newMultipartUploadCounter() - defer uploadCounter.finished() fs.AccountByPart(up.o) // Cancel whole file accounting before reading outer: for part := int64(1); part <= up.parts; part++ { @@ -257,20 +255,21 @@ outer: reqSize = int64(chunkSize) } + // Get a block of memory + buf := up.f.getUploadBlock()[:reqSize] + // Read the chunk - buf := make([]byte, reqSize) _, err = io.ReadFull(up.in, buf) if err != nil { + up.f.putUploadBlock(buf) break outer } // Transfer the chunk - // Get upload Token - token := uploadCounter.getMultipartUploadToken() wg.Add(1) - go func(part int64, buf []byte, token bool) { - defer uploadCounter.returnMultipartUploadToken(token) + go func(part int64, buf []byte) { defer wg.Done() + defer up.f.putUploadBlock(buf) err := up.transferChunk(part, buf) if err != nil { select { @@ -278,7 +277,7 @@ outer: default: } } - }(part, buf, token) + }(part, buf) remaining -= reqSize }