2016-06-15 17:49:11 +00:00
// Upload large files for b2
//
// Docs - https://www.backblaze.com/b2/docs/large_files.html
package b2
import (
"bytes"
"crypto/sha1"
2017-08-12 10:57:34 +00:00
"encoding/hex"
2016-06-15 17:49:11 +00:00
"fmt"
2018-01-12 16:30:54 +00:00
gohash "hash"
2016-06-15 17:49:11 +00:00
"io"
2017-08-12 10:57:34 +00:00
"strings"
2016-06-15 17:49:11 +00:00
"sync"
2018-01-11 16:05:41 +00:00
"github.com/ncw/rclone/backend/b2/api"
2016-06-15 17:49:11 +00:00
"github.com/ncw/rclone/fs"
2018-01-12 16:30:54 +00:00
"github.com/ncw/rclone/fs/accounting"
"github.com/ncw/rclone/fs/hash"
2018-01-11 16:29:20 +00:00
"github.com/ncw/rclone/lib/rest"
2016-06-15 17:49:11 +00:00
"github.com/pkg/errors"
)
2017-08-12 10:57:34 +00:00
// hashAppendingReader is an io.Reader which passes through the data of
// another reader and, once that reader is exhausted, appends the hex
// encoded hash of everything that was read from it.
type hashAppendingReader struct {
	h         gohash.Hash // hash fed with every byte read from in
	in        io.Reader   // source tee'd into h; set to nil once it has returned io.EOF
	hexSum    string      // hex encoded sum, empty until in has returned io.EOF
	hexReader io.Reader   // serves hexSum; nil until in has returned io.EOF
}

// Read returns all bytes from the original reader, then the hex sum
// of what was read so far, then EOF.
//
// It never returns (0, nil) for a non-empty b just because the original
// reader signalled EOF without data: in that case the first bytes of the
// hex sum are returned by the same call.
func (har *hashAppendingReader) Read(b []byte) (int, error) {
	if har.hexReader == nil {
		n, err := har.in.Read(b)
		if err != io.EOF {
			return n, err
		}
		har.in = nil // allow GC
		har.hexSum = hex.EncodeToString(har.h.Sum(nil))
		har.hexReader = strings.NewReader(har.hexSum)
		if n > 0 {
			// Hand over the final data bytes and swallow the EOF so
			// that the hex sum can be read on the following calls.
			return n, nil
		}
		// Nothing was read on this call, so start on the hex sum
		// straight away instead of returning a zero length read with
		// a nil error.
	}
	return har.hexReader.Read(b)
}

// AdditionalLength returns how many bytes the appended hex sum will take up.
func (har *hashAppendingReader) AdditionalLength() int {
	return hex.EncodedLen(har.h.Size())
}

// HexSum returns the hash sum as hex. It's only available after the original
// reader has EOF'd. It's an empty string before that.
func (har *hashAppendingReader) HexSum() string {
	return har.hexSum
}

