// Upload large files for b2 // // Docs - https://www.backblaze.com/b2/docs/large_files.html package b2 import ( "bytes" "crypto/sha1" "encoding/hex" "fmt" "hash" "io" "strings" "sync" "github.com/ncw/rclone/b2/api" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/rest" "github.com/pkg/errors" ) type hashAppendingReader struct { h hash.Hash in io.Reader hexSum string hexReader io.Reader } // Read returns bytes all bytes from the original reader, then the hex sum // of what was read so far, then EOF. func (har *hashAppendingReader) Read(b []byte) (int, error) { if har.hexReader == nil { n, err := har.in.Read(b) if err == io.EOF { har.in = nil // allow GC err = nil // allow reading hexSum before EOF har.hexSum = hex.EncodeToString(har.h.Sum(nil)) har.hexReader = strings.NewReader(har.hexSum) } return n, err } return har.hexReader.Read(b) } // AdditionalLength returns how many bytes the appended hex sum will take up. func (har *hashAppendingReader) AdditionalLength() int { return hex.EncodedLen(har.h.Size()) } // HexSum returns the hash sum as hex. It's only available after the original // reader has EOF'd. It's an empty string before that. func (har *hashAppendingReader) HexSum() string { return har.hexSum } // newHashAppendingReader takes a Reader and a Hash and will append the hex sum // after the original reader reaches EOF. The increased size depends on the // given hash, which may be queried through AdditionalLength() func newHashAppendingReader(in io.Reader, h hash.Hash) *hashAppendingReader { withHash := io.TeeReader(in, h) return &hashAppendingReader{h: h, in: withHash} } // largeUpload is used to control the upload of large files which need chunking type largeUpload struct { f *Fs // parent Fs o *Object // object being uploaded in io.Reader // read the data from here id string // ID of the file being uploaded size int64 // total size parts int64 // calculated number of parts sha1s []string // slice of SHA1s for each part uploadMu sync.Mutex // lock for upload variable uploads []*api.GetUploadPartURLResponse // result of get upload URL calls } // newLargeUpload starts an upload of object o from in with metadata in src func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *largeUpload, err error) { remote := o.remote size := src.Size() parts := size / int64(chunkSize) if size%int64(chunkSize) != 0 { parts++ } if parts > maxParts { return nil, errors.Errorf("%q too big (%d bytes) makes too many parts %d > %d - increase --b2-chunk-size", remote, size, parts, maxParts) } modTime := src.ModTime() opts := rest.Opts{ Method: "POST", Path: "/b2_start_large_file", } bucketID, err := f.getBucketID() if err != nil { return nil, err } var request = api.StartLargeFileRequest{ BucketID: bucketID, Name: o.fs.root + remote, ContentType: fs.MimeType(src), Info: map[string]string{ timeKey: timeString(modTime), }, } // Set the SHA1 if known if calculatedSha1, err := src.Hash(fs.HashSHA1); err == nil && calculatedSha1 != "" { request.Info[sha1Key] = calculatedSha1 } var response api.StartLargeFileResponse err = f.pacer.Call(func() (bool, error) { resp, err := f.srv.CallJSON(&opts, &request, &response) return f.shouldRetry(resp, err) }) if err != nil { return nil, err } up = &largeUpload{ f: f, o: o, in: in, id: response.ID, size: size, parts: parts, sha1s: make([]string, parts), } return up, nil } // getUploadURL returns the upload info with the UploadURL and the AuthorizationToken // // This should be returned with returnUploadURL when finished func (up *largeUpload) getUploadURL() (upload *api.GetUploadPartURLResponse, err error) { up.uploadMu.Lock() defer up.uploadMu.Unlock() if len(up.uploads) == 0 { opts := rest.Opts{ Method: "POST", Path: "/b2_get_upload_part_url", } var request = api.GetUploadPartURLRequest{ ID: up.id, } err := up.f.pacer.Call(func() (bool, error) { resp, err := up.f.srv.CallJSON(&opts, &request, &upload) return up.f.shouldRetry(resp, err) }) if err != nil { return nil, errors.Wrap(err, "failed to get upload URL") } } else { upload, up.uploads = up.uploads[0], up.uploads[1:] } return upload, nil } // returnUploadURL returns the UploadURL to the cache func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) { if upload == nil { return } up.uploadMu.Lock() up.uploads = append(up.uploads, upload) up.uploadMu.Unlock() } // clearUploadURL clears the current UploadURL and the AuthorizationToken func (up *largeUpload) clearUploadURL() { up.uploadMu.Lock() up.uploads = nil up.uploadMu.Unlock() } // Transfer a chunk func (up *largeUpload) transferChunk(part int64, body []byte) error { in := newHashAppendingReader(bytes.NewReader(body), sha1.New()) size := int64(len(body)) + int64(in.AdditionalLength()) err := up.f.pacer.Call(func() (bool, error) { fs.Debugf(up.o, "Sending chunk %d length %d", part, len(body)) // Get upload URL upload, err := up.getUploadURL() if err != nil { return false, err } // Authorization // // An upload authorization token, from b2_get_upload_part_url. // // X-Bz-Part-Number // // A number from 1 to 10000. The parts uploaded for one file // must have contiguous numbers, starting with 1. // // Content-Length // // The number of bytes in the file being uploaded. Note that // this header is required; you cannot leave it out and just // use chunked encoding. The minimum size of every part but // the last one is 100MB. // // X-Bz-Content-Sha1 // // The SHA1 checksum of the this part of the file. B2 will // check this when the part is uploaded, to make sure that the // data arrived correctly. The same SHA1 checksum must be // passed to b2_finish_large_file. opts := rest.Opts{ Method: "POST", RootURL: upload.UploadURL, Body: fs.AccountPart(up.o, in), ExtraHeaders: map[string]string{ "Authorization": upload.AuthorizationToken, "X-Bz-Part-Number": fmt.Sprintf("%d", part), sha1Header: "hex_digits_at_end", }, ContentLength: &size, } var response api.UploadPartResponse resp, err := up.f.srv.CallJSON(&opts, nil, &response) retry, err := up.f.shouldRetry(resp, err) // On retryable error clear PartUploadURL if retry { fs.Debugf(up.o, "Clearing part upload URL because of error: %v", err) upload = nil } up.returnUploadURL(upload) return retry, err }) if err != nil { fs.Debugf(up.o, "Error sending chunk %d: %v", part, err) } else { fs.Debugf(up.o, "Done sending chunk %d", part) } up.sha1s[part-1] = in.HexSum() return err } // finish closes off the large upload func (up *largeUpload) finish() error { opts := rest.Opts{ Method: "POST", Path: "/b2_finish_large_file", } var request = api.FinishLargeFileRequest{ ID: up.id, SHA1s: up.sha1s, } var response api.FileInfo err := up.f.pacer.Call(func() (bool, error) { resp, err := up.f.srv.CallJSON(&opts, &request, &response) return up.f.shouldRetry(resp, err) }) if err != nil { return err } return up.o.decodeMetaDataFileInfo(&response) } // cancel aborts the large upload func (up *largeUpload) cancel() error { opts := rest.Opts{ Method: "POST", Path: "/b2_cancel_large_file", } var request = api.CancelLargeFileRequest{ ID: up.id, } var response api.CancelLargeFileResponse err := up.f.pacer.Call(func() (bool, error) { resp, err := up.f.srv.CallJSON(&opts, &request, &response) return up.f.shouldRetry(resp, err) }) return err } // Upload uploads the chunks from the input func (up *largeUpload) Upload() error { fs.Debugf(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id) remaining := up.size errs := make(chan error, 1) var wg sync.WaitGroup var err error fs.AccountByPart(up.o) // Cancel whole file accounting before reading outer: for part := int64(1); part <= up.parts; part++ { // Check any errors select { case err = <-errs: break outer default: } reqSize := remaining if reqSize >= int64(chunkSize) { reqSize = int64(chunkSize) } // Get a block of memory buf := up.f.getUploadBlock()[:reqSize] // Read the chunk _, err = io.ReadFull(up.in, buf) if err != nil { up.f.putUploadBlock(buf) break outer } // Transfer the chunk wg.Add(1) go func(part int64, buf []byte) { defer wg.Done() defer up.f.putUploadBlock(buf) err := up.transferChunk(part, buf) if err != nil { select { case errs <- err: default: } } }(part, buf) remaining -= reqSize } wg.Wait() if err == nil { select { case err = <-errs: default: } } if err != nil { fs.Debugf(up.o, "Cancelling large file upload due to error: %v", err) cancelErr := up.cancel() if cancelErr != nil { fs.Errorf(up.o, "Failed to cancel large file upload: %v", cancelErr) } return err } // Check any errors fs.Debugf(up.o, "Finishing large file upload") return up.finish() }