b2: Make sure each upload has at least one upload slot - fixes #731
commit 544ca6035a
parent 0238558a4b

2 changed files with 73 additions and 5 deletions
b2/b2.go (68 changes)
@@ -94,6 +94,7 @@ type Fs struct {
     authMu       sync.Mutex    // lock for authorizing the account
     pacer        *pacer.Pacer  // To pace and retry the API calls
     uploadTokens chan struct{} // control concurrency of uploads
+    extraTokens  chan struct{} // extra tokens for multipart uploads
 }
 
 // Object describes a b2 object
@@ -248,6 +249,7 @@ func NewFs(name, root string) (fs.Fs, error) {
         srv:          rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler),
         pacer:        pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
         uploadTokens: make(chan struct{}, fs.Config.Transfers),
+        extraTokens:  make(chan struct{}, fs.Config.Transfers),
     }
     // Set the test flag if required
     if *b2TestMode != "" {
@@ -255,9 +257,10 @@
         f.srv.SetHeader(testModeHeader, testMode)
         fs.Debug(f, "Setting test header \"%s: %s\"", testModeHeader, testMode)
     }
-    // Fill up the upload tokens
+    // Fill up the upload and extra tokens
     for i := 0; i < fs.Config.Transfers; i++ {
         f.returnUploadToken()
+        f.extraTokens <- struct{}{}
     }
     err = f.authorizeAccount()
     if err != nil {
@@ -371,6 +374,69 @@ func (f *Fs) returnUploadToken() {
     f.uploadTokens <- struct{}{}
 }
 
+// Help count the multipart uploads
+type multipartUploadCounter struct {
+    f           *Fs
+    uploadToken chan struct{}
+}
+
+// Create a new upload counter. This gets an upload token for
+// exclusive use by this multipart upload - the other tokens are
+// shared between all the multipart uploads.
+//
+// Call .finished() when done to return the upload token.
+func (f *Fs) newMultipartUploadCounter() *multipartUploadCounter {
+    m := &multipartUploadCounter{
+        f:           f,
+        uploadToken: make(chan struct{}, 1),
+    }
+    f.getUploadToken()
+    m.uploadToken <- struct{}{}
+    return m
+}
+
+// Gets an upload token for a multipart upload
+//
+// This gets one token only from the first class tokens. This means
+// that the multipart upload is guaranteed at least one token as
+// there is one first class token per possible upload.
+//
+// Pass the return value to returnMultipartUploadToken
+func (m *multipartUploadCounter) getMultipartUploadToken() bool {
+    // get the upload token by preference
+    select {
+    case <-m.uploadToken:
+        return true
+    default:
+    }
+    // ...otherwise wait for the first one to appear.
+    //
+    // If both uploadToken and extraTokens are ready at this point
+    // (unlikely but possible) and we get an extraToken instead of
+    // an uploadToken this will not cause any harm - this
+    // multipart upload will get an extra upload slot temporarily.
+    select {
+    case <-m.uploadToken:
+        return true
+    case <-m.f.extraTokens:
+        return false
+    }
+}
+
+// Return a multipart upload token retrieved from getMultipartUploadToken
+func (m *multipartUploadCounter) returnMultipartUploadToken(firstClass bool) {
+    if firstClass {
+        m.uploadToken <- struct{}{}
+    } else {
+        m.f.extraTokens <- struct{}{}
+    }
+}
+
+// Mark us finished with this upload counter
+func (m *multipartUploadCounter) finished() {
+    m.f.returnUploadToken()
+}
+
 // Return an Object from a path
 //
 // If it can't be found it returns the error fs.ErrorObjectNotFound.
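
For context, here is a standalone sketch (not rclone code; the pool and reserve names are invented for illustration) of the guarantee the comments above describe: each multipart upload reserves one first class token up front, so even when another upload has taken every shared extra token it can still run at least one chunk at a time.

package main

import "fmt"

const transfers = 2 // stands in for fs.Config.Transfers

// pool models the two channels added to Fs above.
type pool struct {
    firstClass chan struct{} // one token per possible upload
    extra      chan struct{} // shared between all multipart uploads
}

func newPool() *pool {
    p := &pool{
        firstClass: make(chan struct{}, transfers),
        extra:      make(chan struct{}, transfers),
    }
    for i := 0; i < transfers; i++ {
        p.firstClass <- struct{}{}
        p.extra <- struct{}{}
    }
    return p
}

// reserve moves one first class token into a private channel for a
// single multipart upload, as newMultipartUploadCounter does above.
func (p *pool) reserve() chan struct{} {
    private := make(chan struct{}, 1)
    private <- <-p.firstClass
    return private
}

func main() {
    p := newPool()
    uploadA := p.reserve()
    uploadB := p.reserve()

    // Upload A is busy: it is using its own token and every extra token.
    <-uploadA
    for i := 0; i < transfers; i++ {
        <-p.extra
    }

    // Upload B can still start a chunk immediately from its reserved token.
    select {
    case <-uploadB:
        fmt.Println("upload B got its guaranteed slot")
    default:
        fmt.Println("upload B would be starved") // unreachable with the reservation
    }
}
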
b2/upload.go (10 changes)
@@ -240,6 +240,8 @@ func (up *largeUpload) Upload() error {
     errs := make(chan error, 1)
     var wg sync.WaitGroup
     var err error
+    uploadCounter := up.f.newMultipartUploadCounter()
+    defer uploadCounter.finished()
     fs.AccountByPart(up.o) // Cancel whole file accounting before reading
 outer:
     for part := int64(1); part <= up.parts; part++ {
@@ -264,10 +266,10 @@ outer:
 
             // Transfer the chunk
             // Get upload Token
-            up.f.getUploadToken()
+            token := uploadCounter.getMultipartUploadToken()
             wg.Add(1)
-            go func(part int64, buf []byte) {
-                defer up.f.returnUploadToken()
+            go func(part int64, buf []byte, token bool) {
+                defer uploadCounter.returnMultipartUploadToken(token)
                 defer wg.Done()
                 err := up.transferChunk(part, buf)
                 if err != nil {
@@ -276,7 +278,7 @@ outer:
                     default:
                     }
                 }
-            }(part, buf)
+            }(part, buf, token)
 
             remaining -= reqSize
         }
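
The same per-chunk pattern as in Upload() above, reduced to a minimal runnable sketch (the channel setup and the simulated transfer are invented for illustration): each chunk prefers the reserved token, falls back to a shared extra token, and returns whichever kind it took when its goroutine finishes.

package main

import (
    "fmt"
    "sync"
)

func main() {
    const transfers = 4

    reserved := make(chan struct{}, 1)      // this upload's guaranteed slot
    extra := make(chan struct{}, transfers) // slots shared with other multipart uploads
    reserved <- struct{}{}
    for i := 0; i < transfers; i++ {
        extra <- struct{}{}
    }

    var wg sync.WaitGroup
    for part := 1; part <= 8; part++ {
        // Prefer the reserved token, otherwise take whichever becomes free first.
        var firstClass bool
        select {
        case <-reserved:
            firstClass = true
        default:
            select {
            case <-reserved:
                firstClass = true
            case <-extra:
                firstClass = false
            }
        }

        wg.Add(1)
        go func(part int, firstClass bool) {
            defer wg.Done()
            fmt.Printf("transferring part %d (first class slot: %v)\n", part, firstClass)
            // Return the token to wherever it came from, as
            // returnMultipartUploadToken does above.
            if firstClass {
                reserved <- struct{}{}
            } else {
                extra <- struct{}{}
            }
        }(part, firstClass)
    }
    wg.Wait()
}
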