// newHashAppendingReader takes a Reader and a Hash and will append the hex sum
// after the original reader reaches EOF. The increased size depends on the
// given hash, which may be queried through AdditionalLength()
func newHashAppendingReader(in io.Reader, h gohash.Hash) *hashAppendingReader {
	// Everything read from in is also written to h by the TeeReader
	withHash := io.TeeReader(in, h)
	return &hashAppendingReader{h: h, in: withHash}
}
2016-06-15 17:49:11 +00:00
// largeUpload is used to control the upload of large files which need chunking
type largeUpload struct {
f * Fs // parent Fs
o * Object // object being uploaded
in io . Reader // read the data from here
2018-02-01 15:41:58 +00:00
wrap accounting . WrapFn // account parts being transferred
2016-06-15 17:49:11 +00:00
id string // ID of the file being uploaded
size int64 // total size
2017-09-16 20:43:48 +00:00
parts int64 // calculated number of parts, if known
2016-06-15 17:49:11 +00:00
sha1s [ ] string // slice of SHA1s for each part
uploadMu sync . Mutex // lock for upload variable
uploads [ ] * api . GetUploadPartURLResponse // result of get upload URL calls
}
// newLargeUpload starts an upload of object o from in with metadata in src
func ( f * Fs ) newLargeUpload ( o * Object , in io . Reader , src fs . ObjectInfo ) ( up * largeUpload , err error ) {
2016-07-13 14:28:39 +00:00
remote := o . remote
2016-06-15 17:49:11 +00:00
size := src . Size ( )
2017-09-16 20:43:48 +00:00
parts := int64 ( 0 )
sha1SliceSize := int64 ( maxParts )
if size == - 1 {
fs . Debugf ( o , "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached." , fs . SizeSuffix ( chunkSize ) , fs . SizeSuffix ( maxParts * chunkSize ) )
} else {
parts = size / int64 ( chunkSize )
if size % int64 ( chunkSize ) != 0 {
parts ++
}
if parts > maxParts {
return nil , errors . Errorf ( "%q too big (%d bytes) makes too many parts %d > %d - increase --b2-chunk-size" , remote , size , parts , maxParts )
}
sha1SliceSize = parts
2016-06-15 17:49:11 +00:00
}
2017-09-16 20:43:48 +00:00
2016-06-15 17:49:11 +00:00
modTime := src . ModTime ( )
opts := rest . Opts {
Method : "POST" ,
Path : "/b2_start_large_file" ,
}
bucketID , err := f . getBucketID ( )
if err != nil {
return nil , err
}
var request = api . StartLargeFileRequest {
BucketID : bucketID ,
2016-07-13 14:28:39 +00:00
Name : o . fs . root + remote ,
2016-06-15 17:49:11 +00:00
ContentType : fs . MimeType ( src ) ,
Info : map [ string ] string {
timeKey : timeString ( modTime ) ,
} ,
}
// Set the SHA1 if known
2018-01-18 20:27:52 +00:00
if calculatedSha1 , err := src . Hash ( hash . SHA1 ) ; err == nil && calculatedSha1 != "" {
2016-06-15 17:49:11 +00:00
request . Info [ sha1Key ] = calculatedSha1
}
var response api . StartLargeFileResponse
err = f . pacer . Call ( func ( ) ( bool , error ) {
resp , err := f . srv . CallJSON ( & opts , & request , & response )
return f . shouldRetry ( resp , err )
} )
if err != nil {
return nil , err
}
2018-02-01 15:41:58 +00:00
// unwrap the accounting from the input, we use wrap to put it
// back on after the buffering
in , wrap := accounting . UnWrap ( in )
2016-06-15 17:49:11 +00:00
up = & largeUpload {
f : f ,
o : o ,
in : in ,
2018-02-01 15:41:58 +00:00
wrap : wrap ,
2016-06-15 17:49:11 +00:00
id : response . ID ,
size : size ,
parts : parts ,
2017-09-16 20:43:48 +00:00
sha1s : make ( [ ] string , sha1SliceSize ) ,
2016-06-15 17:49:11 +00:00
}
return up , nil
}
// getUploadURL hands out upload info holding the UploadURL and the
// AuthorizationToken, reusing a previously returned one when available
// and asking the server for a new one otherwise.
//
// Give it back with returnUploadURL when finished with it.
func (up *largeUpload) getUploadURL() (upload *api.GetUploadPartURLResponse, err error) {
	up.uploadMu.Lock()
	defer up.uploadMu.Unlock()

	// Reuse the oldest cached upload URL if there is one
	if len(up.uploads) > 0 {
		upload = up.uploads[0]
		up.uploads = up.uploads[1:]
		return upload, nil
	}

	// Nothing cached so request a fresh one
	request := api.GetUploadPartURLRequest{
		ID: up.id,
	}
	opts := rest.Opts{
		Method: "POST",
		Path:   "/b2_get_upload_part_url",
	}
	err = up.f.pacer.Call(func() (bool, error) {
		resp, callErr := up.f.srv.CallJSON(&opts, &request, &upload)
		return up.f.shouldRetry(resp, callErr)
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to get upload URL")
	}
	return upload, nil
}
// returnUploadURL puts upload back into the cache so that it can be
// handed out again by getUploadURL. A nil upload is ignored.
func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) {
	if upload != nil {
		up.uploadMu.Lock()
		defer up.uploadMu.Unlock()
		up.uploads = append(up.uploads, upload)
	}
}
// clearUploadURL throws away every cached UploadURL and AuthorizationToken
func (up *largeUpload) clearUploadURL() {
	up.uploadMu.Lock()
	defer up.uploadMu.Unlock()
	up.uploads = nil
}
// Transfer a chunk
func ( up * largeUpload ) transferChunk ( part int64 , body [ ] byte ) error {
err := up . f . pacer . Call ( func ( ) ( bool , error ) {
2017-02-09 11:01:20 +00:00
fs . Debugf ( up . o , "Sending chunk %d length %d" , part , len ( body ) )
2016-06-15 17:49:11 +00:00
// Get upload URL
upload , err := up . getUploadURL ( )
if err != nil {
return false , err
}
2017-12-13 10:11:20 +00:00
in := newHashAppendingReader ( bytes . NewReader ( body ) , sha1 . New ( ) )
size := int64 ( len ( body ) ) + int64 ( in . AdditionalLength ( ) )
2016-06-15 17:49:11 +00:00
// Authorization
//
// An upload authorization token, from b2_get_upload_part_url.
//
// X-Bz-Part-Number
//
// A number from 1 to 10000. The parts uploaded for one file
// must have contiguous numbers, starting with 1.
//
// Content-Length
//
// The number of bytes in the file being uploaded. Note that
// this header is required; you cannot leave it out and just
// use chunked encoding. The minimum size of every part but
// the last one is 100MB.
//
// X-Bz-Content-Sha1
//
// The SHA1 checksum of the this part of the file. B2 will
// check this when the part is uploaded, to make sure that the
// data arrived correctly. The same SHA1 checksum must be
// passed to b2_finish_large_file.
opts := rest . Opts {
2017-07-07 07:18:13 +00:00
Method : "POST" ,
RootURL : upload . UploadURL ,
2018-02-01 15:41:58 +00:00
Body : up . wrap ( in ) ,
2016-06-15 17:49:11 +00:00
ExtraHeaders : map [ string ] string {
"Authorization" : upload . AuthorizationToken ,
"X-Bz-Part-Number" : fmt . Sprintf ( "%d" , part ) ,
2017-08-12 10:57:34 +00:00
sha1Header : "hex_digits_at_end" ,
2016-06-15 17:49:11 +00:00
} ,
ContentLength : & size ,
}
var response api . UploadPartResponse
resp , err := up . f . srv . CallJSON ( & opts , nil , & response )
2016-11-07 13:30:51 +00:00
retry , err := up . f . shouldRetry ( resp , err )
2016-07-01 15:23:23 +00:00
// On retryable error clear PartUploadURL
if retry {
2017-02-09 11:01:20 +00:00
fs . Debugf ( up . o , "Clearing part upload URL because of error: %v" , err )
2016-07-01 15:23:23 +00:00
upload = nil
2016-06-15 17:49:11 +00:00
}
up . returnUploadURL ( upload )
2017-12-13 10:11:20 +00:00
up . sha1s [ part - 1 ] = in . HexSum ( )
2016-07-01 15:23:23 +00:00
return retry , err
2016-06-15 17:49:11 +00:00
} )
2016-07-01 09:04:52 +00:00
if err != nil {
2017-02-09 11:01:20 +00:00
fs . Debugf ( up . o , "Error sending chunk %d: %v" , part , err )
2016-07-01 09:04:52 +00:00
} else {
2017-02-09 11:01:20 +00:00
fs . Debugf ( up . o , "Done sending chunk %d" , part )
2016-07-01 09:04:52 +00:00
}
2016-06-15 17:49:11 +00:00
return err
}
// finish closes off the large upload
func ( up * largeUpload ) finish ( ) error {
2017-09-16 20:43:48 +00:00
fs . Debugf ( up . o , "Finishing large file upload with %d parts" , up . parts )
2016-06-15 17:49:11 +00:00
opts := rest . Opts {
Method : "POST" ,
Path : "/b2_finish_large_file" ,
}
var request = api . FinishLargeFileRequest {
ID : up . id ,
SHA1s : up . sha1s ,
}
var response api . FileInfo
err := up . f . pacer . Call ( func ( ) ( bool , error ) {
resp , err := up . f . srv . CallJSON ( & opts , & request , & response )
return up . f . shouldRetry ( resp , err )
} )
if err != nil {
return err
}
return up . o . decodeMetaDataFileInfo ( & response )
}
2016-07-01 09:04:52 +00:00
// cancel abandons the large upload by calling b2_cancel_large_file
// for its file ID
func (up *largeUpload) cancel() error {
	request := api.CancelLargeFileRequest{
		ID: up.id,
	}
	opts := rest.Opts{
		Method: "POST",
		Path:   "/b2_cancel_large_file",
	}
	var response api.CancelLargeFileResponse
	return up.f.pacer.Call(func() (bool, error) {
		resp, err := up.f.srv.CallJSON(&opts, &request, &response)
		return up.f.shouldRetry(resp, err)
	})
}
2017-09-16 20:43:48 +00:00
// managedTransferChunk transfers part from buf in a new goroutine
// tracked by wg. When the transfer is over buf is handed to
// putUploadBlock. A transfer error is sent to errs unless the send
// would block, in which case it is dropped.
func (up *largeUpload) managedTransferChunk(wg *sync.WaitGroup, errs chan error, part int64, buf []byte) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer up.f.putUploadBlock(buf)
		if err := up.transferChunk(part, buf); err != nil {
			// Non blocking send so a full channel can't stall this goroutine
			select {
			case errs <- err:
			default:
			}
		}
	}()
}
// finishOrCancelOnError finishes the large upload if there has been no
// error, otherwise it cancels the upload and returns the error.
//
// If err is nil then an error waiting in errs is used instead. A failure
// to cancel is only logged.
func (up *largeUpload) finishOrCancelOnError(err error, errs chan error) error {
	if err == nil {
		// Pick up an error from a chunk transfer, without blocking
		select {
		case err = <-errs:
		default:
		}
	}
	if err == nil {
		return up.finish()
	}
	fs.Debugf(up.o, "Cancelling large file upload due to error: %v", err)
	if cancelErr := up.cancel(); cancelErr != nil {
		fs.Errorf(up.o, "Failed to cancel large file upload: %v", cancelErr)
	}
	return err
}
// Stream uploads the chunks from the input, starting with a required initial
// chunk. Assumes the file size is unknown and will upload until the input
// reaches EOF.
//
// initialUploadBlock is sent as part 1, then further blocks are read from
// up.in and sent as parts 2, 3, ... Each part is transferred in its own
// goroutine. up.size and up.parts are updated as the data is read.
//
// If reading fails, a transfer fails or more than maxParts parts would be
// needed, the large upload is cancelled and the error is returned.
// Otherwise the upload is finished.
func (up *largeUpload) Stream(initialUploadBlock []byte) (err error) {
	fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id)
	errs := make(chan error, 1)
	hasMoreParts := true
	var wg sync.WaitGroup

	// Transfer initial chunk. It counts as a part even if nothing
	// else turns out to follow it.
	up.size = int64(len(initialUploadBlock))
	up.parts = 1
	up.managedTransferChunk(&wg, errs, 1, initialUploadBlock)

