b2: Use one upload URL per go routine
This fixes `more than one upload using auth token` errors.
This commit is contained in:
parent
6f46270735
commit
6b6b43402b
1 changed files with 25 additions and 13 deletions
38
b2/b2.go
38
b2/b2.go
|
@ -74,7 +74,7 @@ type Fs struct {
|
||||||
root string // the path we are working on if any
|
root string // the path we are working on if any
|
||||||
info api.AuthorizeAccountResponse // result of authorize call
|
info api.AuthorizeAccountResponse // result of authorize call
|
||||||
uploadMu sync.Mutex // lock for upload variable
|
uploadMu sync.Mutex // lock for upload variable
|
||||||
upload api.GetUploadURLResponse // result of get upload URL call
|
uploads []*api.GetUploadURLResponse // result of get upload URL calls
|
||||||
authMu sync.Mutex // lock for authorizing the account
|
authMu sync.Mutex // lock for authorizing the account
|
||||||
pacer *pacer.Pacer // To pace and retry the API calls
|
pacer *pacer.Pacer // To pace and retry the API calls
|
||||||
}
|
}
|
||||||
|
@ -254,15 +254,17 @@ func (f *Fs) authorizeAccount() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getUploadURL returns the UploadURL and the AuthorizationToken
|
// getUploadURL returns the upload info with the UploadURL and the AuthorizationToken
|
||||||
func (f *Fs) getUploadURL() (string, string, error) {
|
//
|
||||||
|
// This should be returned with returnUploadURL when finished
|
||||||
|
func (f *Fs) getUploadURL() (upload *api.GetUploadURLResponse, err error) {
|
||||||
f.uploadMu.Lock()
|
f.uploadMu.Lock()
|
||||||
defer f.uploadMu.Unlock()
|
defer f.uploadMu.Unlock()
|
||||||
bucketID, err := f.getBucketID()
|
bucketID, err := f.getBucketID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", err
|
return nil, err
|
||||||
}
|
}
|
||||||
if f.upload.UploadURL == "" || f.upload.AuthorizationToken == "" {
|
if len(f.uploads) == 0 {
|
||||||
opts := rest.Opts{
|
opts := rest.Opts{
|
||||||
Method: "POST",
|
Method: "POST",
|
||||||
Path: "/b2_get_upload_url",
|
Path: "/b2_get_upload_url",
|
||||||
|
@ -271,21 +273,30 @@ func (f *Fs) getUploadURL() (string, string, error) {
|
||||||
BucketID: bucketID,
|
BucketID: bucketID,
|
||||||
}
|
}
|
||||||
err := f.pacer.Call(func() (bool, error) {
|
err := f.pacer.Call(func() (bool, error) {
|
||||||
resp, err := f.srv.CallJSON(&opts, &request, &f.upload)
|
resp, err := f.srv.CallJSON(&opts, &request, &upload)
|
||||||
return f.shouldRetryNoReauth(resp, err)
|
return f.shouldRetryNoReauth(resp, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", fmt.Errorf("Failed to get upload URL: %v", err)
|
return nil, fmt.Errorf("Failed to get upload URL: %v", err)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
upload, f.uploads = f.uploads[0], f.uploads[1:]
|
||||||
}
|
}
|
||||||
return f.upload.UploadURL, f.upload.AuthorizationToken, nil
|
return upload, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// returnUploadURL returns the UploadURL to the cache
|
||||||
|
func (f *Fs) returnUploadURL(upload *api.GetUploadURLResponse) {
|
||||||
|
f.uploadMu.Lock()
|
||||||
|
f.uploads = append(f.uploads, upload)
|
||||||
|
f.uploadMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// clearUploadURL clears the current UploadURL and the AuthorizationToken
|
// clearUploadURL clears the current UploadURL and the AuthorizationToken
|
||||||
func (f *Fs) clearUploadURL() {
|
func (f *Fs) clearUploadURL() {
|
||||||
f.uploadMu.Lock()
|
f.uploadMu.Lock()
|
||||||
f.upload = api.GetUploadURLResponse{}
|
f.uploads = nil
|
||||||
defer f.uploadMu.Unlock()
|
f.uploadMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return an FsObject from a path
|
// Return an FsObject from a path
|
||||||
|
@ -1002,10 +1013,11 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get upload URL
|
// Get upload URL
|
||||||
UploadURL, AuthorizationToken, err := o.fs.getUploadURL()
|
upload, err := o.fs.getUploadURL()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer o.fs.returnUploadURL(upload)
|
||||||
|
|
||||||
// Headers for upload file
|
// Headers for upload file
|
||||||
//
|
//
|
||||||
|
@ -1063,10 +1075,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
|
||||||
opts := rest.Opts{
|
opts := rest.Opts{
|
||||||
Method: "POST",
|
Method: "POST",
|
||||||
Absolute: true,
|
Absolute: true,
|
||||||
Path: UploadURL,
|
Path: upload.UploadURL,
|
||||||
Body: in,
|
Body: in,
|
||||||
ExtraHeaders: map[string]string{
|
ExtraHeaders: map[string]string{
|
||||||
"Authorization": AuthorizationToken,
|
"Authorization": upload.AuthorizationToken,
|
||||||
"X-Bz-File-Name": urlEncode(o.fs.root + o.remote),
|
"X-Bz-File-Name": urlEncode(o.fs.root + o.remote),
|
||||||
"Content-Type": fs.MimeType(o),
|
"Content-Type": fs.MimeType(o),
|
||||||
sha1Header: calculatedSha1,
|
sha1Header: calculatedSha1,
|
||||||
|
|
Loading…
Reference in a new issue