// Upload large files for b2 // // Docs - https://www.backblaze.com/b2/docs/large_files.html package b2 import ( "bytes" "crypto/sha1" "fmt" "io" "sync" "github.com/ncw/rclone/b2/api" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/rest" "github.com/pkg/errors" ) // largeUpload is used to control the upload of large files which need chunking type largeUpload struct { f *Fs // parent Fs o *Object // object being uploaded in io.Reader // read the data from here id string // ID of the file being uploaded size int64 // total size parts int64 // calculated number of parts sha1s []string // slice of SHA1s for each part uploadMu sync.Mutex // lock for upload variable uploads []*api.GetUploadPartURLResponse // result of get upload URL calls } // newLargeUpload starts an upload of object o from in with metadata in src func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *largeUpload, err error) { remote := src.Remote() size := src.Size() parts := size / int64(chunkSize) if size%int64(chunkSize) != 0 { parts++ } if parts > maxParts { return nil, errors.Errorf("%q too big (%d bytes) makes too many parts %d > %d - increase --b2-chunk-size", remote, size, parts, maxParts) } modTime := src.ModTime() opts := rest.Opts{ Method: "POST", Path: "/b2_start_large_file", } bucketID, err := f.getBucketID() if err != nil { return nil, err } var request = api.StartLargeFileRequest{ BucketID: bucketID, Name: remote, ContentType: fs.MimeType(src), Info: map[string]string{ timeKey: timeString(modTime), }, } // Set the SHA1 if known if calculatedSha1, err := src.Hash(fs.HashSHA1); err == nil && calculatedSha1 != "" { request.Info[sha1Key] = calculatedSha1 } var response api.StartLargeFileResponse err = f.pacer.Call(func() (bool, error) { resp, err := f.srv.CallJSON(&opts, &request, &response) return f.shouldRetry(resp, err) }) if err != nil { return nil, err } up = &largeUpload{ f: f, o: o, in: in, id: response.ID, size: size, parts: parts, sha1s: make([]string, parts), } return up, nil } // getUploadURL returns the upload info with the UploadURL and the AuthorizationToken // // This should be returned with returnUploadURL when finished func (up *largeUpload) getUploadURL() (upload *api.GetUploadPartURLResponse, err error) { up.uploadMu.Lock() defer up.uploadMu.Unlock() if len(up.uploads) == 0 { opts := rest.Opts{ Method: "POST", Path: "/b2_get_upload_part_url", } var request = api.GetUploadPartURLRequest{ ID: up.id, } err := up.f.pacer.Call(func() (bool, error) { resp, err := up.f.srv.CallJSON(&opts, &request, &upload) return up.f.shouldRetry(resp, err) }) if err != nil { return nil, errors.Wrap(err, "failed to get upload URL") } } else { upload, up.uploads = up.uploads[0], up.uploads[1:] } return upload, nil } // returnUploadURL returns the UploadURL to the cache func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) { if upload == nil { return } up.uploadMu.Lock() up.uploads = append(up.uploads, upload) up.uploadMu.Unlock() } // clearUploadURL clears the current UploadURL and the AuthorizationToken func (up *largeUpload) clearUploadURL() { up.uploadMu.Lock() up.uploads = nil up.uploadMu.Unlock() } // Transfer a chunk func (up *largeUpload) transferChunk(part int64, body []byte) error { calculatedSHA1 := fmt.Sprintf("%x", sha1.Sum(body)) up.sha1s[part-1] = calculatedSHA1 size := int64(len(body)) err := up.f.pacer.Call(func() (bool, error) { fs.Debug(up.o, "Sending chunk %d length %d", part, len(body)) // Get upload URL upload, err := up.getUploadURL() if err != nil { return false, err } // Authorization // // An upload authorization token, from b2_get_upload_part_url. // // X-Bz-Part-Number // // A number from 1 to 10000. The parts uploaded for one file // must have contiguous numbers, starting with 1. // // Content-Length // // The number of bytes in the file being uploaded. Note that // this header is required; you cannot leave it out and just // use chunked encoding. The minimum size of every part but // the last one is 100MB. // // X-Bz-Content-Sha1 // // The SHA1 checksum of the this part of the file. B2 will // check this when the part is uploaded, to make sure that the // data arrived correctly. The same SHA1 checksum must be // passed to b2_finish_large_file. opts := rest.Opts{ Method: "POST", Absolute: true, Path: upload.UploadURL, Body: bytes.NewBuffer(body), ExtraHeaders: map[string]string{ "Authorization": upload.AuthorizationToken, "X-Bz-Part-Number": fmt.Sprintf("%d", part), sha1Header: calculatedSHA1, }, ContentLength: &size, } var response api.UploadPartResponse resp, err := up.f.srv.CallJSON(&opts, nil, &response) if err == nil && resp != nil && resp.StatusCode == 401 { err = errorUploadPartTokenExpired fs.Debug(up.o, "%v", err) // Refetch upload part URLs and ditch this current one up.clearUploadURL() return true, err } up.returnUploadURL(upload) return up.f.shouldRetryNoReauth(resp, err) }) if err != nil { fs.Debug(up.o, "Error sending chunk %d: %v", part, err) } else { fs.Debug(up.o, "Done sending chunk %d", part) } return err } // finish closes off the large upload func (up *largeUpload) finish() error { opts := rest.Opts{ Method: "POST", Path: "/b2_finish_large_file", } var request = api.FinishLargeFileRequest{ ID: up.id, SHA1s: up.sha1s, } var response api.FileInfo err := up.f.pacer.Call(func() (bool, error) { resp, err := up.f.srv.CallJSON(&opts, &request, &response) return up.f.shouldRetry(resp, err) }) if err != nil { return err } return up.o.decodeMetaDataFileInfo(&response) } // cancel aborts the large upload func (up *largeUpload) cancel() error { opts := rest.Opts{ Method: "POST", Path: "/b2_cancel_large_file", } var request = api.CancelLargeFileRequest{ ID: up.id, } var response api.CancelLargeFileResponse err := up.f.pacer.Call(func() (bool, error) { resp, err := up.f.srv.CallJSON(&opts, &request, &response) return up.f.shouldRetry(resp, err) }) return err } // Upload uploads the chunks from the input func (up *largeUpload) Upload() error { fs.Debug(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id) remaining := up.size errs := make(chan error, 1) var wg sync.WaitGroup var err error outer: for part := int64(1); part <= up.parts; part++ { // Check any errors select { case err = <-errs: break outer default: } reqSize := remaining if reqSize >= int64(chunkSize) { reqSize = int64(chunkSize) } // Read the chunk buf := make([]byte, reqSize) _, err = io.ReadFull(up.in, buf) if err != nil { break outer } // Transfer the chunk // Get upload Token up.f.getUploadToken() wg.Add(1) go func(part int64, buf []byte) { defer up.f.returnUploadToken() defer wg.Done() err := up.transferChunk(part, buf) if err != nil { select { case errs <- err: default: } } }(part, buf) remaining -= reqSize } wg.Wait() if err == nil { select { case err = <-errs: default: } } if err != nil { fs.Debug(up.o, "Cancelling large file upload due to error: %v", err) cancelErr := up.cancel() if cancelErr != nil { fs.ErrorLog(up.o, "Failed to cancel large file upload: %v", cancelErr) } return err } // Check any errors fs.Debug(up.o, "Finishing large file upload") return up.finish() }