2016-06-15 17:49:11 +00:00
// Upload large files for b2
//
// Docs - https://www.backblaze.com/b2/docs/large_files.html
package b2
import (
2019-06-17 08:34:30 +00:00
"context"
2016-06-15 17:49:11 +00:00
"crypto/sha1"
2017-08-12 10:57:34 +00:00
"encoding/hex"
2016-06-15 17:49:11 +00:00
"fmt"
2018-01-12 16:30:54 +00:00
gohash "hash"
2016-06-15 17:49:11 +00:00
"io"
2017-08-12 10:57:34 +00:00
"strings"
2016-06-15 17:49:11 +00:00
"sync"
2019-07-28 17:47:38 +00:00
"github.com/rclone/rclone/backend/b2/api"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
2022-05-06 19:25:30 +00:00
"github.com/rclone/rclone/fs/chunksize"
2019-07-28 17:47:38 +00:00
"github.com/rclone/rclone/fs/hash"
2020-06-25 13:14:44 +00:00
"github.com/rclone/rclone/lib/atexit"
2023-01-22 12:46:23 +00:00
"github.com/rclone/rclone/lib/pool"
2019-07-28 17:47:38 +00:00
"github.com/rclone/rclone/lib/rest"
2020-05-29 11:36:31 +00:00
"golang.org/x/sync/errgroup"
2016-06-15 17:49:11 +00:00
)
2017-08-12 10:57:34 +00:00
// hashAppendingReader passes through the bytes of an underlying reader
// and, once that reader is exhausted, follows them with the hex encoded
// hash of everything that was read.
type hashAppendingReader struct {
	h         gohash.Hash // hash used to produce the hex sum
	in        io.Reader   // where the data is read from; nil once it has returned EOF
	hexSum    string      // hex encoded sum, set when in returns EOF
	hexReader io.Reader   // reads hexSum; nil until in returns EOF
}

// Read returns all the bytes from the original reader, then the hex sum
// of what was read, then EOF.
func (r *hashAppendingReader) Read(b []byte) (int, error) {
	// The original reader has finished so serve the hex sum
	if r.hexReader != nil {
		return r.hexReader.Read(b)
	}
	n, err := r.in.Read(b)
	if err != io.EOF {
		return n, err
	}
	// End of the data: work out the hex sum and hide the EOF from the
	// caller so that the sum can be read before EOF is seen.
	r.hexSum = hex.EncodeToString(r.h.Sum(nil))
	r.hexReader = strings.NewReader(r.hexSum)
	r.in = nil // allow GC
	return n, nil
}

// AdditionalLength returns the number of bytes the appended hex sum adds
// to the length of the original data.
func (r *hashAppendingReader) AdditionalLength() int {
	sumBytes := r.h.Size()
	return hex.EncodedLen(sumBytes)
}

// HexSum returns the hash sum as hex. It is the empty string until the
// original reader has returned EOF.
func (r *hashAppendingReader) HexSum() string {
	return r.hexSum
}

