diff --git a/b2/api/types.go b/b2/api/types.go
index a9734d7a1..b8a8d612c 100644
--- a/b2/api/types.go
+++ b/b2/api/types.go
@@ -107,17 +107,18 @@ type GetUploadURLResponse struct {
 	AuthorizationToken string `json:"authorizationToken"` // The authorizationToken that must be used when uploading files to this bucket, see b2_upload_file.
 }
 
-// FileInfo is received from b2_upload_file and b2_get_file_info
+// FileInfo is received from b2_upload_file, b2_get_file_info and b2_finish_large_file
 type FileInfo struct {
-	ID          string            `json:"fileId"`        // The unique identifier for this version of this file. Used with b2_get_file_info, b2_download_file_by_id, and b2_delete_file_version.
-	Name        string            `json:"fileName"`      // The name of this file, which can be used with b2_download_file_by_name.
-	Action      string            `json:"action"`        // Either "upload" or "hide". "upload" means a file that was uploaded to B2 Cloud Storage. "hide" means a file version marking the file as hidden, so that it will not show up in b2_list_file_names. The result of b2_list_file_names will contain only "upload". The result of b2_list_file_versions may have both.
-	AccountID   string            `json:"accountId"`     // Your account ID.
-	BucketID    string            `json:"bucketId"`      // The bucket that the file is in.
-	Size        int64             `json:"contentLength"` // The number of bytes stored in the file.
-	SHA1        string            `json:"contentSha1"`   // The SHA1 of the bytes stored in the file.
-	ContentType string            `json:"contentType"`   // The MIME type of the file.
-	Info        map[string]string `json:"fileInfo"`      // The custom information that was uploaded with the file. This is a JSON object, holding the name/value pairs that were uploaded with the file.
+	ID              string            `json:"fileId"`          // The unique identifier for this version of this file. Used with b2_get_file_info, b2_download_file_by_id, and b2_delete_file_version.
+	Name            string            `json:"fileName"`        // The name of this file, which can be used with b2_download_file_by_name.
+	Action          string            `json:"action"`          // Either "upload" or "hide". "upload" means a file that was uploaded to B2 Cloud Storage. "hide" means a file version marking the file as hidden, so that it will not show up in b2_list_file_names. The result of b2_list_file_names will contain only "upload". The result of b2_list_file_versions may have both.
+	AccountID       string            `json:"accountId"`       // Your account ID.
+	BucketID        string            `json:"bucketId"`        // The bucket that the file is in.
+	Size            int64             `json:"contentLength"`   // The number of bytes stored in the file.
+	UploadTimestamp Timestamp         `json:"uploadTimestamp"` // This is a UTC time when this file was uploaded.
+	SHA1            string            `json:"contentSha1"`     // The SHA1 of the bytes stored in the file.
+	ContentType     string            `json:"contentType"`     // The MIME type of the file.
+	Info            map[string]string `json:"fileInfo"`        // The custom information that was uploaded with the file. This is a JSON object, holding the name/value pairs that were uploaded with the file.
 }
 
 // CreateBucketRequest is used to create a bucket
@@ -149,3 +150,67 @@ type HideFileRequest struct {
 type GetFileInfoRequest struct {
 	ID string `json:"fileId"` // The ID of the file, as returned by b2_upload_file, b2_list_file_names, or b2_list_file_versions.
 }
+
+// StartLargeFileRequest (b2_start_large_file) prepares for uploading the parts of a large file.
+//
+// If the original source of the file being uploaded has a last
+// modified time concept, Backblaze recommends using
+// src_last_modified_millis as the name, and a string holding the base
+// 10 number of milliseconds since midnight, January 1, 1970
+// UTC. This fits in a 64 bit integer such as the type "long" in the
+// programming language Java. It is intended to be compatible with
+// Java's time long. For example, it can be passed directly into the
+// Java call Date.setTime(long time).
+//
+// If the caller knows the SHA1 of the entire large file being
+// uploaded, Backblaze recommends using large_file_sha1 as the name,
+// and a 40 byte hex string representing the SHA1.
+//
+// Example: { "src_last_modified_millis" : "1452802803026", "large_file_sha1" : "a3195dc1e7b46a2ff5da4b3c179175b75671e80d", "color": "blue" }
+type StartLargeFileRequest struct {
+	BucketID    string            `json:"bucketId"`    // The ID of the bucket that the file will go in.
+	Name        string            `json:"fileName"`    // The name of the file. See Files for requirements on file names.
+	ContentType string            `json:"contentType"` // The MIME type of the content of the file, which will be returned in the Content-Type header when downloading the file. Use the Content-Type b2/x-auto to automatically set the stored Content-Type post upload. In the case where a file extension is absent or the lookup fails, the Content-Type is set to application/octet-stream.
+	Info        map[string]string `json:"fileInfo"`    // A JSON object holding the name/value pairs for the custom file info.
+}
+
+// StartLargeFileResponse is the response to StartLargeFileRequest
+type StartLargeFileResponse struct {
+	ID              string            `json:"fileId"`          // The unique identifier for this version of this file. Used with b2_get_file_info, b2_download_file_by_id, and b2_delete_file_version.
+	Name            string            `json:"fileName"`        // The name of this file, which can be used with b2_download_file_by_name.
+	AccountID       string            `json:"accountId"`       // The identifier for the account.
+	BucketID        string            `json:"bucketId"`        // The unique ID of the bucket.
+	ContentType     string            `json:"contentType"`     // The MIME type of the file.
+	Info            map[string]string `json:"fileInfo"`        // The custom information that was uploaded with the file. This is a JSON object, holding the name/value pairs that were uploaded with the file.
+	UploadTimestamp Timestamp         `json:"uploadTimestamp"` // This is a UTC time when this file was uploaded.
+}
+
+// GetUploadPartURLRequest is passed to b2_get_upload_part_url
+type GetUploadPartURLRequest struct {
+	ID string `json:"fileId"` // The unique identifier of the file being uploaded.
+}
+
+// GetUploadPartURLResponse is received from b2_get_upload_part_url
+type GetUploadPartURLResponse struct {
+	ID                 string `json:"fileId"`             // The unique identifier of the file being uploaded.
+	UploadURL          string `json:"uploadUrl"`          // The URL that can be used to upload parts of this file, see b2_upload_part.
+	AuthorizationToken string `json:"authorizationToken"` // The authorizationToken that must be used when uploading parts of this file, see b2_upload_part.
+}
+
+// UploadPartResponse is the response to b2_upload_part
+type UploadPartResponse struct {
+	ID         string `json:"fileId"`        // The unique identifier of the file being uploaded.
+	PartNumber int64  `json:"partNumber"`    // Which part this is (starting from 1)
+	Size       int64  `json:"contentLength"` // The number of bytes stored in the file.
+	SHA1       string `json:"contentSha1"`   // The SHA1 of the bytes stored in the file.
+}
+
+// FinishLargeFileRequest is passed to b2_finish_large_file
+//
+// The response is a FileInfo object (with extra AccountID and BucketID fields which we ignore).
+//
+// Large files do not have a SHA1 checksum. The value will always be "none".
+type FinishLargeFileRequest struct {
+	ID    string   `json:"fileId"`        // The unique identifier of the file being uploaded.
+	SHA1s []string `json:"partSha1Array"` // A JSON array of hex SHA1 checksums of the parts of the large file. This is a double-check that the right parts were uploaded in the right order, and that none were missed. Note that the part numbers start at 1, and the SHA1 of part 1 is the first string in the array, at index 0.
+}
diff --git a/b2/b2.go b/b2/b2.go
index fb5c3b8b3..1176eb63e 100644
--- a/b2/b2.go
+++ b/b2/b2.go
@@ -1,9 +1,6 @@
 // Package b2 provides an interface to the Backblaze B2 object storage system
 package b2
 
-// FIXME if b2 could set the mod time then it has everything else to
-// implement mod times. It is just missing that bit of API.
-
 // FIXME should we remove sha1 checks from here as rclone now supports
 // checking SHA1s?
 
@@ -28,6 +25,7 @@ import (
 	"github.com/ncw/rclone/pacer"
 	"github.com/ncw/rclone/rest"
 	"github.com/pkg/errors"
+	"github.com/spf13/pflag"
 )
 
 const (
@@ -35,10 +33,22 @@ const (
 	headerPrefix  = "x-bz-info-" // lower case as that is what the server returns
 	timeKey       = "src_last_modified_millis"
 	timeHeader    = headerPrefix + timeKey
+	sha1Key       = "large_file_sha1"
 	sha1Header    = "X-Bz-Content-Sha1"
 	minSleep      = 10 * time.Millisecond
 	maxSleep      = 2 * time.Second
 	decayConstant = 2 // bigger for slower decay, exponential
+	maxParts      = 10000
+)
+
+// Globals
+var (
+	minChunkSize                = fs.SizeSuffix(100E6)
+	chunkSize                   = fs.SizeSuffix(96 * 1024 * 1024)
+	uploadCutoff                = fs.SizeSuffix(5E9)
+	errorAuthTokenExpired       = errors.New("b2 auth token expired")
+	errorUploadTokenExpired     = errors.New("b2 upload token expired")
+	errorUploadPartTokenExpired = errors.New("b2 upload part token expired")
 )
 
 // Register with Fs
@@ -59,6 +69,8 @@ func init() {
 			},
 		},
 	})
+	pflag.VarP(&uploadCutoff, "b2-upload-cutoff", "", "Cutoff for switching to chunked upload")
+	pflag.VarP(&chunkSize, "b2-chunk-size", "", "Upload chunk size. Must fit in memory.")
 }
 
 // Fs represents a remote b2 server
@@ -146,15 +158,14 @@ func (f *Fs) shouldRetryNoReauth(resp *http.Response, err error) (bool, error) {
 
 // shouldRetry returns a boolean as to whether this resp and err
 // deserve to be retried. It returns the err as a convenience
 func (f *Fs) shouldRetry(resp *http.Response, err error) (bool, error) {
-	if resp != nil && resp.StatusCode == 401 {
-		fs.Debug(f, "b2 auth token expired refetching")
+	if err == nil && resp != nil && resp.StatusCode == 401 {
+		err = errorAuthTokenExpired
+		fs.Debug(f, "%v", err)
 		// Reauth
 		authErr := f.authorizeAccount()
 		if authErr != nil {
 			err = authErr
 		}
-		// Refetch upload URL
-		f.clearUploadURL()
 		return true, err
 	}
 	return f.shouldRetryNoReauth(resp, err)
@@ -182,6 +193,12 @@ func errorHandler(resp *http.Response) error {
 
 // NewFs contstructs an Fs from the path, bucket:path
 func NewFs(name, root string) (fs.Fs, error) {
+	if uploadCutoff < chunkSize {
+		return nil, errors.Errorf("b2: upload cutoff must be at least chunk size %v - was %v", chunkSize, uploadCutoff)
+	}
+	if chunkSize < minChunkSize {
+		return nil, errors.Errorf("b2: chunk size can't be less than %v - was %v", minChunkSize, chunkSize)
+	}
 	bucket, directory, err := parsePath(root)
 	if err != nil {
 		return nil, err
@@ -273,7 +290,7 @@ func (f *Fs) getUploadURL() (upload *api.GetUploadURLResponse, err error) {
 	}
 	err := f.pacer.Call(func() (bool, error) {
 		resp, err := f.srv.CallJSON(&opts, &request, &upload)
-		return f.shouldRetryNoReauth(resp, err)
+		return f.shouldRetry(resp, err)
 	})
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to get upload URL")
@@ -286,6 +303,9 @@ func (f *Fs) getUploadURL() (upload *api.GetUploadURLResponse, err error) {
 
 // returnUploadURL returns the UploadURL to the cache
 func (f *Fs) returnUploadURL(upload *api.GetUploadURLResponse) {
+	if upload == nil {
+		return
+	}
 	f.uploadMu.Lock()
 	f.uploads = append(f.uploads, upload)
 	f.uploadMu.Unlock()
@@ -770,7 +790,27 @@ func (o *Object) Size() int64 {
 	return o.size
 }
 
-// decodeMetaData sets the metadata in the object from info
+// decodeMetaDataRaw sets the metadata from the data passed in
+//
+// Sets
+// o.id
+// o.modTime
+// o.size
+// o.sha1
+func (o *Object) decodeMetaDataRaw(ID, SHA1 string, Size int64, UploadTimestamp api.Timestamp, Info map[string]string) (err error) {
+	o.id = ID
+	o.sha1 = SHA1
+	// Read SHA1 from metadata if it exists and isn't set
+	if o.sha1 == "" || o.sha1 == "none" {
+		o.sha1 = Info[sha1Key]
+	}
+	o.size = Size
+	// Use the UploadTimestamp if we can't get the file info
+	o.modTime = time.Time(UploadTimestamp)
+	return o.parseTimeString(Info[timeKey])
+}
+
+// decodeMetaData sets the metadata in the object from an api.File
 //
 // Sets
 // o.id
@@ -778,12 +818,18 @@ func (o *Object) Size() int64 {
 // o.size
 // o.sha1
 func (o *Object) decodeMetaData(info *api.File) (err error) {
-	o.id = info.ID
-	o.sha1 = info.SHA1
-	o.size = info.Size
-	// Use the UploadTimestamp if can't get file info
-	o.modTime = time.Time(info.UploadTimestamp)
-	return o.parseTimeString(info.Info[timeKey])
+	return o.decodeMetaDataRaw(info.ID, info.SHA1, info.Size, info.UploadTimestamp, info.Info)
+}
+
+// decodeMetaDataFileInfo sets the metadata in the object from an api.FileInfo
+//
+// Sets
+// o.id
+// o.modTime
+// o.size
+// o.sha1
+func (o *Object) decodeMetaDataFileInfo(info *api.FileInfo) (err error) {
+	return o.decodeMetaDataRaw(info.ID, info.SHA1, info.Size, info.UploadTimestamp, info.Info)
 }
 
 // readMetaData gets the metadata if it hasn't already been fetched
@@ -989,6 +1035,16 @@ func urlEncode(in string) string {
 
 // The new object may have been created if an error is returned
 func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
 	size := src.Size()
+
+	// If this is a large file, upload it in chunks - see upload.go
+	if size >= int64(uploadCutoff) {
+		up, err := o.fs.newLargeUpload(o, in, src)
+		if err != nil {
+			return err
+		}
+		return up.Upload()
+	}
+
 	modTime := src.ModTime()
 
 	calculatedSha1, _ := src.Hash(fs.HashSHA1)
@@ -1106,17 +1162,21 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
 	// Don't retry, return a retry error instead
 	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
 		resp, err := o.fs.srv.CallJSON(&opts, nil, &response)
-		return o.fs.shouldRetry(resp, err)
+		if err == nil && resp != nil && resp.StatusCode == 401 {
+			err = errorUploadTokenExpired
+			fs.Debug(o, "%v", err)
+			// Invalidate this Upload URL
+			upload = nil
+			// Refetch upload URLs
+			o.fs.clearUploadURL()
+			return true, err
+		}
+		return o.fs.shouldRetryNoReauth(resp, err)
 	})
 	if err != nil {
 		return err
 	}
-	o.id = response.ID
-	o.sha1 = response.SHA1
-	o.size = response.Size
-	o.modTime = modTime
-	_ = o.parseTimeString(response.Info[timeKey])
-	return nil
+	return o.decodeMetaDataFileInfo(&response)
 }
 
 // Remove an object
diff --git a/b2/upload.go b/b2/upload.go
new file mode 100644
index 000000000..99125764d
--- /dev/null
+++ b/b2/upload.go
@@ -0,0 +1,245 @@
+// Upload large files for b2
+//
+// Docs - https://www.backblaze.com/b2/docs/large_files.html
+
+package b2
+
+import (
+	"bytes"
+	"crypto/sha1"
+	"fmt"
+	"io"
+	"sync"
+
+	"github.com/ncw/rclone/b2/api"
+	"github.com/ncw/rclone/fs"
+	"github.com/ncw/rclone/rest"
+	"github.com/pkg/errors"
+)
+
+// largeUpload is used to control the upload of large files which need chunking
+type largeUpload struct {
+	f        *Fs                             // parent Fs
+	o        *Object                         // object being uploaded
+	in       io.Reader                       // read the data from here
+	id       string                          // ID of the file being uploaded
+	size     int64                           // total size
+	parts    int64                           // calculated number of parts
+	sha1s    []string                        // slice of SHA1s for each part
+	uploadMu sync.Mutex                      // lock for upload variable
+	uploads  []*api.GetUploadPartURLResponse // result of get upload URL calls
+}
+
+// newLargeUpload starts an upload of object o from in with metadata in src
+func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *largeUpload, err error) {
+	remote := src.Remote()
+	size := src.Size()
+	parts := size / int64(chunkSize)
+	if size%int64(chunkSize) != 0 {
+		parts++
+	}
+	if parts > maxParts {
+		return nil, errors.Errorf("%q too big (%d bytes) makes too many parts %d > %d - increase --b2-chunk-size", remote, size, parts, maxParts)
+	}
+	modTime := src.ModTime()
+	opts := rest.Opts{
+		Method: "POST",
+		Path:   "/b2_start_large_file",
+	}
+	bucketID, err := f.getBucketID()
+	if err != nil {
+		return nil, err
+	}
+	var request = api.StartLargeFileRequest{
+		BucketID:    bucketID,
+		Name:        remote,
+		ContentType: fs.MimeType(src),
+		Info: map[string]string{
+			timeKey: timeString(modTime),
+		},
+	}
+	// Set the SHA1 if known
+	if calculatedSha1, err := src.Hash(fs.HashSHA1); err == nil && calculatedSha1 != "" {
+		request.Info[sha1Key] = calculatedSha1
+	}
+	var response api.StartLargeFileResponse
+	err = f.pacer.Call(func() (bool, error) {
+		resp, err := f.srv.CallJSON(&opts, &request, &response)
+		return f.shouldRetry(resp, err)
+	})
+	if err != nil {
+		return nil, err
+	}
+	up = &largeUpload{
+		f:     f,
+		o:     o,
+		in:    in,
+		id:    response.ID,
+		size:  size,
+		parts: parts,
+		sha1s: make([]string, parts),
+	}
+	return up, nil
+}
+
+// getUploadURL returns the upload info with the UploadURL and the AuthorizationToken
+//
+// This should be returned with returnUploadURL when finished
+func (up *largeUpload) getUploadURL() (upload *api.GetUploadPartURLResponse, err error) {
+	up.uploadMu.Lock()
+	defer up.uploadMu.Unlock()
+	if len(up.uploads) == 0 {
+		opts := rest.Opts{
+			Method: "POST",
+			Path:   "/b2_get_upload_part_url",
+		}
+		var request = api.GetUploadPartURLRequest{
+			ID: up.id,
+		}
+		err := up.f.pacer.Call(func() (bool, error) {
+			resp, err := up.f.srv.CallJSON(&opts, &request, &upload)
+			return up.f.shouldRetry(resp, err)
+		})
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to get upload URL")
+		}
+	} else {
+		upload, up.uploads = up.uploads[0], up.uploads[1:]
+	}
+	return upload, nil
+}
+
+// returnUploadURL returns the UploadURL to the cache
+func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) {
+	if upload == nil {
+		return
+	}
+	up.uploadMu.Lock()
+	up.uploads = append(up.uploads, upload)
+	up.uploadMu.Unlock()
+}
+
+// clearUploadURL clears the current UploadURL and the AuthorizationToken
+func (up *largeUpload) clearUploadURL() {
+	up.uploadMu.Lock()
+	up.uploads = nil
+	up.uploadMu.Unlock()
+}
+
+// Transfer a chunk
+func (up *largeUpload) transferChunk(part int64, body []byte) error {
+	calculatedSHA1 := fmt.Sprintf("%x", sha1.Sum(body))
+	up.sha1s[part-1] = calculatedSHA1
+	size := int64(len(body))
+	err := up.f.pacer.Call(func() (bool, error) {
+		fs.Debug(up.o, "Sending chunk %d length %d", part, len(body))
+
+		// Get upload URL
+		upload, err := up.getUploadURL()
+		if err != nil {
+			return false, err
+		}
+
+		// Authorization
+		//
+		// An upload authorization token, from b2_get_upload_part_url.
+		//
+		// X-Bz-Part-Number
+		//
+		// A number from 1 to 10000. The parts uploaded for one file
+		// must have contiguous numbers, starting with 1.
+		//
+		// Content-Length
+		//
+		// The number of bytes in the file being uploaded. Note that
+		// this header is required; you cannot leave it out and just
+		// use chunked encoding. The minimum size of every part but
+		// the last one is 100MB.
+		//
+		// X-Bz-Content-Sha1
+		//
+		// The SHA1 checksum of this part of the file. B2 will
+		// check this when the part is uploaded, to make sure that the
+		// data arrived correctly. The same SHA1 checksum must be
+		// passed to b2_finish_large_file.
+		opts := rest.Opts{
+			Method:   "POST",
+			Absolute: true,
+			Path:     upload.UploadURL,
+			Body:     bytes.NewBuffer(body),
+			ExtraHeaders: map[string]string{
+				"Authorization":    upload.AuthorizationToken,
+				"X-Bz-Part-Number": fmt.Sprintf("%d", part),
+				sha1Header:         calculatedSHA1,
+			},
+			ContentLength: &size,
+		}
+
+		var response api.UploadPartResponse
+
+		resp, err := up.f.srv.CallJSON(&opts, nil, &response)
+		if err == nil && resp != nil && resp.StatusCode == 401 {
+			err = errorUploadPartTokenExpired
+			fs.Debug(up.o, "%v", err)
+			// Refetch upload part URLs and ditch this current one
+			up.clearUploadURL()
+			return true, err
+		}
+		up.returnUploadURL(upload)
+		return up.f.shouldRetryNoReauth(resp, err)
+	})
+	return err
+}
+
+// finish closes off the large upload
+func (up *largeUpload) finish() error {
+	opts := rest.Opts{
+		Method: "POST",
+		Path:   "/b2_finish_large_file",
+	}
+	var request = api.FinishLargeFileRequest{
+		ID:    up.id,
+		SHA1s: up.sha1s,
+	}
+	var response api.FileInfo
+	err := up.f.pacer.Call(func() (bool, error) {
+		resp, err := up.f.srv.CallJSON(&opts, &request, &response)
+		return up.f.shouldRetry(resp, err)
+	})
+	if err != nil {
+		return err
+	}
+	return up.o.decodeMetaDataFileInfo(&response)
+}
+
+// Upload uploads the chunks from the input
+func (up *largeUpload) Upload() error {
+	fs.Debug(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id)
+	buf := make([]byte, chunkSize)
+	remaining := up.size
+	for part := int64(1); part <= up.parts; part++ {
+		reqSize := remaining
+		if reqSize >= int64(chunkSize) {
+			reqSize = int64(chunkSize)
+		} else {
+			buf = buf[:reqSize]
+		}
+
+		// FIXME could parallelise this
+
+		// Read the chunk
+		_, err := io.ReadFull(up.in, buf)
+		if err != nil {
+			return err
+		}
+
+		// Transfer the chunk
+		err = up.transferChunk(part, buf)
+		if err != nil {
+			return err
+		}
+
+		remaining -= reqSize
+	}
+	return up.finish()
+}
diff --git a/docs/content/b2.md b/docs/content/b2.md
index 35a0109b3..870712f1b 100644
--- a/docs/content/b2.md
+++ b/docs/content/b2.md
@@ -1,7 +1,7 @@
 ---
 title: "B2"
 description: "Backblaze B2"
-date: "2015-12-29"
+date: "2016-06-15"
 ---
 
 Backblaze B2
@@ -106,6 +106,9 @@ method to set the modification time independent of doing an upload.
 The SHA1 checksums of the files are checked on upload and download and
 will be used in the syncing process. You can use the `--checksum` flag.
 
+Large files which are uploaded in chunks will store their SHA1 on the
+object as `X-Bz-Info-large_file_sha1` as recommended by Backblaze.
+
 ### Versions ###
 
 When rclone uploads a new version of a file it creates a [new version
@@ -130,8 +133,26 @@ depending on your hardware, how big the files are, how much you want
 to load your computer, etc. The default of `--transfers 4` is
 definitely too low for Backblaze B2 though.
 
+### Specific options ###
+
+Here are the command line options specific to this cloud storage
+system.
+
+#### --b2-chunk-size=SIZE ####
+
+When uploading large files chunk the file into this size. Note that
+these chunks are buffered in memory. 100,000,000 bytes is the minimum
+size (default 96M).
+
+#### --b2-upload-cutoff=SIZE ####
+
+Cutoff for switching to chunked upload (default 4.657GiB ==
+5GB). Files above this size will be uploaded in chunks of
+`--b2-chunk-size`. The default value is the largest file which can be
+uploaded without chunks.
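+
+For example, to use bigger chunks and a lower cutoff than the
+defaults (the values here are purely illustrative):
+
+    rclone copy --b2-chunk-size 200M --b2-upload-cutoff 1G /path/to/big.file remote:bucket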
+
 ### API ###
 
 Here are [some notes I made on the backblaze API](https://gist.github.com/ncw/166dabf352b399f1cc1c) while
-integrating it with rclone which detail the changes I'd like to see.
+integrating it with rclone.
diff --git a/fs/operations.go b/fs/operations.go
index 497b65747..d688ce87d 100644
--- a/fs/operations.go
+++ b/fs/operations.go
@@ -169,7 +169,7 @@ func Equal(src, dst Object) bool {
 }
 
 // MimeType returns a guess at the mime type from the extension
-func MimeType(o Object) string {
+func MimeType(o ObjectInfo) string {
 	mimeType := mime.TypeByExtension(path.Ext(o.Remote()))
 	if !strings.ContainsRune(mimeType, '/') {
 		mimeType = "application/octet-stream"
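
As a footnote to the patch, here is a small self-contained Go sketch (not part of the diff) of the part-count and per-part SHA1 bookkeeping that largeUpload.Upload performs and that b2_finish_large_file double-checks. The function chunkSHA1s and the tiny chunk size are invented for illustration only.

package main

import (
	"bytes"
	"crypto/sha1"
	"fmt"
	"io"
)

// chunkSHA1s reads size bytes from in, in chunks of chunkSize, and
// returns the hex SHA1 of each chunk in order (index 0 holds part 1),
// mirroring how Upload fills the sha1s slice that is later sent as
// partSha1Array in FinishLargeFileRequest.
func chunkSHA1s(in io.Reader, size, chunkSize int64) ([]string, error) {
	parts := size / chunkSize
	if size%chunkSize != 0 {
		parts++ // a short final part still counts as a part
	}
	sha1s := make([]string, parts)
	buf := make([]byte, chunkSize)
	remaining := size
	for part := int64(1); part <= parts; part++ {
		if remaining < chunkSize {
			buf = buf[:remaining] // the final part may be short
		}
		if _, err := io.ReadFull(in, buf); err != nil {
			return nil, err
		}
		sha1s[part-1] = fmt.Sprintf("%x", sha1.Sum(buf))
		remaining -= int64(len(buf))
	}
	return sha1s, nil
}

func main() {
	data := []byte("hello world, hello b2") // 21 bytes -> 5 parts with chunkSize 5
	sha1s, err := chunkSHA1s(bytes.NewReader(data), int64(len(data)), 5)
	if err != nil {
		panic(err)
	}
	for i, s := range sha1s {
		fmt.Printf("part %d: %s\n", i+1, s)
	}
}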