b2: implement large file uploading - fixes #456

This commit is contained in:
Nick Craig-Wood 2016-06-15 18:49:11 +01:00
parent 1b4370bde1
commit 2a46be8cf3
5 changed files with 426 additions and 35 deletions

View file

@ -107,17 +107,18 @@ type GetUploadURLResponse struct {
AuthorizationToken string `json:"authorizationToken"` // The authorizationToken that must be used when uploading files to this bucket, see b2_upload_file.
}
// FileInfo is received from b2_upload_file and b2_get_file_info
// FileInfo is received from b2_upload_file, b2_get_file_info and b2_finish_large_file
type FileInfo struct {
ID string `json:"fileId"` // The unique identifier for this version of this file. Used with b2_get_file_info, b2_download_file_by_id, and b2_delete_file_version.
Name string `json:"fileName"` // The name of this file, which can be used with b2_download_file_by_name.
Action string `json:"action"` // Either "upload" or "hide". "upload" means a file that was uploaded to B2 Cloud Storage. "hide" means a file version marking the file as hidden, so that it will not show up in b2_list_file_names. The result of b2_list_file_names will contain only "upload". The result of b2_list_file_versions may have both.
AccountID string `json:"accountId"` // Your account ID.
BucketID string `json:"bucketId"` // The bucket that the file is in.
Size int64 `json:"contentLength"` // The number of bytes stored in the file.
SHA1 string `json:"contentSha1"` // The SHA1 of the bytes stored in the file.
ContentType string `json:"contentType"` // The MIME type of the file.
Info map[string]string `json:"fileInfo"` // The custom information that was uploaded with the file. This is a JSON object, holding the name/value pairs that were uploaded with the file.
ID string `json:"fileId"` // The unique identifier for this version of this file. Used with b2_get_file_info, b2_download_file_by_id, and b2_delete_file_version.
Name string `json:"fileName"` // The name of this file, which can be used with b2_download_file_by_name.
Action string `json:"action"` // Either "upload" or "hide". "upload" means a file that was uploaded to B2 Cloud Storage. "hide" means a file version marking the file as hidden, so that it will not show up in b2_list_file_names. The result of b2_list_file_names will contain only "upload". The result of b2_list_file_versions may have both.
AccountID string `json:"accountId"` // Your account ID.
BucketID string `json:"bucketId"` // The bucket that the file is in.
Size int64 `json:"contentLength"` // The number of bytes stored in the file.
UploadTimestamp Timestamp `json:"uploadTimestamp"` // This is a UTC time when this file was uploaded.
SHA1 string `json:"contentSha1"` // The SHA1 of the bytes stored in the file.
ContentType string `json:"contentType"` // The MIME type of the file.
Info map[string]string `json:"fileInfo"` // The custom information that was uploaded with the file. This is a JSON object, holding the name/value pairs that were uploaded with the file.
}
// CreateBucketRequest is used to create a bucket
@ -149,3 +150,67 @@ type HideFileRequest struct {
type GetFileInfoRequest struct {
ID string `json:"fileId"` // The ID of the file, as returned by b2_upload_file, b2_list_file_names, or b2_list_file_versions.
}
// StartLargeFileRequest (b2_start_large_file) Prepares for uploading the parts of a large file.
//
// If the original source of the file being uploaded has a last
// modified time concept, Backblaze recommends using
// src_last_modified_millis as the name, and a string holding the base
// 10 number number of milliseconds since midnight, January 1, 1970
// UTC. This fits in a 64 bit integer such as the type "long" in the
// programming language Java. It is intended to be compatible with
// Java's time long. For example, it can be passed directly into the
// Java call Date.setTime(long time).
//
// If the caller knows the SHA1 of the entire large file being
// uploaded, Backblaze recommends using large_file_sha1 as the name,
// and a 40 byte hex string representing the SHA1.
//
// Example: { "src_last_modified_millis" : "1452802803026", "large_file_sha1" : "a3195dc1e7b46a2ff5da4b3c179175b75671e80d", "color": "blue" }
type StartLargeFileRequest struct {
BucketID string `json:"bucketId"` //The ID of the bucket that the file will go in.
Name string `json:"fileName"` // The name of the file. See Files for requirements on file names.
ContentType string `json:"contentType"` // The MIME type of the content of the file, which will be returned in the Content-Type header when downloading the file. Use the Content-Type b2/x-auto to automatically set the stored Content-Type post upload. In the case where a file extension is absent or the lookup fails, the Content-Type is set to application/octet-stream.
Info map[string]string `json:"fileInfo"` // A JSON object holding the name/value pairs for the custom file info.
}
// StartLargeFileResponse is the response to StartLargeFileRequest
type StartLargeFileResponse struct {
ID string `json:"fileId"` // The unique identifier for this version of this file. Used with b2_get_file_info, b2_download_file_by_id, and b2_delete_file_version.
Name string `json:"fileName"` // The name of this file, which can be used with b2_download_file_by_name.
AccountID string `json:"accountId"` // The identifier for the account.
BucketID string `json:"bucketId"` // The unique ID of the bucket.
ContentType string `json:"contentType"` // The MIME type of the file.
Info map[string]string `json:"fileInfo"` // The custom information that was uploaded with the file. This is a JSON object, holding the name/value pairs that were uploaded with the file.
UploadTimestamp Timestamp `json:"uploadTimestamp"` // This is a UTC time when this file was uploaded.
}
// GetUploadPartURLRequest is passed to b2_get_upload_part_url
type GetUploadPartURLRequest struct {
ID string `json:"fileId"` // The unique identifier of the file being uploaded.
}
// GetUploadPartURLResponse is received from b2_get_upload_url
type GetUploadPartURLResponse struct {
ID string `json:"fileId"` // The unique identifier of the file being uploaded.
UploadURL string `json:"uploadUrl"` // The URL that can be used to upload files to this bucket, see b2_upload_part.
AuthorizationToken string `json:"authorizationToken"` // The authorizationToken that must be used when uploading files to this bucket, see b2_upload_part.
}
// UploadPartResponse is the response to b2_upload_part
type UploadPartResponse struct {
ID string `json:"fileId"` // The unique identifier of the file being uploaded.
PartNumber int64 `json:"partNumber"` // Which part this is (starting from 1)
Size int64 `json:"contentLength"` // The number of bytes stored in the file.
SHA1 string `json:"contentSha1"` // The SHA1 of the bytes stored in the file.
}
// FinishLargeFileRequest is passed to b2_finish_large_file
//
// The response is a FileInfo object (with extra AccountID and BucketID fields which we ignore).
//
// Large files do not have a SHA1 checksum. The value will always be "none".
type FinishLargeFileRequest struct {
ID string `json:"fileId"` // The unique identifier of the file being uploaded.
SHA1s []string `json:"partSha1Array"` // A JSON array of hex SHA1 checksums of the parts of the large file. This is a double-check that the right parts were uploaded in the right order, and that none were missed. Note that the part numbers start at 1, and the SHA1 of the part 1 is the first string in the array, at index 0.
}

104
b2/b2.go
View file

@ -1,9 +1,6 @@
// Package b2 provides an interface to the Backblaze B2 object storage system
package b2
// FIXME if b2 could set the mod time then it has everything else to
// implement mod times. It is just missing that bit of API.
// FIXME should we remove sha1 checks from here as rclone now supports
// checking SHA1s?
@ -28,6 +25,7 @@ import (
"github.com/ncw/rclone/pacer"
"github.com/ncw/rclone/rest"
"github.com/pkg/errors"
"github.com/spf13/pflag"
)
const (
@ -35,10 +33,22 @@ const (
headerPrefix = "x-bz-info-" // lower case as that is what the server returns
timeKey = "src_last_modified_millis"
timeHeader = headerPrefix + timeKey
sha1Key = "large_file_sha1"
sha1Header = "X-Bz-Content-Sha1"
minSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
maxParts = 10000
)
// Globals
var (
minChunkSize = fs.SizeSuffix(100E6)
chunkSize = fs.SizeSuffix(96 * 1024 * 1024)
uploadCutoff = fs.SizeSuffix(5E9)
errorAuthTokenExpired = errors.New("b2 auth token expired")
errorUploadTokenExpired = errors.New("b2 upload token expired")
errorUploadPartTokenExpired = errors.New("b2 upload part token expired")
)
// Register with Fs
@ -59,6 +69,8 @@ func init() {
},
},
})
pflag.VarP(&uploadCutoff, "b2-upload-cutoff", "", "Cutoff for switching to chunked upload")
pflag.VarP(&chunkSize, "b2-chunk-size", "", "Upload chunk size. Must fit in memory.")
}
// Fs represents a remote b2 server
@ -146,15 +158,14 @@ func (f *Fs) shouldRetryNoReauth(resp *http.Response, err error) (bool, error) {
// shouldRetry returns a boolean as to whether this resp and err
// deserve to be retried. It returns the err as a convenience
func (f *Fs) shouldRetry(resp *http.Response, err error) (bool, error) {
if resp != nil && resp.StatusCode == 401 {
fs.Debug(f, "b2 auth token expired refetching")
if err == nil && resp != nil && resp.StatusCode == 401 {
err = errorAuthTokenExpired
fs.Debug(f, "%v", err)
// Reauth
authErr := f.authorizeAccount()
if authErr != nil {
err = authErr
}
// Refetch upload URL
f.clearUploadURL()
return true, err
}
return f.shouldRetryNoReauth(resp, err)
@ -182,6 +193,12 @@ func errorHandler(resp *http.Response) error {
// NewFs contstructs an Fs from the path, bucket:path
func NewFs(name, root string) (fs.Fs, error) {
if uploadCutoff < chunkSize {
return nil, errors.Errorf("b2: upload cutoff must be less than chunk size %v - was %v", chunkSize, uploadCutoff)
}
if chunkSize < minChunkSize {
return nil, errors.Errorf("b2: chunk size can't be less than %v - was %v", minChunkSize, chunkSize)
}
bucket, directory, err := parsePath(root)
if err != nil {
return nil, err
@ -273,7 +290,7 @@ func (f *Fs) getUploadURL() (upload *api.GetUploadURLResponse, err error) {
}
err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &upload)
return f.shouldRetryNoReauth(resp, err)
return f.shouldRetry(resp, err)
})
if err != nil {
return nil, errors.Wrap(err, "failed to get upload URL")
@ -286,6 +303,9 @@ func (f *Fs) getUploadURL() (upload *api.GetUploadURLResponse, err error) {
// returnUploadURL returns the UploadURL to the cache
func (f *Fs) returnUploadURL(upload *api.GetUploadURLResponse) {
if upload == nil {
return
}
f.uploadMu.Lock()
f.uploads = append(f.uploads, upload)
f.uploadMu.Unlock()
@ -770,7 +790,27 @@ func (o *Object) Size() int64 {
return o.size
}
// decodeMetaData sets the metadata in the object from info
// decodeMetaDataRaw sets the metadata from the data passed in
//
// Sets
// o.id
// o.modTime
// o.size
// o.sha1
func (o *Object) decodeMetaDataRaw(ID, SHA1 string, Size int64, UploadTimestamp api.Timestamp, Info map[string]string) (err error) {
o.id = ID
o.sha1 = SHA1
// Read SHA1 from metadata if it exists and isn't set
if o.sha1 == "" || o.sha1 == "none" {
o.sha1 = Info[sha1Key]
}
o.size = Size
// Use the UploadTimestamp if can't get file info
o.modTime = time.Time(UploadTimestamp)
return o.parseTimeString(Info[timeKey])
}
// decodeMetaData sets the metadata in the object from an api.File
//
// Sets
// o.id
@ -778,12 +818,18 @@ func (o *Object) Size() int64 {
// o.size
// o.sha1
func (o *Object) decodeMetaData(info *api.File) (err error) {
o.id = info.ID
o.sha1 = info.SHA1
o.size = info.Size
// Use the UploadTimestamp if can't get file info
o.modTime = time.Time(info.UploadTimestamp)
return o.parseTimeString(info.Info[timeKey])
return o.decodeMetaDataRaw(info.ID, info.SHA1, info.Size, info.UploadTimestamp, info.Info)
}
// decodeMetaDataFileInfo sets the metadata in the object from an api.FileInfo
//
// Sets
// o.id
// o.modTime
// o.size
// o.sha1
func (o *Object) decodeMetaDataFileInfo(info *api.FileInfo) (err error) {
return o.decodeMetaDataRaw(info.ID, info.SHA1, info.Size, info.UploadTimestamp, info.Info)
}
// readMetaData gets the metadata if it hasn't already been fetched
@ -989,6 +1035,16 @@ func urlEncode(in string) string {
// The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
size := src.Size()
// If a large file upload in chunks - see upload.go
if size >= int64(uploadCutoff) {
up, err := o.fs.newLargeUpload(o, in, src)
if err != nil {
return err
}
return up.Upload()
}
modTime := src.ModTime()
calculatedSha1, _ := src.Hash(fs.HashSHA1)
@ -1106,17 +1162,21 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
// Don't retry, return a retry error instead
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
resp, err := o.fs.srv.CallJSON(&opts, nil, &response)
return o.fs.shouldRetry(resp, err)
if err == nil && resp != nil && resp.StatusCode == 401 {
err = errorUploadTokenExpired
fs.Debug(o, "%v", err)
// Invalidate this Upload URL
upload = nil
// Refetch upload URLs
o.fs.clearUploadURL()
return true, err
}
return o.fs.shouldRetryNoReauth(resp, err)
})
if err != nil {
return err
}
o.id = response.ID
o.sha1 = response.SHA1
o.size = response.Size
o.modTime = modTime
_ = o.parseTimeString(response.Info[timeKey])
return nil
return o.decodeMetaDataFileInfo(&response)
}
// Remove an object

245
b2/upload.go Normal file
View file

@ -0,0 +1,245 @@
// Upload large files for b2
//
// Docs - https://www.backblaze.com/b2/docs/large_files.html
package b2
import (
"bytes"
"crypto/sha1"
"fmt"
"io"
"sync"
"github.com/ncw/rclone/b2/api"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/rest"
"github.com/pkg/errors"
)
// largeUpload is used to control the upload of large files which need chunking
type largeUpload struct {
f *Fs // parent Fs
o *Object // object being uploaded
in io.Reader // read the data from here
id string // ID of the file being uploaded
size int64 // total size
parts int64 // calculated number of parts
sha1s []string // slice of SHA1s for each part
uploadMu sync.Mutex // lock for upload variable
uploads []*api.GetUploadPartURLResponse // result of get upload URL calls
}
// newLargeUpload starts an upload of object o from in with metadata in src
func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *largeUpload, err error) {
remote := src.Remote()
size := src.Size()
parts := size / int64(chunkSize)
if size%int64(chunkSize) != 0 {
parts++
}
if parts > maxParts {
return nil, errors.Errorf("%q too big (%d bytes) makes too many parts %d > %d - increase --b2-chunk-size", remote, size, parts, maxParts)
}
modTime := src.ModTime()
opts := rest.Opts{
Method: "POST",
Path: "/b2_start_large_file",
}
bucketID, err := f.getBucketID()
if err != nil {
return nil, err
}
var request = api.StartLargeFileRequest{
BucketID: bucketID,
Name: remote,
ContentType: fs.MimeType(src),
Info: map[string]string{
timeKey: timeString(modTime),
},
}
// Set the SHA1 if known
if calculatedSha1, err := src.Hash(fs.HashSHA1); err == nil && calculatedSha1 != "" {
request.Info[sha1Key] = calculatedSha1
}
var response api.StartLargeFileResponse
err = f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
})
if err != nil {
return nil, err
}
up = &largeUpload{
f: f,
o: o,
in: in,
id: response.ID,
size: size,
parts: parts,
sha1s: make([]string, parts),
}
return up, nil
}
// getUploadURL returns the upload info with the UploadURL and the AuthorizationToken
//
// This should be returned with returnUploadURL when finished
func (up *largeUpload) getUploadURL() (upload *api.GetUploadPartURLResponse, err error) {
up.uploadMu.Lock()
defer up.uploadMu.Unlock()
if len(up.uploads) == 0 {
opts := rest.Opts{
Method: "POST",
Path: "/b2_get_upload_part_url",
}
var request = api.GetUploadPartURLRequest{
ID: up.id,
}
err := up.f.pacer.Call(func() (bool, error) {
resp, err := up.f.srv.CallJSON(&opts, &request, &upload)
return up.f.shouldRetry(resp, err)
})
if err != nil {
return nil, errors.Wrap(err, "failed to get upload URL")
}
} else {
upload, up.uploads = up.uploads[0], up.uploads[1:]
}
return upload, nil
}
// returnUploadURL returns the UploadURL to the cache
func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) {
if upload == nil {
return
}
up.uploadMu.Lock()
up.uploads = append(up.uploads, upload)
up.uploadMu.Unlock()
}
// clearUploadURL clears the current UploadURL and the AuthorizationToken
func (up *largeUpload) clearUploadURL() {
up.uploadMu.Lock()
up.uploads = nil
up.uploadMu.Unlock()
}
// Transfer a chunk
func (up *largeUpload) transferChunk(part int64, body []byte) error {
calculatedSHA1 := fmt.Sprintf("%x", sha1.Sum(body))
up.sha1s[part-1] = calculatedSHA1
size := int64(len(body))
err := up.f.pacer.Call(func() (bool, error) {
fs.Debug(up.o, "Sending chunk %d length %d", part, len(body))
// Get upload URL
upload, err := up.getUploadURL()
if err != nil {
return false, err
}
// Authorization
//
// An upload authorization token, from b2_get_upload_part_url.
//
// X-Bz-Part-Number
//
// A number from 1 to 10000. The parts uploaded for one file
// must have contiguous numbers, starting with 1.
//
// Content-Length
//
// The number of bytes in the file being uploaded. Note that
// this header is required; you cannot leave it out and just
// use chunked encoding. The minimum size of every part but
// the last one is 100MB.
//
// X-Bz-Content-Sha1
//
// The SHA1 checksum of the this part of the file. B2 will
// check this when the part is uploaded, to make sure that the
// data arrived correctly. The same SHA1 checksum must be
// passed to b2_finish_large_file.
opts := rest.Opts{
Method: "POST",
Absolute: true,
Path: upload.UploadURL,
Body: bytes.NewBuffer(body),
ExtraHeaders: map[string]string{
"Authorization": upload.AuthorizationToken,
"X-Bz-Part-Number": fmt.Sprintf("%d", part),
sha1Header: calculatedSHA1,
},
ContentLength: &size,
}
var response api.UploadPartResponse
resp, err := up.f.srv.CallJSON(&opts, nil, &response)
if err == nil && resp != nil && resp.StatusCode == 401 {
err = errorUploadPartTokenExpired
fs.Debug(up.o, "%v", err)
// Refetch upload part URLs and ditch this current one
up.clearUploadURL()
return true, err
}
up.returnUploadURL(upload)
return up.f.shouldRetryNoReauth(resp, err)
})
return err
}
// finish closes off the large upload
func (up *largeUpload) finish() error {
opts := rest.Opts{
Method: "POST",
Path: "/b2_finish_large_file",
}
var request = api.FinishLargeFileRequest{
ID: up.id,
SHA1s: up.sha1s,
}
var response api.FileInfo
err := up.f.pacer.Call(func() (bool, error) {
resp, err := up.f.srv.CallJSON(&opts, &request, &response)
return up.f.shouldRetry(resp, err)
})
if err != nil {
return err
}
return up.o.decodeMetaDataFileInfo(&response)
}
// Upload uploads the chunks from the input
func (up *largeUpload) Upload() error {
fs.Debug(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id)
buf := make([]byte, chunkSize)
remaining := up.size
for part := int64(1); part <= up.parts; part++ {
reqSize := remaining
if reqSize >= int64(chunkSize) {
reqSize = int64(chunkSize)
} else {
buf = buf[:reqSize]
}
// FIXME could parallelise this
// Read the chunk
_, err := io.ReadFull(up.in, buf)
if err != nil {
return err
}
// Transfer the chunk
err = up.transferChunk(part, buf)
if err != nil {
return err
}
remaining -= reqSize
}
return up.finish()
}

View file

@ -1,7 +1,7 @@
---
title: "B2"
description: "Backblaze B2"
date: "2015-12-29"
date: "2016-06-15"
---
<i class="fa fa-fire"></i>Backblaze B2
@ -106,6 +106,9 @@ method to set the modification time independent of doing an upload.
The SHA1 checksums of the files are checked on upload and download and
will be used in the syncing process. You can use the `--checksum` flag.
Large files which are uploaded in chunks will store their SHA1 on the
object as `X-Bz-Info-large_file_sha1` as recommended by Backblaze.
### Versions ###
When rclone uploads a new version of a file it creates a [new version
@ -130,8 +133,26 @@ depending on your hardware, how big the files are, how much you want
to load your computer, etc. The default of `--transfers 4` is
definitely too low for Backblaze B2 though.
### Specific options ###
Here are the command line options specific to this cloud storage
system.
#### --b2-chunk-size valuee=SIZE ####
When uploading large files chunk the file into this size. Note that
these chunks are buffered in memory. 100,000,000 Bytes is the minimim
size (default 96M).
#### --b2-upload-cutoff=SIZE ####
Cutoff for switching to chunked upload (default 4.657GiB ==
5GB). Files above this size will be uploaded in chunks of
`--b2-chunk-size`. The default value is the largest file which can be
uploaded without chunks.
### API ###
Here are [some notes I made on the backblaze
API](https://gist.github.com/ncw/166dabf352b399f1cc1c) while
integrating it with rclone which detail the changes I'd like to see.
integrating it with rclone.

View file

@ -169,7 +169,7 @@ func Equal(src, dst Object) bool {
}
// MimeType returns a guess at the mime type from the extension
func MimeType(o Object) string {
func MimeType(o ObjectInfo) string {
mimeType := mime.TypeByExtension(path.Ext(o.Remote()))
if !strings.ContainsRune(mimeType, '/') {
mimeType = "application/octet-stream"