outer:
	for part := int64(2); hasMoreParts; part++ {
		// Check any errors
		select {
		case err = <-errs:
			break outer
		default:
		}

		// Get a block of memory
		buf := up.f.getUploadBlock()

		// Read the chunk
		//
		// n is declared on its own so that err is the named result
		// and not a shadow of it local to the loop, otherwise errors
		// set below would be lost when the loop is left.
		var n int
		n, err = io.ReadFull(up.in, buf)
		switch err {
		case nil:
			// a full chunk, there may be more to come
		case io.ErrUnexpectedEOF:
			fs.Debugf(up.o, "Read less than a full chunk, making this the last one.")
			buf = buf[:n]
			hasMoreParts = false
			err = nil
		case io.EOF:
			fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.")
			up.f.putUploadBlock(buf)
			hasMoreParts = false
			err = nil
			break outer
		default:
			// other kinds of errors indicate failure
			up.f.putUploadBlock(buf)
			break outer
		}

		// Keep stats up to date
		up.size += int64(n)
		if part > maxParts {
			// This chunk is not going to be sent so hand the block
			// back, and leave up.parts alone so that it stays
			// within the bounds of up.sha1s.
			up.f.putUploadBlock(buf)
			err = errors.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, part, maxParts)
			break outer
		}
		up.parts = part

		// Transfer the chunk
		up.managedTransferChunk(&wg, errs, part, buf)
	}
	wg.Wait()

	// Only the SHA1s of the parts actually sent are wanted
	up.sha1s = up.sha1s[:up.parts]

	return up.finishOrCancelOnError(err, errs)
}
2016-06-15 17:49:11 +00:00
// Upload uploads the chunks from the input
func ( up * largeUpload ) Upload ( ) error {
2017-02-09 11:01:20 +00:00
fs . Debugf ( up . o , "Starting upload of large file in %d chunks (id %q)" , up . parts , up . id )
2016-06-15 17:49:11 +00:00
remaining := up . size
2016-07-01 09:04:52 +00:00
errs := make ( chan error , 1 )
var wg sync . WaitGroup
var err error
outer :
2016-06-15 17:49:11 +00:00
for part := int64 ( 1 ) ; part <= up . parts ; part ++ {
2016-07-01 09:04:52 +00:00
// Check any errors
select {
case err = <- errs :
break outer
default :
}
2016-06-15 17:49:11 +00:00
reqSize := remaining
if reqSize >= int64 ( chunkSize ) {
reqSize = int64 ( chunkSize )
}
2017-01-29 22:21:39 +00:00
// Get a block of memory
buf := up . f . getUploadBlock ( ) [ : reqSize ]
2016-06-15 17:49:11 +00:00
// Read the chunk
2016-07-01 09:04:52 +00:00
_ , err = io . ReadFull ( up . in , buf )
2016-06-15 17:49:11 +00:00
if err != nil {
2017-01-29 22:21:39 +00:00
up . f . putUploadBlock ( buf )
2016-07-01 09:04:52 +00:00
break outer
2016-06-15 17:49:11 +00:00
}
// Transfer the chunk
2017-09-16 20:43:48 +00:00
up . managedTransferChunk ( & wg , errs , part , buf )
2016-06-15 17:49:11 +00:00
remaining -= reqSize
}
2016-07-01 09:04:52 +00:00
wg . Wait ( )
2017-09-16 20:43:48 +00:00
return up . finishOrCancelOnError ( err , errs )
2016-06-15 17:49:11 +00:00
}