// Package b2 provides an interface to the Backblaze B2 object storage system package b2 // FIXME should we remove sha1 checks from here as rclone now supports // checking SHA1s? import ( "bytes" "crypto/sha1" "fmt" "hash" "io" "io/ioutil" "net/http" "os" "path" "regexp" "strconv" "strings" "sync" "time" "github.com/ncw/rclone/b2/api" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/pacer" "github.com/ncw/rclone/rest" "github.com/pkg/errors" "github.com/spf13/pflag" ) const ( defaultEndpoint = "https://api.backblaze.com" headerPrefix = "x-bz-info-" // lower case as that is what the server returns timeKey = "src_last_modified_millis" timeHeader = headerPrefix + timeKey sha1Key = "large_file_sha1" sha1Header = "X-Bz-Content-Sha1" testModeHeader = "X-Bz-Test-Mode" retryAfterHeader = "Retry-After" minSleep = 10 * time.Millisecond maxSleep = 5 * time.Minute decayConstant = 1 // bigger for slower decay, exponential maxParts = 10000 maxVersions = 100 // maximum number of versions we search in --b2-versions mode ) // Globals var ( minChunkSize = fs.SizeSuffix(100E6) chunkSize = fs.SizeSuffix(96 * 1024 * 1024) uploadCutoff = fs.SizeSuffix(200E6) b2TestMode = pflag.StringP("b2-test-mode", "", "", "A flag string for X-Bz-Test-Mode header.") b2Versions = pflag.BoolP("b2-versions", "", false, "Include old versions in directory listings.") errNotWithVersions = errors.New("can't modify or delete files in --b2-versions mode") ) // Register with Fs func init() { fs.Register(&fs.RegInfo{ Name: "b2", Description: "Backblaze B2", NewFs: NewFs, Options: []fs.Option{{ Name: "account", Help: "Account ID", }, { Name: "key", Help: "Application Key", }, { Name: "endpoint", Help: "Endpoint for the service - leave blank normally.", }, }, }) pflag.VarP(&uploadCutoff, "b2-upload-cutoff", "", "Cutoff for switching to chunked upload") pflag.VarP(&chunkSize, "b2-chunk-size", "", "Upload chunk size. Must fit in memory.") } // Fs represents a remote b2 server type Fs struct { name string // name of this remote account string // account name key string // auth key endpoint string // name of the starting api endpoint srv *rest.Client // the connection to the b2 server bucket string // the bucket we are working on bucketIDMutex sync.Mutex // mutex to protect _bucketID _bucketID string // the ID of the bucket we are working on root string // the path we are working on if any info api.AuthorizeAccountResponse // result of authorize call uploadMu sync.Mutex // lock for upload variable uploads []*api.GetUploadURLResponse // result of get upload URL calls authMu sync.Mutex // lock for authorizing the account pacer *pacer.Pacer // To pace and retry the API calls uploadTokens chan struct{} // control concurrency of uploads } // Object describes a b2 object type Object struct { fs *Fs // what this object is part of remote string // The remote path id string // b2 id of the file modTime time.Time // The modified time of the object if known sha1 string // SHA-1 hash if known size int64 // Size of the object } // ------------------------------------------------------------ // Name of the remote (as passed into NewFs) func (f *Fs) Name() string { return f.name } // Root of the remote (as passed into NewFs) func (f *Fs) Root() string { if f.root == "" { return f.bucket } return f.bucket + "/" + f.root } // String converts this Fs to a string func (f *Fs) String() string { if f.root == "" { return fmt.Sprintf("B2 bucket %s", f.bucket) } return fmt.Sprintf("B2 bucket %s path %s", f.bucket, f.root) } // Pattern to match a b2 path var matcher = regexp.MustCompile(`^([^/]*)(.*)$`) // parseParse parses a b2 'url' func parsePath(path string) (bucket, directory string, err error) { parts := matcher.FindStringSubmatch(path) if parts == nil { err = errors.Errorf("couldn't find bucket in b2 path %q", path) } else { bucket, directory = parts[1], parts[2] directory = strings.Trim(directory, "/") } return } // retryErrorCodes is a slice of error codes that we will retry var retryErrorCodes = []int{ 401, // Unauthorized (eg "Token has expired") 408, // Request Timeout 429, // Rate exceeded. 500, // Get occasional 500 Internal Server Error 503, // Service Unavailable 504, // Gateway Time-out } // shouldRetryNoAuth returns a boolean as to whether this resp and err // deserve to be retried. It returns the err as a convenience func (f *Fs) shouldRetryNoReauth(resp *http.Response, err error) (bool, error) { // For 429 or 503 errors look at the Retry-After: header and // set the retry appropriately, starting with a minimum of 1 // second if it isn't set. if resp != nil && (resp.StatusCode == 429 || resp.StatusCode == 503) { var retryAfter = 1 retryAfterString := resp.Header.Get(retryAfterHeader) if retryAfterString != "" { var err error retryAfter, err = strconv.Atoi(retryAfterString) if err != nil { fs.ErrorLog(f, "Malformed %s header %q: %v", retryAfterHeader, retryAfterString, err) } } retryAfterDuration := time.Duration(retryAfter) * time.Second if f.pacer.GetSleep() < retryAfterDuration { fs.Debug(f, "Setting sleep to %v after error: %v", retryAfterDuration, err) // We set 1/2 the value here because the pacer will double it immediately f.pacer.SetSleep(retryAfterDuration / 2) } return true, err } return fs.ShouldRetry(err) || fs.ShouldRetryHTTP(resp, retryErrorCodes), err } // shouldRetry returns a boolean as to whether this resp and err // deserve to be retried. It returns the err as a convenience func (f *Fs) shouldRetry(resp *http.Response, err error) (bool, error) { if resp != nil && resp.StatusCode == 401 { fs.Debug(f, "Unauthorized: %v", err) // Reauth authErr := f.authorizeAccount() if authErr != nil { err = authErr } return true, err } return f.shouldRetryNoReauth(resp, err) } // errorHandler parses a non 2xx error response into an error func errorHandler(resp *http.Response) error { // Decode error response errResponse := new(api.Error) err := rest.DecodeJSON(resp, &errResponse) if err != nil { fs.Debug(nil, "Couldn't decode error response: %v", err) } if errResponse.Code == "" { errResponse.Code = "unknown" } if errResponse.Status == 0 { errResponse.Status = resp.StatusCode } if errResponse.Message == "" { errResponse.Message = "Unknown " + resp.Status } return errResponse } // NewFs contstructs an Fs from the path, bucket:path func NewFs(name, root string) (fs.Fs, error) { if uploadCutoff < chunkSize { return nil, errors.Errorf("b2: upload cutoff must be less than chunk size %v - was %v", chunkSize, uploadCutoff) } if chunkSize < minChunkSize { return nil, errors.Errorf("b2: chunk size can't be less than %v - was %v", minChunkSize, chunkSize) } bucket, directory, err := parsePath(root) if err != nil { return nil, err } account := fs.ConfigFile.MustValue(name, "account") if account == "" { return nil, errors.New("account not found") } key := fs.ConfigFile.MustValue(name, "key") if key == "" { return nil, errors.New("key not found") } endpoint := fs.ConfigFile.MustValue(name, "endpoint", defaultEndpoint) f := &Fs{ name: name, bucket: bucket, root: directory, account: account, key: key, endpoint: endpoint, srv: rest.NewClient(fs.Config.Client()).SetErrorHandler(errorHandler), pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), uploadTokens: make(chan struct{}, fs.Config.Transfers), } // Set the test flag if required if *b2TestMode != "" { testMode := strings.TrimSpace(*b2TestMode) f.srv.SetHeader(testModeHeader, testMode) fs.Debug(f, "Setting test header \"%s: %s\"", testModeHeader, testMode) } // Fill up the upload tokens for i := 0; i < fs.Config.Transfers; i++ { f.returnUploadToken() } err = f.authorizeAccount() if err != nil { return nil, errors.Wrap(err, "failed to authorize account") } if f.root != "" { f.root += "/" // Check to see if the (bucket,directory) is actually an existing file oldRoot := f.root remote := path.Base(directory) f.root = path.Dir(directory) if f.root == "." { f.root = "" } else { f.root += "/" } _, err := f.NewObject(remote) if err != nil { if err == fs.ErrorObjectNotFound { // File doesn't exist so return old f f.root = oldRoot return f, nil } return nil, err } // return an error with an fs which points to the parent return f, fs.ErrorIsFile } return f, nil } // authorizeAccount gets the API endpoint and auth token. Can be used // for reauthentication too. func (f *Fs) authorizeAccount() error { f.authMu.Lock() defer f.authMu.Unlock() opts := rest.Opts{ Absolute: true, Method: "GET", Path: f.endpoint + "/b2api/v1/b2_authorize_account", UserName: f.account, Password: f.key, ExtraHeaders: map[string]string{"Authorization": ""}, // unset the Authorization for this request } err := f.pacer.Call(func() (bool, error) { resp, err := f.srv.CallJSON(&opts, nil, &f.info) return f.shouldRetryNoReauth(resp, err) }) if err != nil { return errors.Wrap(err, "failed to authenticate") } f.srv.SetRoot(f.info.APIURL+"/b2api/v1").SetHeader("Authorization", f.info.AuthorizationToken) return nil } // getUploadURL returns the upload info with the UploadURL and the AuthorizationToken // // This should be returned with returnUploadURL when finished func (f *Fs) getUploadURL() (upload *api.GetUploadURLResponse, err error) { f.uploadMu.Lock() defer f.uploadMu.Unlock() bucketID, err := f.getBucketID() if err != nil { return nil, err } if len(f.uploads) == 0 { opts := rest.Opts{ Method: "POST", Path: "/b2_get_upload_url", } var request = api.GetUploadURLRequest{ BucketID: bucketID, } err := f.pacer.Call(func() (bool, error) { resp, err := f.srv.CallJSON(&opts, &request, &upload) return f.shouldRetry(resp, err) }) if err != nil { return nil, errors.Wrap(err, "failed to get upload URL") } } else { upload, f.uploads = f.uploads[0], f.uploads[1:] } return upload, nil } // returnUploadURL returns the UploadURL to the cache func (f *Fs) returnUploadURL(upload *api.GetUploadURLResponse) { if upload == nil { return } f.uploadMu.Lock() f.uploads = append(f.uploads, upload) f.uploadMu.Unlock() } // clearUploadURL clears the current UploadURL and the AuthorizationToken func (f *Fs) clearUploadURL() { f.uploadMu.Lock() f.uploads = nil f.uploadMu.Unlock() } // Gets an upload token to control the concurrency func (f *Fs) getUploadToken() { <-f.uploadTokens } // Return an upload token func (f *Fs) returnUploadToken() { f.uploadTokens <- struct{}{} } // Return an Object from a path // // If it can't be found it returns the error fs.ErrorObjectNotFound. func (f *Fs) newObjectWithInfo(remote string, info *api.File) (fs.Object, error) { o := &Object{ fs: f, remote: remote, } if info != nil { err := o.decodeMetaData(info) if err != nil { return nil, err } } else { err := o.readMetaData() // reads info and headers, returning an error if err != nil { return nil, err } } return o, nil } // NewObject finds the Object at remote. If it can't be found // it returns the error fs.ErrorObjectNotFound. func (f *Fs) NewObject(remote string) (fs.Object, error) { return f.newObjectWithInfo(remote, nil) } // sendDir works out given a lastDir and a remote which directories should be sent func sendDir(lastDir string, remote string, level int) (dirNames []string, newLastDir string) { dir := path.Dir(remote) if dir == "." { // No slashes - nothing to do! return nil, lastDir } if dir == lastDir { // Still in same directory return nil, lastDir } newLastDir = lastDir for { slashes := strings.Count(dir, "/") if !strings.HasPrefix(lastDir, dir) && slashes < level { dirNames = append([]string{dir}, dirNames...) } if newLastDir == lastDir { newLastDir = dir } dir = path.Dir(dir) if dir == "." { break } } return dirNames, newLastDir } // listFn is called from list to handle an object type listFn func(remote string, object *api.File, isDirectory bool) error // errEndList is a sentinel used to end the list iteration now. // listFn should return it to end the iteration with no errors. var errEndList = errors.New("end list") // list lists the objects into the function supplied from // the bucket and root supplied // // level is the depth to search to // // If prefix is set then startFileName is used as a prefix which all // files must have // // If limit is > 0 then it limits to that many files (must be less // than 1000) // // If hidden is set then it will list the hidden (deleted) files too. func (f *Fs) list(dir string, level int, prefix string, limit int, hidden bool, fn listFn) error { root := f.root if dir != "" { root += dir + "/" } bucketID, err := f.getBucketID() if err != nil { return err } chunkSize := 1000 if limit > 0 { chunkSize = limit } var request = api.ListFileNamesRequest{ BucketID: bucketID, MaxFileCount: chunkSize, } prefix = root + prefix if prefix != "" { request.StartFileName = prefix } var response api.ListFileNamesResponse opts := rest.Opts{ Method: "POST", Path: "/b2_list_file_names", } if hidden { opts.Path = "/b2_list_file_versions" } lastDir := dir for { err := f.pacer.Call(func() (bool, error) { resp, err := f.srv.CallJSON(&opts, &request, &response) return f.shouldRetry(resp, err) }) if err != nil { return err } for i := range response.Files { file := &response.Files[i] // Finish if file name no longer has prefix if !strings.HasPrefix(file.Name, prefix) { return nil } remote := file.Name[len(f.root):] slashes := strings.Count(remote, "/") // Check if this file makes a new directories var dirNames []string dirNames, lastDir = sendDir(lastDir, remote, level) for _, dirName := range dirNames { err = fn(dirName, nil, true) if err != nil { if err == errEndList { return nil } return err } } // Send the file if slashes < level { err = fn(remote, file, false) if err != nil { if err == errEndList { return nil } return err } } } // end if no NextFileName if response.NextFileName == nil { break } request.StartFileName = *response.NextFileName if response.NextFileID != nil { request.StartFileID = *response.NextFileID } } return nil } // listFiles walks the path returning files and directories to out func (f *Fs) listFiles(out fs.ListOpts, dir string) { defer out.Finished() // List the objects last := "" err := f.list(dir, out.Level(), "", 0, *b2Versions, func(remote string, object *api.File, isDirectory bool) error { if isDirectory { dir := &fs.Dir{ Name: remote, Bytes: -1, Count: -1, } if out.AddDir(dir) { return fs.ErrorListAborted } } else { if remote == last { remote = object.UploadTimestamp.AddVersion(remote) } else { last = remote } // hide objects represent deleted files which we don't list if object.Action == "hide" { return nil } o, err := f.newObjectWithInfo(remote, object) if err != nil { return err } if out.Add(o) { return fs.ErrorListAborted } } return nil }) if err != nil { out.SetError(err) } } // listBuckets returns all the buckets to out func (f *Fs) listBuckets(out fs.ListOpts, dir string) { defer out.Finished() if dir != "" { out.SetError(fs.ErrorListOnlyRoot) return } err := f.listBucketsToFn(func(bucket *api.Bucket) error { dir := &fs.Dir{ Name: bucket.Name, Bytes: -1, Count: -1, } if out.AddDir(dir) { return fs.ErrorListAborted } return nil }) if err != nil { out.SetError(err) } } // List walks the path returning files and directories to out func (f *Fs) List(out fs.ListOpts, dir string) { if f.bucket == "" { f.listBuckets(out, dir) } else { f.listFiles(out, dir) } return } // listBucketFn is called from listBucketsToFn to handle a bucket type listBucketFn func(*api.Bucket) error // listBucketsToFn lists the buckets to the function supplied func (f *Fs) listBucketsToFn(fn listBucketFn) error { var account = api.Account{ID: f.info.AccountID} var response api.ListBucketsResponse opts := rest.Opts{ Method: "POST", Path: "/b2_list_buckets", } err := f.pacer.Call(func() (bool, error) { resp, err := f.srv.CallJSON(&opts, &account, &response) return f.shouldRetry(resp, err) }) if err != nil { return err } for i := range response.Buckets { err = fn(&response.Buckets[i]) if err != nil { return err } } return nil } // getBucketID finds the ID for the current bucket name func (f *Fs) getBucketID() (bucketID string, err error) { f.bucketIDMutex.Lock() defer f.bucketIDMutex.Unlock() if f._bucketID != "" { return f._bucketID, nil } err = f.listBucketsToFn(func(bucket *api.Bucket) error { if bucket.Name == f.bucket { bucketID = bucket.ID } return nil }) if bucketID == "" { err = fs.ErrorDirNotFound } f._bucketID = bucketID return bucketID, err } // setBucketID sets the ID for the current bucket name func (f *Fs) setBucketID(ID string) { f.bucketIDMutex.Lock() f._bucketID = ID f.bucketIDMutex.Unlock() } // clearBucketID clears the ID for the current bucket name func (f *Fs) clearBucketID() { f.bucketIDMutex.Lock() f._bucketID = "" f.bucketIDMutex.Unlock() } // Put the object into the bucket // // Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { // Temporary Object under construction fs := &Object{ fs: f, remote: src.Remote(), } return fs, fs.Update(in, src) } // Mkdir creates the bucket if it doesn't exist func (f *Fs) Mkdir() error { opts := rest.Opts{ Method: "POST", Path: "/b2_create_bucket", } var request = api.CreateBucketRequest{ AccountID: f.info.AccountID, Name: f.bucket, Type: "allPrivate", } var response api.Bucket err := f.pacer.Call(func() (bool, error) { resp, err := f.srv.CallJSON(&opts, &request, &response) return f.shouldRetry(resp, err) }) if err != nil { if apiErr, ok := err.(*api.Error); ok { if apiErr.Code == "duplicate_bucket_name" { return nil } } return errors.Wrap(err, "failed to create bucket") } f.setBucketID(response.ID) return nil } // Rmdir deletes the bucket if the fs is at the root // // Returns an error if it isn't empty func (f *Fs) Rmdir() error { if f.root != "" { return nil } opts := rest.Opts{ Method: "POST", Path: "/b2_delete_bucket", } bucketID, err := f.getBucketID() if err != nil { return err } var request = api.DeleteBucketRequest{ ID: bucketID, AccountID: f.info.AccountID, } var response api.Bucket err = f.pacer.Call(func() (bool, error) { resp, err := f.srv.CallJSON(&opts, &request, &response) return f.shouldRetry(resp, err) }) if err != nil { return errors.Wrap(err, "failed to delete bucket") } f.clearBucketID() f.clearUploadURL() return nil } // Precision of the remote func (f *Fs) Precision() time.Duration { return time.Millisecond } // deleteByID deletes a file version given Name and ID func (f *Fs) deleteByID(ID, Name string) error { opts := rest.Opts{ Method: "POST", Path: "/b2_delete_file_version", } var request = api.DeleteFileRequest{ ID: ID, Name: Name, } var response api.File err := f.pacer.Call(func() (bool, error) { resp, err := f.srv.CallJSON(&opts, &request, &response) return f.shouldRetry(resp, err) }) if err != nil { return errors.Wrapf(err, "failed to delete %q", Name) } return nil } // purge deletes all the files and directories // // if oldOnly is true then it deletes only non current files. // // Implemented here so we can make sure we delete old versions. func (f *Fs) purge(oldOnly bool) error { var errReturn error var checkErrMutex sync.Mutex var checkErr = func(err error) { if err == nil { return } checkErrMutex.Lock() defer checkErrMutex.Unlock() if errReturn == nil { errReturn = err } } // Delete Config.Transfers in parallel toBeDeleted := make(chan *api.File, fs.Config.Transfers) var wg sync.WaitGroup wg.Add(fs.Config.Transfers) for i := 0; i < fs.Config.Transfers; i++ { go func() { defer wg.Done() for object := range toBeDeleted { fs.Stats.Transferring(object.Name) checkErr(f.deleteByID(object.ID, object.Name)) fs.Stats.DoneTransferring(object.Name) } }() } last := "" checkErr(f.list("", fs.MaxLevel, "", 0, true, func(remote string, object *api.File, isDirectory bool) error { if !isDirectory { fs.Stats.Checking(remote) if oldOnly && last != remote { fs.Debug(remote, "Not deleting current version (id %q) %q", object.ID, object.Action) } else { fs.Debug(remote, "Deleting (id %q)", object.ID) toBeDeleted <- object } last = remote fs.Stats.DoneChecking(remote) } return nil })) close(toBeDeleted) wg.Wait() if !oldOnly { checkErr(f.Rmdir()) } return errReturn } // Purge deletes all the files and directories including the old versions. func (f *Fs) Purge() error { return f.purge(false) } // CleanUp deletes all the hidden files. func (f *Fs) CleanUp() error { return f.purge(true) } // Hashes returns the supported hash sets. func (f *Fs) Hashes() fs.HashSet { return fs.HashSet(fs.HashSHA1) } // ------------------------------------------------------------ // Fs returns the parent Fs func (o *Object) Fs() fs.Info { return o.fs } // Return a string version func (o *Object) String() string { if o == nil { return "" } return o.remote } // Remote returns the remote path func (o *Object) Remote() string { return o.remote } // Hash returns the Sha-1 of an object returning a lowercase hex string func (o *Object) Hash(t fs.HashType) (string, error) { if t != fs.HashSHA1 { return "", fs.ErrHashUnsupported } if o.sha1 == "" { // Error is logged in readMetaData err := o.readMetaData() if err != nil { return "", err } } return o.sha1, nil } // Size returns the size of an object in bytes func (o *Object) Size() int64 { return o.size } // decodeMetaDataRaw sets the metadata from the data passed in // // Sets // o.id // o.modTime // o.size // o.sha1 func (o *Object) decodeMetaDataRaw(ID, SHA1 string, Size int64, UploadTimestamp api.Timestamp, Info map[string]string) (err error) { o.id = ID o.sha1 = SHA1 // Read SHA1 from metadata if it exists and isn't set if o.sha1 == "" || o.sha1 == "none" { o.sha1 = Info[sha1Key] } o.size = Size // Use the UploadTimestamp if can't get file info o.modTime = time.Time(UploadTimestamp) return o.parseTimeString(Info[timeKey]) } // decodeMetaData sets the metadata in the object from an api.File // // Sets // o.id // o.modTime // o.size // o.sha1 func (o *Object) decodeMetaData(info *api.File) (err error) { return o.decodeMetaDataRaw(info.ID, info.SHA1, info.Size, info.UploadTimestamp, info.Info) } // decodeMetaDataFileInfo sets the metadata in the object from an api.FileInfo // // Sets // o.id // o.modTime // o.size // o.sha1 func (o *Object) decodeMetaDataFileInfo(info *api.FileInfo) (err error) { return o.decodeMetaDataRaw(info.ID, info.SHA1, info.Size, info.UploadTimestamp, info.Info) } // readMetaData gets the metadata if it hasn't already been fetched // // Sets // o.id // o.modTime // o.size // o.sha1 func (o *Object) readMetaData() (err error) { if o.id != "" { return nil } maxSearched := 1 var timestamp api.Timestamp baseRemote := o.remote if *b2Versions { timestamp, baseRemote = api.RemoveVersion(baseRemote) maxSearched = maxVersions } var info *api.File err = o.fs.list("", fs.MaxLevel, baseRemote, maxSearched, *b2Versions, func(remote string, object *api.File, isDirectory bool) error { if isDirectory { return nil } if remote == baseRemote { if !timestamp.IsZero() && !timestamp.Equal(object.UploadTimestamp) { return nil } info = object } return errEndList // read only 1 item }) if err != nil { if err == fs.ErrorDirNotFound { return fs.ErrorObjectNotFound } return err } if info == nil { return fs.ErrorObjectNotFound } return o.decodeMetaData(info) } // timeString returns modTime as the number of milliseconds // elapsed since January 1, 1970 UTC as a decimal string. func timeString(modTime time.Time) string { return strconv.FormatInt(modTime.UnixNano()/1E6, 10) } // parseTimeString converts a decimal string number of milliseconds // elapsed since January 1, 1970 UTC into a time.Time and stores it in // the modTime variable. func (o *Object) parseTimeString(timeString string) (err error) { if timeString == "" { return nil } unixMilliseconds, err := strconv.ParseInt(timeString, 10, 64) if err != nil { fs.Debug(o, "Failed to parse mod time string %q: %v", timeString, err) return err } o.modTime = time.Unix(unixMilliseconds/1E3, (unixMilliseconds%1E3)*1E6).UTC() return nil } // ModTime returns the modification time of the object // // It attempts to read the objects mtime and if that isn't present the // LastModified returned in the http headers // // SHA-1 will also be updated once the request has completed. func (o *Object) ModTime() (result time.Time) { // The error is logged in readMetaData _ = o.readMetaData() return o.modTime } // SetModTime sets the modification time of the local fs object func (o *Object) SetModTime(modTime time.Time) error { // Not possible with B2 return fs.ErrorCantSetModTime } // Storable returns if this object is storable func (o *Object) Storable() bool { return true } // openFile represents an Object open for reading type openFile struct { o *Object // Object we are reading for resp *http.Response // response of the GET body io.Reader // reading from here hash hash.Hash // currently accumulating SHA1 bytes int64 // number of bytes read on this connection eof bool // whether we have read end of file } // newOpenFile wraps an io.ReadCloser and checks the sha1sum func newOpenFile(o *Object, resp *http.Response) *openFile { file := &openFile{ o: o, resp: resp, hash: sha1.New(), } file.body = io.TeeReader(resp.Body, file.hash) return file } // Read bytes from the object - see io.Reader func (file *openFile) Read(p []byte) (n int, err error) { n, err = file.body.Read(p) file.bytes += int64(n) if err == io.EOF { file.eof = true } return } // Close the object and checks the length and SHA1 if all the object // was read func (file *openFile) Close() (err error) { // Close the body at the end defer fs.CheckClose(file.resp.Body, &err) // If not end of file then can't check SHA1 if !file.eof { return nil } // Check to see we read the correct number of bytes if file.o.Size() != file.bytes { return errors.Errorf("object corrupted on transfer - length mismatch (want %d got %d)", file.o.Size(), file.bytes) } // Check the SHA1 receivedSHA1 := file.resp.Header.Get(sha1Header) calculatedSHA1 := fmt.Sprintf("%x", file.hash.Sum(nil)) if receivedSHA1 != calculatedSHA1 { return errors.Errorf("object corrupted on transfer - SHA1 mismatch (want %q got %q)", receivedSHA1, calculatedSHA1) } return nil } // Check it satisfies the interfaces var _ io.ReadCloser = &openFile{} // Open an object for read func (o *Object) Open() (in io.ReadCloser, err error) { opts := rest.Opts{ Method: "GET", Absolute: true, Path: o.fs.info.DownloadURL, } // Download by id if set otherwise by name if o.id != "" { opts.Path += "/b2api/v1/b2_download_file_by_id?fileId=" + urlEncode(o.id) } else { opts.Path += "/file/" + urlEncode(o.fs.bucket) + "/" + urlEncode(o.fs.root+o.remote) } var resp *http.Response err = o.fs.pacer.Call(func() (bool, error) { resp, err = o.fs.srv.Call(&opts) return o.fs.shouldRetry(resp, err) }) if err != nil { return nil, errors.Wrap(err, "failed to open for download") } // Parse the time out of the headers if possible err = o.parseTimeString(resp.Header.Get(timeHeader)) if err != nil { _ = resp.Body.Close() return nil, err } if o.sha1 == "" { o.sha1 = resp.Header.Get(sha1Header) } return newOpenFile(o, resp), nil } // dontEncode is the characters that do not need percent-encoding // // The characters that do not need percent-encoding are a subset of // the printable ASCII characters: upper-case letters, lower-case // letters, digits, ".", "_", "-", "/", "~", "!", "$", "'", "(", ")", // "*", ";", "=", ":", and "@". All other byte values in a UTF-8 must // be replaced with "%" and the two-digit hex value of the byte. const dontEncode = (`abcdefghijklmnopqrstuvwxyz` + `ABCDEFGHIJKLMNOPQRSTUVWXYZ` + `0123456789` + `._-/~!$'()*;=:@`) // noNeedToEncode is a bitmap of characters which don't need % encoding var noNeedToEncode [256]bool func init() { for _, c := range dontEncode { noNeedToEncode[c] = true } } // urlEncode encodes in with % encoding func urlEncode(in string) string { var out bytes.Buffer for i := 0; i < len(in); i++ { c := in[i] if noNeedToEncode[c] { _ = out.WriteByte(c) } else { _, _ = out.WriteString(fmt.Sprintf("%%%2X", c)) } } return out.String() } // Update the object with the contents of the io.Reader, modTime and size // // The new object may have been created if an error is returned func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) { if *b2Versions { return errNotWithVersions } size := src.Size() // If a large file upload in chunks - see upload.go if size >= int64(uploadCutoff) { up, err := o.fs.newLargeUpload(o, in, src) if err != nil { return err } return up.Upload() } modTime := src.ModTime() calculatedSha1, _ := src.Hash(fs.HashSHA1) // If source cannot provide the hash, copy to a temporary file // and calculate the hash while doing so. // Then we serve the temporary file. if calculatedSha1 == "" { // Open a temp file to copy the input fd, err := ioutil.TempFile("", "rclone-b2-") if err != nil { return err } _ = os.Remove(fd.Name()) // Delete the file - may not work on Windows defer func() { _ = fd.Close() // Ignore error may have been closed already _ = os.Remove(fd.Name()) // Delete the file - may have been deleted already }() // Copy the input while calculating the sha1 hash := sha1.New() teed := io.TeeReader(in, hash) n, err := io.Copy(fd, teed) if err != nil { return err } if n != size { return errors.Errorf("read %d bytes expecting %d", n, size) } calculatedSha1 = fmt.Sprintf("%x", hash.Sum(nil)) // Rewind the temporary file _, err = fd.Seek(0, 0) if err != nil { return err } // Set input to temporary file in = fd } // Get upload Token o.fs.getUploadToken() defer o.fs.returnUploadToken() // Get upload URL upload, err := o.fs.getUploadURL() if err != nil { return err } defer o.fs.returnUploadURL(upload) // Headers for upload file // // Authorization // required // An upload authorization token, from b2_get_upload_url. // // X-Bz-File-Name // required // // The name of the file, in percent-encoded UTF-8. See Files for requirements on file names. See String Encoding. // // Content-Type // required // // The MIME type of the content of the file, which will be returned in // the Content-Type header when downloading the file. Use the // Content-Type b2/x-auto to automatically set the stored Content-Type // post upload. In the case where a file extension is absent or the // lookup fails, the Content-Type is set to application/octet-stream. The // Content-Type mappings can be purused here. // // X-Bz-Content-Sha1 // required // // The SHA1 checksum of the content of the file. B2 will check this when // the file is uploaded, to make sure that the file arrived correctly. It // will be returned in the X-Bz-Content-Sha1 header when the file is // downloaded. // // X-Bz-Info-src_last_modified_millis // optional // // If the original source of the file being uploaded has a last modified // time concept, Backblaze recommends using this spelling of one of your // ten X-Bz-Info-* headers (see below). Using a standard spelling allows // different B2 clients and the B2 web user interface to interoperate // correctly. The value should be a base 10 number which represents a UTC // time when the original source file was last modified. It is a base 10 // number of milliseconds since midnight, January 1, 1970 UTC. This fits // in a 64 bit integer such as the type "long" in the programming // language Java. It is intended to be compatible with Java's time // long. For example, it can be passed directly into the Java call // Date.setTime(long time). // // X-Bz-Info-* // optional // // Up to 10 of these headers may be present. The * part of the header // name is replace with the name of a custom field in the file // information stored with the file, and the value is an arbitrary UTF-8 // string, percent-encoded. The same info headers sent with the upload // will be returned with the download. opts := rest.Opts{ Method: "POST", Absolute: true, Path: upload.UploadURL, Body: in, ExtraHeaders: map[string]string{ "Authorization": upload.AuthorizationToken, "X-Bz-File-Name": urlEncode(o.fs.root + o.remote), "Content-Type": fs.MimeType(o), sha1Header: calculatedSha1, timeHeader: timeString(modTime), }, ContentLength: &size, } var response api.FileInfo // Don't retry, return a retry error instead err = o.fs.pacer.CallNoRetry(func() (bool, error) { resp, err := o.fs.srv.CallJSON(&opts, nil, &response) retry, err := o.fs.shouldRetryNoReauth(resp, err) // On retryable error clear UploadURL if retry { fs.Debug(o, "Clearing upload URL because of error: %v", err) upload = nil } return retry, err }) if err != nil { return err } return o.decodeMetaDataFileInfo(&response) } // Remove an object func (o *Object) Remove() error { if *b2Versions { return errNotWithVersions } bucketID, err := o.fs.getBucketID() if err != nil { return err } opts := rest.Opts{ Method: "POST", Path: "/b2_hide_file", } var request = api.HideFileRequest{ BucketID: bucketID, Name: o.fs.root + o.remote, } var response api.File err = o.fs.pacer.Call(func() (bool, error) { resp, err := o.fs.srv.CallJSON(&opts, &request, &response) return o.fs.shouldRetry(resp, err) }) if err != nil { return errors.Wrap(err, "failed to delete file") } return nil } // Check the interfaces are satisfied var ( _ fs.Fs = &Fs{} _ fs.Purger = &Fs{} _ fs.CleanUpper = &Fs{} _ fs.Object = &Object{} )