// newHashAppendingReader wraps in so that every byte read from it is also
// written to h, and the hex sum from h is appended after in reaches EOF.
// The number of extra bytes depends on the hash and is available from
// AdditionalLength().
func newHashAppendingReader(in io.Reader, h gohash.Hash) *hashAppendingReader {
	return &hashAppendingReader{
		h:  h,
		in: io.TeeReader(in, h),
	}
}
2016-06-15 17:49:11 +00:00
// largeUpload is used to control the upload of large files which need chunking
type largeUpload struct {
2020-05-29 11:36:31 +00:00
f * Fs // parent Fs
o * Object // object being uploaded
doCopy bool // doing copy rather than upload
what string // text name of operation for logs
in io . Reader // read the data from here
wrap accounting . WrapFn // account parts being transferred
id string // ID of the file being uploaded
size int64 // total size
2023-08-19 14:32:45 +00:00
parts int // calculated number of parts, if known
sha1smu sync . Mutex // mutex to protect sha1s
2020-05-29 11:36:31 +00:00
sha1s [ ] string // slice of SHA1s for each part
uploadMu sync . Mutex // lock for upload variable
uploads [ ] * api . GetUploadPartURLResponse // result of get upload URL calls
chunkSize int64 // chunk size to use
src * Object // if copying, object we are reading from
2023-09-18 19:41:31 +00:00
info * api . FileInfo // final response with info about the object
2016-06-15 17:49:11 +00:00
}
// newLargeUpload starts an upload of object o from in with metadata in src
2020-05-29 11:36:31 +00:00
//
// If newInfo is set then metadata from that will be used instead of reading it from src
2022-05-06 19:25:30 +00:00
func ( f * Fs ) newLargeUpload ( ctx context . Context , o * Object , in io . Reader , src fs . ObjectInfo , defaultChunkSize fs . SizeSuffix , doCopy bool , newInfo * api . File ) ( up * largeUpload , err error ) {
2016-06-15 17:49:11 +00:00
size := src . Size ( )
2023-08-19 14:32:45 +00:00
parts := 0
2022-05-06 19:25:30 +00:00
chunkSize := defaultChunkSize
2017-09-16 20:43:48 +00:00
if size == - 1 {
2018-05-14 17:06:57 +00:00
fs . Debugf ( o , "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached." , f . opt . ChunkSize , maxParts * f . opt . ChunkSize )
2017-09-16 20:43:48 +00:00
} else {
2022-08-09 09:44:54 +00:00
chunkSize = chunksize . Calculator ( o , size , maxParts , defaultChunkSize )
2023-08-19 14:32:45 +00:00
parts = int ( size / int64 ( chunkSize ) )
2020-05-29 11:36:31 +00:00
if size % int64 ( chunkSize ) != 0 {
2017-09-16 20:43:48 +00:00
parts ++
}
2016-06-15 17:49:11 +00:00
}
2017-09-16 20:43:48 +00:00
2016-06-15 17:49:11 +00:00
opts := rest . Opts {
Method : "POST" ,
Path : "/b2_start_large_file" ,
}
2019-08-09 14:19:02 +00:00
bucket , bucketPath := o . split ( )
2019-09-04 19:00:37 +00:00
bucketID , err := f . getBucketID ( ctx , bucket )
2016-06-15 17:49:11 +00:00
if err != nil {
return nil , err
}
var request = api . StartLargeFileRequest {
2020-05-29 11:36:31 +00:00
BucketID : bucketID ,
Name : f . opt . Enc . FromStandardPath ( bucketPath ) ,
2016-06-15 17:49:11 +00:00
}
2020-05-29 11:36:31 +00:00
if newInfo == nil {
modTime := src . ModTime ( ctx )
request . ContentType = fs . MimeType ( ctx , src )
request . Info = map [ string ] string {
timeKey : timeString ( modTime ) ,
}
// Set the SHA1 if known
if ! o . fs . opt . DisableCheckSum || doCopy {
if calculatedSha1 , err := src . Hash ( ctx , hash . SHA1 ) ; err == nil && calculatedSha1 != "" {
request . Info [ sha1Key ] = calculatedSha1
}
2019-01-20 15:33:42 +00:00
}
2020-05-29 11:36:31 +00:00
} else {
request . ContentType = newInfo . ContentType
request . Info = newInfo . Info
2016-06-15 17:49:11 +00:00
}
var response api . StartLargeFileResponse
err = f . pacer . Call ( func ( ) ( bool , error ) {
2019-09-04 19:00:37 +00:00
resp , err := f . srv . CallJSON ( ctx , & opts , & request , & response )
return f . shouldRetry ( ctx , resp , err )
2016-06-15 17:49:11 +00:00
} )
if err != nil {
return nil , err
}
2020-05-29 11:36:31 +00:00
up = & largeUpload {
f : f ,
o : o ,
doCopy : doCopy ,
what : "upload" ,
id : response . ID ,
size : size ,
parts : parts ,
2023-08-19 14:32:45 +00:00
sha1s : make ( [ ] string , 0 , 16 ) ,
2020-05-29 11:36:31 +00:00
chunkSize : int64 ( chunkSize ) ,
}
2018-02-01 15:41:58 +00:00
// unwrap the accounting from the input, we use wrap to put it
// back on after the buffering
2020-05-29 11:36:31 +00:00
if doCopy {
up . what = "copy"
up . src = src . ( * Object )
} else {
up . in , up . wrap = accounting . UnWrap ( in )
2016-06-15 17:49:11 +00:00
}
return up , nil
}
// getUploadURL returns the upload info with the UploadURL and the AuthorizationToken
//
// This should be returned with returnUploadURL when finished
2019-09-04 19:00:37 +00:00
func ( up * largeUpload ) getUploadURL ( ctx context . Context ) ( upload * api . GetUploadPartURLResponse , err error ) {
2016-06-15 17:49:11 +00:00
up . uploadMu . Lock ( )
2023-09-25 10:13:40 +00:00
if len ( up . uploads ) > 0 {
2016-06-15 17:49:11 +00:00
upload , up . uploads = up . uploads [ 0 ] , up . uploads [ 1 : ]
2023-09-25 10:13:40 +00:00
up . uploadMu . Unlock ( )
return upload , nil
}
up . uploadMu . Unlock ( )
opts := rest . Opts {
Method : "POST" ,
Path : "/b2_get_upload_part_url" ,
}
var request = api . GetUploadPartURLRequest {
ID : up . id ,
}
err = up . f . pacer . Call ( func ( ) ( bool , error ) {
resp , err := up . f . srv . CallJSON ( ctx , & opts , & request , & upload )
return up . f . shouldRetry ( ctx , resp , err )
} )
if err != nil {
return nil , fmt . Errorf ( "failed to get upload URL: %w" , err )
2016-06-15 17:49:11 +00:00
}
return upload , nil
}
// returnUploadURL saves upload so that a later call to getUploadURL can
// hand it out again. A nil upload is ignored.
func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) {
	if upload != nil {
		up.uploadMu.Lock()
		defer up.uploadMu.Unlock()
		up.uploads = append(up.uploads, upload)
	}
}
2023-08-19 14:32:45 +00:00
// addSha1 stores sha1 as the SHA1 of chunk chunkNumber, extending the
// slice of SHA1s with empty entries first if it is not long enough.
func (up *largeUpload) addSha1(chunkNumber int, sha1 string) {
	up.sha1smu.Lock()
	defer up.sha1smu.Unlock()
	if missing := chunkNumber + 1 - len(up.sha1s); missing > 0 {
		up.sha1s = append(up.sha1s, make([]string, missing)...)
	}
	up.sha1s[chunkNumber] = sha1
}
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
func ( up * largeUpload ) WriteChunk ( ctx context . Context , chunkNumber int , reader io . ReadSeeker ) ( size int64 , err error ) {
2023-08-24 16:14:50 +00:00
// Only account after the checksum reads have been done
if do , ok := reader . ( pool . DelayAccountinger ) ; ok {
// To figure out this number, do a transfer and if the accounted size is 0 or a
// multiple of what it should be, increase or decrease this number.
do . DelayAccounting ( 1 )
}
2023-08-19 14:32:45 +00:00
err = up . f . pacer . Call ( func ( ) ( bool , error ) {
// Discover the size by seeking to the end
size , err = reader . Seek ( 0 , io . SeekEnd )
if err != nil {
return false , err
}
// rewind the reader on retry and after reading size
_ , err = reader . Seek ( 0 , io . SeekStart )
if err != nil {
return false , err
}
fs . Debugf ( up . o , "Sending chunk %d length %d" , chunkNumber , size )
2016-06-15 17:49:11 +00:00
// Get upload URL
2019-09-04 19:00:37 +00:00
upload , err := up . getUploadURL ( ctx )
2016-06-15 17:49:11 +00:00
if err != nil {
return false , err
}
2023-08-19 14:32:45 +00:00
in := newHashAppendingReader ( reader , sha1 . New ( ) )
2023-09-03 12:53:11 +00:00
sizeWithHash := size + int64 ( in . AdditionalLength ( ) )
2017-12-13 10:11:20 +00:00
2016-06-15 17:49:11 +00:00
// Authorization
//
// An upload authorization token, from b2_get_upload_part_url.
//
// X-Bz-Part-Number
//
// A number from 1 to 10000. The parts uploaded for one file
// must have contiguous numbers, starting with 1.
//
// Content-Length
//
// The number of bytes in the file being uploaded. Note that
// this header is required; you cannot leave it out and just
2021-03-02 19:11:57 +00:00
// use chunked encoding. The minimum size of every part but
// the last one is 100 MB (100,000,000 bytes)
2016-06-15 17:49:11 +00:00
//
// X-Bz-Content-Sha1
//
// The SHA1 checksum of the this part of the file. B2 will
// check this when the part is uploaded, to make sure that the
2021-03-02 19:11:57 +00:00
// data arrived correctly. The same SHA1 checksum must be
2016-06-15 17:49:11 +00:00
// passed to b2_finish_large_file.
opts := rest . Opts {
2017-07-07 07:18:13 +00:00
Method : "POST" ,
RootURL : upload . UploadURL ,
2018-02-01 15:41:58 +00:00
Body : up . wrap ( in ) ,
2016-06-15 17:49:11 +00:00
ExtraHeaders : map [ string ] string {
"Authorization" : upload . AuthorizationToken ,
2023-08-19 14:32:45 +00:00
"X-Bz-Part-Number" : fmt . Sprintf ( "%d" , chunkNumber + 1 ) ,
2017-08-12 10:57:34 +00:00
sha1Header : "hex_digits_at_end" ,
2016-06-15 17:49:11 +00:00
} ,
2023-09-03 12:53:11 +00:00
ContentLength : & sizeWithHash ,
2016-06-15 17:49:11 +00:00
}
var response api . UploadPartResponse
2019-09-04 19:00:37 +00:00
resp , err := up . f . srv . CallJSON ( ctx , & opts , nil , & response )
retry , err := up . f . shouldRetry ( ctx , resp , err )
2018-02-04 11:25:44 +00:00
if err != nil {
2023-08-19 14:32:45 +00:00
fs . Debugf ( up . o , "Error sending chunk %d (retry=%v): %v: %#v" , chunkNumber , retry , err , err )
2018-02-04 11:25:44 +00:00
}
2016-07-01 15:23:23 +00:00
// On retryable error clear PartUploadURL
if retry {
2017-02-09 11:01:20 +00:00
fs . Debugf ( up . o , "Clearing part upload URL because of error: %v" , err )
2016-07-01 15:23:23 +00:00
upload = nil
2016-06-15 17:49:11 +00:00
}
up . returnUploadURL ( upload )
2023-08-19 14:32:45 +00:00
up . addSha1 ( chunkNumber , in . HexSum ( ) )
2016-07-01 15:23:23 +00:00
return retry , err
2016-06-15 17:49:11 +00:00
} )
2016-07-01 09:04:52 +00:00
if err != nil {
2023-08-19 14:32:45 +00:00
fs . Debugf ( up . o , "Error sending chunk %d: %v" , chunkNumber , err )
2016-07-01 09:04:52 +00:00
} else {
2023-08-19 14:32:45 +00:00
fs . Debugf ( up . o , "Done sending chunk %d" , chunkNumber )
2016-07-01 09:04:52 +00:00
}
2023-08-19 14:32:45 +00:00
return size , err
2016-06-15 17:49:11 +00:00
}
2020-05-29 11:36:31 +00:00
// Copy a chunk
2023-08-19 14:32:45 +00:00
func ( up * largeUpload ) copyChunk ( ctx context . Context , part int , partSize int64 ) error {
2020-05-29 11:36:31 +00:00
err := up . f . pacer . Call ( func ( ) ( bool , error ) {
fs . Debugf ( up . o , "Copying chunk %d length %d" , part , partSize )
opts := rest . Opts {
Method : "POST" ,
Path : "/b2_copy_part" ,
}
2023-08-19 14:32:45 +00:00
offset := int64 ( part ) * up . chunkSize // where we are in the source file
2020-05-29 11:36:31 +00:00
var request = api . CopyPartRequest {
SourceID : up . src . id ,
LargeFileID : up . id ,
2023-08-19 14:32:45 +00:00
PartNumber : int64 ( part + 1 ) ,
2020-05-29 11:36:31 +00:00
Range : fmt . Sprintf ( "bytes=%d-%d" , offset , offset + partSize - 1 ) ,
}
var response api . UploadPartResponse
resp , err := up . f . srv . CallJSON ( ctx , & opts , & request , & response )
retry , err := up . f . shouldRetry ( ctx , resp , err )
if err != nil {
fs . Debugf ( up . o , "Error copying chunk %d (retry=%v): %v: %#v" , part , retry , err , err )
}
2023-08-19 14:32:45 +00:00
up . addSha1 ( part , response . SHA1 )
2020-05-29 11:36:31 +00:00
return retry , err
} )
if err != nil {
fs . Debugf ( up . o , "Error copying chunk %d: %v" , part , err )
} else {
fs . Debugf ( up . o , "Done copying chunk %d" , part )
}
return err
}
2023-08-19 14:32:45 +00:00
// Close closes off the large upload
func ( up * largeUpload ) Close ( ctx context . Context ) error {
2020-05-29 11:36:31 +00:00
fs . Debugf ( up . o , "Finishing large file %s with %d parts" , up . what , up . parts )
2016-06-15 17:49:11 +00:00
opts := rest . Opts {
Method : "POST" ,
Path : "/b2_finish_large_file" ,
}
var request = api . FinishLargeFileRequest {
ID : up . id ,
SHA1s : up . sha1s ,
}
var response api . FileInfo
err := up . f . pacer . Call ( func ( ) ( bool , error ) {
2019-09-04 19:00:37 +00:00
resp , err := up . f . srv . CallJSON ( ctx , & opts , & request , & response )
return up . f . shouldRetry ( ctx , resp , err )
2016-06-15 17:49:11 +00:00
} )
if err != nil {
return err
}
2023-09-18 19:41:31 +00:00
up . info = & response
return nil
2016-06-15 17:49:11 +00:00
}
2023-08-19 14:32:45 +00:00
// Abort aborts the large upload
func ( up * largeUpload ) Abort ( ctx context . Context ) error {
2020-06-25 13:14:44 +00:00
fs . Debugf ( up . o , "Cancelling large file %s" , up . what )
2016-07-01 09:04:52 +00:00
opts := rest . Opts {
Method : "POST" ,
Path : "/b2_cancel_large_file" ,
}
var request = api . CancelLargeFileRequest {
ID : up . id ,
}
var response api . CancelLargeFileResponse
err := up . f . pacer . Call ( func ( ) ( bool , error ) {
2019-09-04 19:00:37 +00:00
resp , err := up . f . srv . CallJSON ( ctx , & opts , & request , & response )
return up . f . shouldRetry ( ctx , resp , err )
2016-07-01 09:04:52 +00:00
} )
2020-06-25 13:14:44 +00:00
if err != nil {
fs . Errorf ( up . o , "Failed to cancel large file %s: %v" , up . what , err )
2017-09-16 20:43:48 +00:00
}
2020-06-25 13:14:44 +00:00
return err
2017-09-16 20:43:48 +00:00
}
// Stream uploads the chunks from the input, starting with a required initial
// chunk. Assumes the file size is unknown and will upload until the input
// reaches EOF.
2020-05-29 11:36:31 +00:00
//
// Note that initialUploadBlock must be returned to f.putBuf()
2023-08-19 14:32:45 +00:00
func ( up * largeUpload ) Stream ( ctx context . Context , initialUploadBlock * pool . RW ) ( err error ) {
defer atexit . OnError ( & err , func ( ) { _ = up . Abort ( ctx ) } ) ( )
2017-09-16 20:43:48 +00:00
fs . Debugf ( up . o , "Starting streaming of large file (id %q)" , up . id )
2020-05-29 11:36:31 +00:00
var (
g , gCtx = errgroup . WithContext ( ctx )
hasMoreParts = true
)
2023-08-19 14:32:45 +00:00
up . size = initialUploadBlock . Size ( )
2023-10-13 14:46:36 +00:00
up . parts = 0
2023-08-19 14:32:45 +00:00
for part := 0 ; hasMoreParts ; part ++ {
// Get a block of memory from the pool and token which limits concurrency.
var rw * pool . RW
2023-10-13 14:46:36 +00:00
if part == 0 {
2023-08-19 14:32:45 +00:00
rw = initialUploadBlock
} else {
rw = up . f . getRW ( false )
}
2017-09-16 20:43:48 +00:00
2023-08-19 14:32:45 +00:00
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx . Err ( ) != nil {
up . f . putRW ( rw )
break
}
2017-09-16 20:43:48 +00:00
2023-08-19 14:32:45 +00:00
// Read the chunk
var n int64
2023-10-13 14:46:36 +00:00
if part == 0 {
2023-08-19 14:32:45 +00:00
n = rw . Size ( )
} else {
n , err = io . CopyN ( rw , up . in , up . chunkSize )
if err == io . EOF {
fs . Debugf ( up . o , "Read less than a full chunk, making this the last one." )
hasMoreParts = false
} else if err != nil {
// other kinds of errors indicate failure
up . f . putRW ( rw )
return err
2020-05-29 11:36:31 +00:00
}
2023-08-19 14:32:45 +00:00
}
2020-05-29 11:36:31 +00:00
2023-08-19 14:32:45 +00:00
// Keep stats up to date
2023-10-13 14:46:36 +00:00
up . parts += 1
2023-08-19 14:32:45 +00:00
up . size += n
if part > maxParts {
up . f . putRW ( rw )
return fmt . Errorf ( "%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size" , up . o , up . size , up . parts , maxParts )
2020-05-29 11:36:31 +00:00
}
2023-08-19 14:32:45 +00:00
part := part // for the closure
g . Go ( func ( ) ( err error ) {
defer up . f . putRW ( rw )
_ , err = up . WriteChunk ( gCtx , part , rw )
return err
} )
}
2020-05-29 11:36:31 +00:00
err = g . Wait ( )
if err != nil {
return err
2017-09-16 20:43:48 +00:00
}
2023-08-19 14:32:45 +00:00
return up . Close ( ctx )
2017-09-16 20:43:48 +00:00
}
2023-08-19 14:32:45 +00:00
// Copy the chunks from the source to the destination
func ( up * largeUpload ) Copy ( ctx context . Context ) ( err error ) {
defer atexit . OnError ( & err , func ( ) { _ = up . Abort ( ctx ) } ) ( )
2020-05-29 11:36:31 +00:00
fs . Debugf ( up . o , "Starting %s of large file in %d chunks (id %q)" , up . what , up . parts , up . id )
var (
2023-08-19 14:32:45 +00:00
g , gCtx = errgroup . WithContext ( ctx )
remaining = up . size
2020-05-29 11:36:31 +00:00
)
2023-08-19 14:32:45 +00:00
g . SetLimit ( up . f . opt . UploadConcurrency )
2023-10-12 10:15:42 +00:00
for part := 0 ; part < up . parts ; part ++ {
2023-08-19 14:32:45 +00:00
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in copying all the other parts.
if gCtx . Err ( ) != nil {
break
2023-01-22 12:46:23 +00:00
}
2017-01-29 22:21:39 +00:00
2023-08-19 14:32:45 +00:00
reqSize := remaining
if reqSize >= up . chunkSize {
reqSize = up . chunkSize
2016-06-15 17:49:11 +00:00
}
2023-08-19 14:32:45 +00:00
part := part // for the closure
g . Go ( func ( ) ( err error ) {
return up . copyChunk ( gCtx , part , reqSize )
} )
remaining -= reqSize
}
2020-05-29 11:36:31 +00:00
err = g . Wait ( )
if err != nil {
return err
2016-06-15 17:49:11 +00:00
}
2023-08-19 14:32:45 +00:00
return up . Close ( ctx )
2016-06-15 17:49:11 +00:00
}