forked from TrueCloudLab/rclone
8243ff8bc8
Introduce stats groups that isolate accounting for logically different transfer operations. That way multiple accounting operations can run in parallel without interfering with each other's stats. Using groups is optional. There is a dedicated global stats group that is used by default if no group is specified. This is the operating mode for CLI usage, which is just a fire-and-forget operation. When running rclone as an rc HTTP server, each request will create its own group. There is also an option to specify your own group.
1795 lines
50 KiB
Go
1795 lines
50 KiB
Go
// Package b2 provides an interface to the Backblaze B2 object storage system
|
|
package b2
|
|
|
|
// FIXME should we remove sha1 checks from here as rclone now supports
|
|
// checking SHA1s?
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha1"
|
|
"fmt"
|
|
gohash "hash"
|
|
"io"
|
|
"net/http"
|
|
"path"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ncw/rclone/backend/b2/api"
|
|
"github.com/ncw/rclone/fs"
|
|
"github.com/ncw/rclone/fs/accounting"
|
|
"github.com/ncw/rclone/fs/config/configmap"
|
|
"github.com/ncw/rclone/fs/config/configstruct"
|
|
"github.com/ncw/rclone/fs/fserrors"
|
|
"github.com/ncw/rclone/fs/fshttp"
|
|
"github.com/ncw/rclone/fs/hash"
|
|
"github.com/ncw/rclone/fs/walk"
|
|
"github.com/ncw/rclone/lib/pacer"
|
|
"github.com/ncw/rclone/lib/rest"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
const (
|
|
defaultEndpoint = "https://api.backblazeb2.com"
|
|
headerPrefix = "x-bz-info-" // lower case as that is what the server returns
|
|
timeKey = "src_last_modified_millis"
|
|
timeHeader = headerPrefix + timeKey
|
|
sha1Key = "large_file_sha1"
|
|
sha1Header = "X-Bz-Content-Sha1"
|
|
sha1InfoHeader = headerPrefix + sha1Key
|
|
testModeHeader = "X-Bz-Test-Mode"
|
|
retryAfterHeader = "Retry-After"
|
|
minSleep = 10 * time.Millisecond
|
|
maxSleep = 5 * time.Minute
|
|
decayConstant = 1 // bigger for slower decay, exponential
|
|
maxParts = 10000
|
|
maxVersions = 100 // maximum number of versions we search in --b2-versions mode
|
|
minChunkSize = 5 * fs.MebiByte
|
|
defaultChunkSize = 96 * fs.MebiByte
|
|
defaultUploadCutoff = 200 * fs.MebiByte
|
|
)
|
|
|
|
// Globals
|
|
var (
|
|
errNotWithVersions = errors.New("can't modify or delete files in --b2-versions mode")
|
|
)
|
|
|
|
// Register with Fs
|
|
func init() {
|
|
fs.Register(&fs.RegInfo{
|
|
Name: "b2",
|
|
Description: "Backblaze B2",
|
|
NewFs: NewFs,
|
|
Options: []fs.Option{{
|
|
Name: "account",
|
|
Help: "Account ID or Application Key ID",
|
|
Required: true,
|
|
}, {
|
|
Name: "key",
|
|
Help: "Application Key",
|
|
Required: true,
|
|
}, {
|
|
Name: "endpoint",
|
|
Help: "Endpoint for the service.\nLeave blank normally.",
|
|
Advanced: true,
|
|
}, {
|
|
Name: "test_mode",
|
|
Help: `A flag string for X-Bz-Test-Mode header for debugging.
|
|
|
|
This is for debugging purposes only. Setting it to one of the strings
|
|
below will cause b2 to return specific errors:
|
|
|
|
* "fail_some_uploads"
|
|
* "expire_some_account_authorization_tokens"
|
|
* "force_cap_exceeded"
|
|
|
|
These will be set in the "X-Bz-Test-Mode" header which is documented
|
|
in the [b2 integrations checklist](https://www.backblaze.com/b2/docs/integration_checklist.html).`,
|
|
Default: "",
|
|
Hide: fs.OptionHideConfigurator,
|
|
Advanced: true,
|
|
}, {
|
|
Name: "versions",
|
|
Help: "Include old versions in directory listings.\nNote that when using this no file write operations are permitted,\nso you can't upload files or delete them.",
|
|
Default: false,
|
|
Advanced: true,
|
|
}, {
|
|
Name: "hard_delete",
|
|
Help: "Permanently delete files on remote removal, otherwise hide files.",
|
|
Default: false,
|
|
}, {
|
|
Name: "upload_cutoff",
|
|
Help: `Cutoff for switching to chunked upload.
|
|
|
|
Files above this size will be uploaded in chunks of "--b2-chunk-size".
|
|
|
|
This value should be set no larger than 4.657GiB (== 5GB).`,
|
|
Default: defaultUploadCutoff,
|
|
Advanced: true,
|
|
}, {
|
|
Name: "chunk_size",
|
|
Help: `Upload chunk size. Must fit in memory.
|
|
|
|
When uploading large files, chunk the file into this size. Note that
|
|
these chunks are buffered in memory and there might a maximum of
|
|
"--transfers" chunks in progress at once. 5,000,000 Bytes is the
|
|
minimum size.`,
|
|
Default: defaultChunkSize,
|
|
Advanced: true,
|
|
}, {
|
|
Name: "disable_checksum",
|
|
Help: `Disable checksums for large (> upload cutoff) files`,
|
|
Default: false,
|
|
Advanced: true,
|
|
}, {
|
|
Name: "download_url",
|
|
Help: `Custom endpoint for downloads.
|
|
|
|
This is usually set to a Cloudflare CDN URL as Backblaze offers
|
|
free egress for data downloaded through the Cloudflare network.
|
|
This is probably only useful for a public bucket.
|
|
Leave blank if you want to use the endpoint provided by Backblaze.`,
|
|
Advanced: true,
|
|
}, {
|
|
Name: "download_auth_duration",
|
|
Help: `Time before the authorization token will expire in s or suffix ms|s|m|h|d.
|
|
|
|
The duration before the download authorization token will expire.
|
|
The minimum value is 1 second. The maximum value is one week.`,
|
|
Default: fs.Duration(7 * 24 * time.Hour),
|
|
Advanced: true,
|
|
}},
|
|
})
|
|
}
|
|
|
|
// Options defines the configuration for this backend
|
|
type Options struct {
|
|
Account string `config:"account"`
|
|
Key string `config:"key"`
|
|
Endpoint string `config:"endpoint"`
|
|
TestMode string `config:"test_mode"`
|
|
Versions bool `config:"versions"`
|
|
HardDelete bool `config:"hard_delete"`
|
|
UploadCutoff fs.SizeSuffix `config:"upload_cutoff"`
|
|
ChunkSize fs.SizeSuffix `config:"chunk_size"`
|
|
DisableCheckSum bool `config:"disable_checksum"`
|
|
DownloadURL string `config:"download_url"`
|
|
DownloadAuthorizationDuration fs.Duration `config:"download_auth_duration"`
|
|
}
|
|
|
|
// Fs represents a remote b2 server
|
|
type Fs struct {
|
|
name string // name of this remote
|
|
root string // the path we are working on if any
|
|
opt Options // parsed config options
|
|
features *fs.Features // optional features
|
|
srv *rest.Client // the connection to the b2 server
|
|
bucket string // the bucket we are working on
|
|
bucketOKMu sync.Mutex // mutex to protect bucket OK
|
|
bucketOK bool // true if we have created the bucket
|
|
bucketIDMutex sync.Mutex // mutex to protect _bucketID
|
|
_bucketID string // the ID of the bucket we are working on
|
|
bucketTypeMutex sync.Mutex // mutex to protect _bucketType
|
|
_bucketType string // the Type of the bucket we are working on
|
|
info api.AuthorizeAccountResponse // result of authorize call
|
|
uploadMu sync.Mutex // lock for upload variable
|
|
uploads []*api.GetUploadURLResponse // result of get upload URL calls
|
|
authMu sync.Mutex // lock for authorizing the account
|
|
pacer *fs.Pacer // To pace and retry the API calls
|
|
bufferTokens chan []byte // control concurrency of multipart uploads
|
|
}
|
|
|
|
// Object describes a b2 object
|
|
type Object struct {
|
|
fs *Fs // what this object is part of
|
|
remote string // The remote path
|
|
id string // b2 id of the file
|
|
modTime time.Time // The modified time of the object if known
|
|
sha1 string // SHA-1 hash if known
|
|
size int64 // Size of the object
|
|
mimeType string // Content-Type of the object
|
|
}
|
|
|
|
// ------------------------------------------------------------
|
|
|
|
// Name of the remote (as passed into NewFs)
|
|
func (f *Fs) Name() string {
|
|
return f.name
|
|
}
|
|
|
|
// Root of the remote (as passed into NewFs)
|
|
func (f *Fs) Root() string {
|
|
if f.root == "" {
|
|
return f.bucket
|
|
}
|
|
return f.bucket + "/" + f.root
|
|
}
|
|
|
|
// String converts this Fs to a string
|
|
func (f *Fs) String() string {
|
|
if f.root == "" {
|
|
return fmt.Sprintf("B2 bucket %s", f.bucket)
|
|
}
|
|
return fmt.Sprintf("B2 bucket %s path %s", f.bucket, f.root)
|
|
}
|
|
|
|
// Features returns the optional features of this Fs
|
|
func (f *Fs) Features() *fs.Features {
|
|
return f.features
|
|
}
|
|
|
|
// Pattern to match a b2 path
|
|
var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
|
|
|
|
// parseParse parses a b2 'url'
|
|
func parsePath(path string) (bucket, directory string, err error) {
|
|
parts := matcher.FindStringSubmatch(path)
|
|
if parts == nil {
|
|
err = errors.Errorf("couldn't find bucket in b2 path %q", path)
|
|
} else {
|
|
bucket, directory = parts[1], parts[2]
|
|
directory = strings.Trim(directory, "/")
|
|
}
|
|
return
|
|
}
|
|
|
|
// retryErrorCodes lists the HTTP status codes which cause a request
// to be retried.
var retryErrorCodes = []int{
	401, // Unauthorized (eg "Token has expired")
	408, // Request Timeout
	429, // Rate exceeded.
	500, // Get occasional 500 Internal Server Error
	503, // Service Unavailable
	504, // Gateway Time-out
}
|
|
|
|
// shouldRetryNoAuth returns a boolean as to whether this resp and err
|
|
// deserve to be retried. It returns the err as a convenience
|
|
func (f *Fs) shouldRetryNoReauth(resp *http.Response, err error) (bool, error) {
|
|
// For 429 or 503 errors look at the Retry-After: header and
|
|
// set the retry appropriately, starting with a minimum of 1
|
|
// second if it isn't set.
|
|
if resp != nil && (resp.StatusCode == 429 || resp.StatusCode == 503) {
|
|
var retryAfter = 1
|
|
retryAfterString := resp.Header.Get(retryAfterHeader)
|
|
if retryAfterString != "" {
|
|
var err error
|
|
retryAfter, err = strconv.Atoi(retryAfterString)
|
|
if err != nil {
|
|
fs.Errorf(f, "Malformed %s header %q: %v", retryAfterHeader, retryAfterString, err)
|
|
}
|
|
}
|
|
return true, pacer.RetryAfterError(err, time.Duration(retryAfter)*time.Second)
|
|
}
|
|
return fserrors.ShouldRetry(err) || fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
|
|
}
|
|
|
|
// shouldRetry returns a boolean as to whether this resp and err
|
|
// deserve to be retried. It returns the err as a convenience
|
|
func (f *Fs) shouldRetry(resp *http.Response, err error) (bool, error) {
|
|
if resp != nil && resp.StatusCode == 401 {
|
|
fs.Debugf(f, "Unauthorized: %v", err)
|
|
// Reauth
|
|
authErr := f.authorizeAccount()
|
|
if authErr != nil {
|
|
err = authErr
|
|
}
|
|
return true, err
|
|
}
|
|
return f.shouldRetryNoReauth(resp, err)
|
|
}
|
|
|
|
// errorHandler parses a non 2xx error response into an error
|
|
func errorHandler(resp *http.Response) error {
|
|
// Decode error response
|
|
errResponse := new(api.Error)
|
|
err := rest.DecodeJSON(resp, &errResponse)
|
|
if err != nil {
|
|
fs.Debugf(nil, "Couldn't decode error response: %v", err)
|
|
}
|
|
if errResponse.Code == "" {
|
|
errResponse.Code = "unknown"
|
|
}
|
|
if errResponse.Status == 0 {
|
|
errResponse.Status = resp.StatusCode
|
|
}
|
|
if errResponse.Message == "" {
|
|
errResponse.Message = "Unknown " + resp.Status
|
|
}
|
|
return errResponse
|
|
}
|
|
|
|
func checkUploadChunkSize(cs fs.SizeSuffix) error {
|
|
if cs < minChunkSize {
|
|
return errors.Errorf("%s is less than %s", cs, minChunkSize)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
|
|
err = checkUploadChunkSize(cs)
|
|
if err == nil {
|
|
old, f.opt.ChunkSize = f.opt.ChunkSize, cs
|
|
f.fillBufferTokens() // reset the buffer tokens
|
|
}
|
|
return
|
|
}
|
|
|
|
func checkUploadCutoff(opt *Options, cs fs.SizeSuffix) error {
|
|
if cs < opt.ChunkSize {
|
|
return errors.Errorf("%v is less than chunk size %v", cs, opt.ChunkSize)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
|
|
err = checkUploadCutoff(&f.opt, cs)
|
|
if err == nil {
|
|
old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
|
|
}
|
|
return
|
|
}
|
|
|
|
// NewFs constructs an Fs from the path, bucket:path
|
|
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
|
ctx := context.Background()
|
|
// Parse config into Options struct
|
|
opt := new(Options)
|
|
err := configstruct.Set(m, opt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = checkUploadCutoff(opt, opt.UploadCutoff)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "b2: upload cutoff")
|
|
}
|
|
err = checkUploadChunkSize(opt.ChunkSize)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "b2: chunk size")
|
|
}
|
|
bucket, directory, err := parsePath(root)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if opt.Account == "" {
|
|
return nil, errors.New("account not found")
|
|
}
|
|
if opt.Key == "" {
|
|
return nil, errors.New("key not found")
|
|
}
|
|
if opt.Endpoint == "" {
|
|
opt.Endpoint = defaultEndpoint
|
|
}
|
|
f := &Fs{
|
|
name: name,
|
|
opt: *opt,
|
|
bucket: bucket,
|
|
root: directory,
|
|
srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler),
|
|
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
|
|
}
|
|
f.features = (&fs.Features{
|
|
ReadMimeType: true,
|
|
WriteMimeType: true,
|
|
BucketBased: true,
|
|
}).Fill(f)
|
|
// Set the test flag if required
|
|
if opt.TestMode != "" {
|
|
testMode := strings.TrimSpace(opt.TestMode)
|
|
f.srv.SetHeader(testModeHeader, testMode)
|
|
fs.Debugf(f, "Setting test header \"%s: %s\"", testModeHeader, testMode)
|
|
}
|
|
f.fillBufferTokens()
|
|
err = f.authorizeAccount()
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to authorize account")
|
|
}
|
|
// If this is a key limited to a single bucket, it must exist already
|
|
if f.bucket != "" && f.info.Allowed.BucketID != "" {
|
|
allowedBucket := f.info.Allowed.BucketName
|
|
if allowedBucket == "" {
|
|
return nil, errors.New("bucket that application key is restricted to no longer exists")
|
|
}
|
|
if allowedBucket != f.bucket {
|
|
return nil, errors.Errorf("you must use bucket %q with this application key", allowedBucket)
|
|
}
|
|
f.markBucketOK()
|
|
f.setBucketID(f.info.Allowed.BucketID)
|
|
}
|
|
if f.root != "" {
|
|
f.root += "/"
|
|
// Check to see if the (bucket,directory) is actually an existing file
|
|
oldRoot := f.root
|
|
remote := path.Base(directory)
|
|
f.root = path.Dir(directory)
|
|
if f.root == "." {
|
|
f.root = ""
|
|
} else {
|
|
f.root += "/"
|
|
}
|
|
_, err := f.NewObject(ctx, remote)
|
|
if err != nil {
|
|
if err == fs.ErrorObjectNotFound {
|
|
// File doesn't exist so return old f
|
|
f.root = oldRoot
|
|
return f, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
// return an error with an fs which points to the parent
|
|
return f, fs.ErrorIsFile
|
|
}
|
|
return f, nil
|
|
}
|
|
|
|
// authorizeAccount gets the API endpoint and auth token. Can be used
|
|
// for reauthentication too.
|
|
func (f *Fs) authorizeAccount() error {
|
|
f.authMu.Lock()
|
|
defer f.authMu.Unlock()
|
|
opts := rest.Opts{
|
|
Method: "GET",
|
|
Path: "/b2api/v1/b2_authorize_account",
|
|
RootURL: f.opt.Endpoint,
|
|
UserName: f.opt.Account,
|
|
Password: f.opt.Key,
|
|
ExtraHeaders: map[string]string{"Authorization": ""}, // unset the Authorization for this request
|
|
}
|
|
err := f.pacer.Call(func() (bool, error) {
|
|
resp, err := f.srv.CallJSON(&opts, nil, &f.info)
|
|
return f.shouldRetryNoReauth(resp, err)
|
|
})
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to authenticate")
|
|
}
|
|
f.srv.SetRoot(f.info.APIURL+"/b2api/v1").SetHeader("Authorization", f.info.AuthorizationToken)
|
|
return nil
|
|
}
|
|
|
|
// hasPermission returns if the current AuthorizationToken has the selected permission
|
|
func (f *Fs) hasPermission(permission string) bool {
|
|
for _, capability := range f.info.Allowed.Capabilities {
|
|
if capability == permission {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// getUploadURL returns the upload info with the UploadURL and the AuthorizationToken
|
|
//
|
|
// This should be returned with returnUploadURL when finished
|
|
func (f *Fs) getUploadURL() (upload *api.GetUploadURLResponse, err error) {
|
|
f.uploadMu.Lock()
|
|
defer f.uploadMu.Unlock()
|
|
bucketID, err := f.getBucketID()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(f.uploads) == 0 {
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_get_upload_url",
|
|
}
|
|
var request = api.GetUploadURLRequest{
|
|
BucketID: bucketID,
|
|
}
|
|
err := f.pacer.Call(func() (bool, error) {
|
|
resp, err := f.srv.CallJSON(&opts, &request, &upload)
|
|
return f.shouldRetry(resp, err)
|
|
})
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to get upload URL")
|
|
}
|
|
} else {
|
|
upload, f.uploads = f.uploads[0], f.uploads[1:]
|
|
}
|
|
return upload, nil
|
|
}
|
|
|
|
// returnUploadURL returns the UploadURL to the cache
|
|
func (f *Fs) returnUploadURL(upload *api.GetUploadURLResponse) {
|
|
if upload == nil {
|
|
return
|
|
}
|
|
f.uploadMu.Lock()
|
|
f.uploads = append(f.uploads, upload)
|
|
f.uploadMu.Unlock()
|
|
}
|
|
|
|
// clearUploadURL clears the current UploadURL and the AuthorizationToken
|
|
func (f *Fs) clearUploadURL() {
|
|
f.uploadMu.Lock()
|
|
f.uploads = nil
|
|
f.uploadMu.Unlock()
|
|
}
|
|
|
|
// Fill up (or reset) the buffer tokens
|
|
func (f *Fs) fillBufferTokens() {
|
|
f.bufferTokens = make(chan []byte, fs.Config.Transfers)
|
|
for i := 0; i < fs.Config.Transfers; i++ {
|
|
f.bufferTokens <- nil
|
|
}
|
|
}
|
|
|
|
// getUploadBlock gets a block from the pool of size chunkSize
|
|
func (f *Fs) getUploadBlock() []byte {
|
|
buf := <-f.bufferTokens
|
|
if buf == nil {
|
|
buf = make([]byte, f.opt.ChunkSize)
|
|
}
|
|
// fs.Debugf(f, "Getting upload block %p", buf)
|
|
return buf
|
|
}
|
|
|
|
// putUploadBlock returns a block to the pool of size chunkSize
|
|
func (f *Fs) putUploadBlock(buf []byte) {
|
|
buf = buf[:cap(buf)]
|
|
if len(buf) != int(f.opt.ChunkSize) {
|
|
panic("bad blocksize returned to pool")
|
|
}
|
|
// fs.Debugf(f, "Returning upload block %p", buf)
|
|
f.bufferTokens <- buf
|
|
}
|
|
|
|
// Return an Object from a path
|
|
//
|
|
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
|
func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *api.File) (fs.Object, error) {
|
|
o := &Object{
|
|
fs: f,
|
|
remote: remote,
|
|
}
|
|
if info != nil {
|
|
err := o.decodeMetaData(info)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
err := o.readMetaData(ctx) // reads info and headers, returning an error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return o, nil
|
|
}
|
|
|
|
// NewObject finds the Object at remote. If it can't be found
|
|
// it returns the error fs.ErrorObjectNotFound.
|
|
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
|
return f.newObjectWithInfo(ctx, remote, nil)
|
|
}
|
|
|
|
// listFn is called from list to handle an object
|
|
type listFn func(remote string, object *api.File, isDirectory bool) error
|
|
|
|
// errEndList is a sentinel used to end the list iteration now.
|
|
// listFn should return it to end the iteration with no errors.
|
|
var errEndList = errors.New("end list")
|
|
|
|
// list lists the objects into the function supplied from
|
|
// the bucket and root supplied
|
|
//
|
|
// dir is the starting directory, "" for root
|
|
//
|
|
// level is the depth to search to
|
|
//
|
|
// If prefix is set then startFileName is used as a prefix which all
|
|
// files must have
|
|
//
|
|
// If limit is > 0 then it limits to that many files (must be less
|
|
// than 1000)
|
|
//
|
|
// If hidden is set then it will list the hidden (deleted) files too.
|
|
func (f *Fs) list(ctx context.Context, dir string, recurse bool, prefix string, limit int, hidden bool, fn listFn) error {
|
|
root := f.root
|
|
if dir != "" {
|
|
root += dir + "/"
|
|
}
|
|
delimiter := ""
|
|
if !recurse {
|
|
delimiter = "/"
|
|
}
|
|
bucketID, err := f.getBucketID()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
chunkSize := 1000
|
|
if limit > 0 {
|
|
chunkSize = limit
|
|
}
|
|
var request = api.ListFileNamesRequest{
|
|
BucketID: bucketID,
|
|
MaxFileCount: chunkSize,
|
|
Prefix: root,
|
|
Delimiter: delimiter,
|
|
}
|
|
prefix = root + prefix
|
|
if prefix != "" {
|
|
request.StartFileName = prefix
|
|
}
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_list_file_names",
|
|
}
|
|
if hidden {
|
|
opts.Path = "/b2_list_file_versions"
|
|
}
|
|
for {
|
|
var response api.ListFileNamesResponse
|
|
err := f.pacer.Call(func() (bool, error) {
|
|
resp, err := f.srv.CallJSON(&opts, &request, &response)
|
|
return f.shouldRetry(resp, err)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Check that there are matching files
|
|
if len(response.Files) == 0 {
|
|
return fs.ErrorDirNotFound
|
|
}
|
|
for i := range response.Files {
|
|
file := &response.Files[i]
|
|
// Finish if file name no longer has prefix
|
|
if prefix != "" && !strings.HasPrefix(file.Name, prefix) {
|
|
return nil
|
|
}
|
|
if !strings.HasPrefix(file.Name, f.root) {
|
|
fs.Debugf(f, "Odd name received %q", file.Name)
|
|
continue
|
|
}
|
|
remote := file.Name[len(f.root):]
|
|
// Check for directory
|
|
isDirectory := strings.HasSuffix(remote, "/")
|
|
if isDirectory {
|
|
remote = remote[:len(remote)-1]
|
|
}
|
|
// Send object
|
|
err = fn(remote, file, isDirectory)
|
|
if err != nil {
|
|
if err == errEndList {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
// end if no NextFileName
|
|
if response.NextFileName == nil {
|
|
break
|
|
}
|
|
request.StartFileName = *response.NextFileName
|
|
if response.NextFileID != nil {
|
|
request.StartFileID = *response.NextFileID
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Convert a list item into a DirEntry
|
|
func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *api.File, isDirectory bool, last *string) (fs.DirEntry, error) {
|
|
if isDirectory {
|
|
d := fs.NewDir(remote, time.Time{})
|
|
return d, nil
|
|
}
|
|
if remote == *last {
|
|
remote = object.UploadTimestamp.AddVersion(remote)
|
|
} else {
|
|
*last = remote
|
|
}
|
|
// hide objects represent deleted files which we don't list
|
|
if object.Action == "hide" {
|
|
return nil, nil
|
|
}
|
|
o, err := f.newObjectWithInfo(ctx, remote, object)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return o, nil
|
|
}
|
|
|
|
// mark the bucket as being OK
|
|
func (f *Fs) markBucketOK() {
|
|
if f.bucket != "" {
|
|
f.bucketOKMu.Lock()
|
|
f.bucketOK = true
|
|
f.bucketOKMu.Unlock()
|
|
}
|
|
}
|
|
|
|
// listDir lists a single directory
|
|
func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
|
last := ""
|
|
err = f.list(ctx, dir, false, "", 0, f.opt.Versions, func(remote string, object *api.File, isDirectory bool) error {
|
|
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory, &last)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if entry != nil {
|
|
entries = append(entries, entry)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// bucket must be present if listing succeeded
|
|
f.markBucketOK()
|
|
return entries, nil
|
|
}
|
|
|
|
// listBuckets returns all the buckets to out
|
|
func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) {
|
|
if dir != "" {
|
|
return nil, fs.ErrorListBucketRequired
|
|
}
|
|
err = f.listBucketsToFn(func(bucket *api.Bucket) error {
|
|
d := fs.NewDir(bucket.Name, time.Time{})
|
|
entries = append(entries, d)
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return entries, nil
|
|
}
|
|
|
|
// List the objects and directories in dir into entries. The
|
|
// entries can be returned in any order but should be for a
|
|
// complete directory.
|
|
//
|
|
// dir should be "" to list the root, and should not have
|
|
// trailing slashes.
|
|
//
|
|
// This should return ErrDirNotFound if the directory isn't
|
|
// found.
|
|
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
|
if f.bucket == "" {
|
|
return f.listBuckets(dir)
|
|
}
|
|
return f.listDir(ctx, dir)
|
|
}
|
|
|
|
// ListR lists the objects and directories of the Fs starting
|
|
// from dir recursively into out.
|
|
//
|
|
// dir should be "" to start from the root, and should not
|
|
// have trailing slashes.
|
|
//
|
|
// This should return ErrDirNotFound if the directory isn't
|
|
// found.
|
|
//
|
|
// It should call callback for each tranche of entries read.
|
|
// These need not be returned in any particular order. If
|
|
// callback returns an error then the listing will stop
|
|
// immediately.
|
|
//
|
|
// Don't implement this unless you have a more efficient way
|
|
// of listing recursively that doing a directory traversal.
|
|
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
|
if f.bucket == "" {
|
|
return fs.ErrorListBucketRequired
|
|
}
|
|
list := walk.NewListRHelper(callback)
|
|
last := ""
|
|
err = f.list(ctx, dir, true, "", 0, f.opt.Versions, func(remote string, object *api.File, isDirectory bool) error {
|
|
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory, &last)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return list.Add(entry)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// bucket must be present if listing succeeded
|
|
f.markBucketOK()
|
|
return list.Flush()
|
|
}
|
|
|
|
// listBucketFn is called from listBucketsToFn to handle a bucket
|
|
type listBucketFn func(*api.Bucket) error
|
|
|
|
// listBucketsToFn lists the buckets to the function supplied
|
|
func (f *Fs) listBucketsToFn(fn listBucketFn) error {
|
|
var account = api.ListBucketsRequest{
|
|
AccountID: f.info.AccountID,
|
|
BucketID: f.info.Allowed.BucketID,
|
|
}
|
|
|
|
var response api.ListBucketsResponse
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_list_buckets",
|
|
}
|
|
err := f.pacer.Call(func() (bool, error) {
|
|
resp, err := f.srv.CallJSON(&opts, &account, &response)
|
|
return f.shouldRetry(resp, err)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for i := range response.Buckets {
|
|
err = fn(&response.Buckets[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// getbucketType finds the bucketType for the current bucket name
|
|
// can be one of allPublic. allPrivate, or snapshot
|
|
func (f *Fs) getbucketType() (bucketType string, err error) {
|
|
f.bucketTypeMutex.Lock()
|
|
defer f.bucketTypeMutex.Unlock()
|
|
if f._bucketType != "" {
|
|
return f._bucketType, nil
|
|
}
|
|
err = f.listBucketsToFn(func(bucket *api.Bucket) error {
|
|
if bucket.Name == f.bucket {
|
|
bucketType = bucket.Type
|
|
}
|
|
return nil
|
|
|
|
})
|
|
if bucketType == "" {
|
|
err = fs.ErrorDirNotFound
|
|
}
|
|
f._bucketType = bucketType
|
|
return bucketType, err
|
|
}
|
|
|
|
// setBucketType sets the Type for the current bucket name
|
|
func (f *Fs) setBucketType(Type string) {
|
|
f.bucketTypeMutex.Lock()
|
|
f._bucketType = Type
|
|
f.bucketTypeMutex.Unlock()
|
|
}
|
|
|
|
// clearBucketType clears the Type for the current bucket name
|
|
func (f *Fs) clearBucketType() {
|
|
f.bucketTypeMutex.Lock()
|
|
f._bucketType = ""
|
|
f.bucketTypeMutex.Unlock()
|
|
}
|
|
|
|
// getBucketID finds the ID for the current bucket name
|
|
func (f *Fs) getBucketID() (bucketID string, err error) {
|
|
f.bucketIDMutex.Lock()
|
|
defer f.bucketIDMutex.Unlock()
|
|
if f._bucketID != "" {
|
|
return f._bucketID, nil
|
|
}
|
|
err = f.listBucketsToFn(func(bucket *api.Bucket) error {
|
|
if bucket.Name == f.bucket {
|
|
bucketID = bucket.ID
|
|
}
|
|
return nil
|
|
|
|
})
|
|
if bucketID == "" {
|
|
err = fs.ErrorDirNotFound
|
|
}
|
|
f._bucketID = bucketID
|
|
return bucketID, err
|
|
}
|
|
|
|
// setBucketID sets the ID for the current bucket name
|
|
func (f *Fs) setBucketID(ID string) {
|
|
f.bucketIDMutex.Lock()
|
|
f._bucketID = ID
|
|
f.bucketIDMutex.Unlock()
|
|
}
|
|
|
|
// clearBucketID clears the ID for the current bucket name
|
|
func (f *Fs) clearBucketID() {
|
|
f.bucketIDMutex.Lock()
|
|
f._bucketID = ""
|
|
f.bucketIDMutex.Unlock()
|
|
}
|
|
|
|
// Put the object into the bucket
|
|
//
|
|
// Copy the reader in to the new object which is returned
|
|
//
|
|
// The new object may have been created if an error is returned
|
|
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
// Temporary Object under construction
|
|
fs := &Object{
|
|
fs: f,
|
|
remote: src.Remote(),
|
|
}
|
|
return fs, fs.Update(ctx, in, src, options...)
|
|
}
|
|
|
|
// PutStream uploads to the remote path with the modTime given of indeterminate size
|
|
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
return f.Put(ctx, in, src, options...)
|
|
}
|
|
|
|
// Mkdir creates the bucket if it doesn't exist
|
|
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
|
f.bucketOKMu.Lock()
|
|
defer f.bucketOKMu.Unlock()
|
|
if f.bucketOK {
|
|
return nil
|
|
}
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_create_bucket",
|
|
}
|
|
var request = api.CreateBucketRequest{
|
|
AccountID: f.info.AccountID,
|
|
Name: f.bucket,
|
|
Type: "allPrivate",
|
|
}
|
|
var response api.Bucket
|
|
err := f.pacer.Call(func() (bool, error) {
|
|
resp, err := f.srv.CallJSON(&opts, &request, &response)
|
|
return f.shouldRetry(resp, err)
|
|
})
|
|
if err != nil {
|
|
if apiErr, ok := err.(*api.Error); ok {
|
|
if apiErr.Code == "duplicate_bucket_name" {
|
|
// Check this is our bucket - buckets are globally unique and this
|
|
// might be someone elses.
|
|
_, getBucketErr := f.getBucketID()
|
|
if getBucketErr == nil {
|
|
// found so it is our bucket
|
|
f.bucketOK = true
|
|
return nil
|
|
}
|
|
if getBucketErr != fs.ErrorDirNotFound {
|
|
fs.Debugf(f, "Error checking bucket exists: %v", getBucketErr)
|
|
}
|
|
}
|
|
}
|
|
return errors.Wrap(err, "failed to create bucket")
|
|
}
|
|
f.setBucketID(response.ID)
|
|
f.setBucketType(response.Type)
|
|
f.bucketOK = true
|
|
return nil
|
|
}
|
|
|
|
// Rmdir deletes the bucket if the fs is at the root
|
|
//
|
|
// Returns an error if it isn't empty
|
|
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
|
f.bucketOKMu.Lock()
|
|
defer f.bucketOKMu.Unlock()
|
|
if f.root != "" || dir != "" {
|
|
return nil
|
|
}
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_delete_bucket",
|
|
}
|
|
bucketID, err := f.getBucketID()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var request = api.DeleteBucketRequest{
|
|
ID: bucketID,
|
|
AccountID: f.info.AccountID,
|
|
}
|
|
var response api.Bucket
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err := f.srv.CallJSON(&opts, &request, &response)
|
|
return f.shouldRetry(resp, err)
|
|
})
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to delete bucket")
|
|
}
|
|
f.bucketOK = false
|
|
f.clearBucketID()
|
|
f.clearBucketType()
|
|
f.clearUploadURL()
|
|
return nil
|
|
}
|
|
|
|
// Precision of the remote
|
|
func (f *Fs) Precision() time.Duration {
|
|
return time.Millisecond
|
|
}
|
|
|
|
// hide hides a file on the remote
|
|
func (f *Fs) hide(Name string) error {
|
|
bucketID, err := f.getBucketID()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_hide_file",
|
|
}
|
|
var request = api.HideFileRequest{
|
|
BucketID: bucketID,
|
|
Name: Name,
|
|
}
|
|
var response api.File
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err := f.srv.CallJSON(&opts, &request, &response)
|
|
return f.shouldRetry(resp, err)
|
|
})
|
|
if err != nil {
|
|
if apiErr, ok := err.(*api.Error); ok {
|
|
if apiErr.Code == "already_hidden" {
|
|
// sometimes eventual consistency causes this, so
|
|
// ignore this error since it is harmless
|
|
return nil
|
|
}
|
|
}
|
|
return errors.Wrapf(err, "failed to hide %q", Name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// deleteByID deletes a file version given Name and ID
|
|
func (f *Fs) deleteByID(ID, Name string) error {
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_delete_file_version",
|
|
}
|
|
var request = api.DeleteFileRequest{
|
|
ID: ID,
|
|
Name: Name,
|
|
}
|
|
var response api.File
|
|
err := f.pacer.Call(func() (bool, error) {
|
|
resp, err := f.srv.CallJSON(&opts, &request, &response)
|
|
return f.shouldRetry(resp, err)
|
|
})
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to delete %q", Name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// purge deletes all the files and directories
|
|
//
|
|
// if oldOnly is true then it deletes only non current files.
|
|
//
|
|
// Implemented here so we can make sure we delete old versions.
|
|
func (f *Fs) purge(ctx context.Context, oldOnly bool) error {
|
|
var errReturn error
|
|
var checkErrMutex sync.Mutex
|
|
var checkErr = func(err error) {
|
|
if err == nil {
|
|
return
|
|
}
|
|
checkErrMutex.Lock()
|
|
defer checkErrMutex.Unlock()
|
|
if errReturn == nil {
|
|
errReturn = err
|
|
}
|
|
}
|
|
var isUnfinishedUploadStale = func(timestamp api.Timestamp) bool {
|
|
if time.Since(time.Time(timestamp)).Hours() > 24 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Delete Config.Transfers in parallel
|
|
toBeDeleted := make(chan *api.File, fs.Config.Transfers)
|
|
var wg sync.WaitGroup
|
|
wg.Add(fs.Config.Transfers)
|
|
for i := 0; i < fs.Config.Transfers; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for object := range toBeDeleted {
|
|
accounting.Stats(ctx).Checking(object.Name)
|
|
checkErr(f.deleteByID(object.ID, object.Name))
|
|
accounting.Stats(ctx).DoneChecking(object.Name)
|
|
}
|
|
}()
|
|
}
|
|
last := ""
|
|
checkErr(f.list(ctx, "", true, "", 0, true, func(remote string, object *api.File, isDirectory bool) error {
|
|
if !isDirectory {
|
|
accounting.Stats(ctx).Checking(remote)
|
|
if oldOnly && last != remote {
|
|
if object.Action == "hide" {
|
|
fs.Debugf(remote, "Deleting current version (id %q) as it is a hide marker", object.ID)
|
|
toBeDeleted <- object
|
|
} else if object.Action == "start" && isUnfinishedUploadStale(object.UploadTimestamp) {
|
|
fs.Debugf(remote, "Deleting current version (id %q) as it is a start marker (upload started at %s)", object.ID, time.Time(object.UploadTimestamp).Local())
|
|
toBeDeleted <- object
|
|
} else {
|
|
fs.Debugf(remote, "Not deleting current version (id %q) %q", object.ID, object.Action)
|
|
}
|
|
} else {
|
|
fs.Debugf(remote, "Deleting (id %q)", object.ID)
|
|
toBeDeleted <- object
|
|
}
|
|
last = remote
|
|
accounting.Stats(ctx).DoneChecking(remote)
|
|
}
|
|
return nil
|
|
}))
|
|
close(toBeDeleted)
|
|
wg.Wait()
|
|
|
|
if !oldOnly {
|
|
checkErr(f.Rmdir(ctx, ""))
|
|
}
|
|
return errReturn
|
|
}
|
|
|
|
// Purge deletes all the files and directories including the old versions.
|
|
func (f *Fs) Purge(ctx context.Context) error {
|
|
return f.purge(ctx, false)
|
|
}
|
|
|
|
// CleanUp deletes all the hidden files.
|
|
func (f *Fs) CleanUp(ctx context.Context) error {
|
|
return f.purge(ctx, true)
|
|
}
|
|
|
|
// Copy src to this remote using server side copy operations.
|
|
//
|
|
// This is stored with the remote path given
|
|
//
|
|
// It returns the destination Object and a possible error
|
|
//
|
|
// Will only be called if src.Fs().Name() == f.Name()
|
|
//
|
|
// If it isn't possible then return fs.ErrorCantCopy
|
|
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
|
err := f.Mkdir(ctx, "")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
srcObj, ok := src.(*Object)
|
|
if !ok {
|
|
fs.Debugf(src, "Can't copy - not same remote type")
|
|
return nil, fs.ErrorCantCopy
|
|
}
|
|
destBucketID, err := f.getBucketID()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_copy_file",
|
|
}
|
|
var request = api.CopyFileRequest{
|
|
SourceID: srcObj.id,
|
|
Name: f.root + remote,
|
|
MetadataDirective: "COPY",
|
|
DestBucketID: destBucketID,
|
|
}
|
|
var response api.FileInfo
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err := f.srv.CallJSON(&opts, &request, &response)
|
|
return f.shouldRetry(resp, err)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
o := &Object{
|
|
fs: f,
|
|
remote: remote,
|
|
}
|
|
err = o.decodeMetaDataFileInfo(&response)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return o, nil
|
|
}
|
|
|
|
// Hashes returns the supported hash sets.
|
|
func (f *Fs) Hashes() hash.Set {
|
|
return hash.Set(hash.SHA1)
|
|
}
|
|
|
|
// getDownloadAuthorization returns authorization token for downloading
|
|
// without accout.
|
|
func (f *Fs) getDownloadAuthorization(remote string) (authorization string, err error) {
|
|
validDurationInSeconds := time.Duration(f.opt.DownloadAuthorizationDuration).Nanoseconds() / 1e9
|
|
if validDurationInSeconds <= 0 || validDurationInSeconds > 604800 {
|
|
return "", errors.New("--b2-download-auth-duration must be between 1 sec and 1 week")
|
|
}
|
|
if !f.hasPermission("shareFiles") {
|
|
return "", errors.New("sharing a file link requires the shareFiles permission")
|
|
}
|
|
bucketID, err := f.getBucketID()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_get_download_authorization",
|
|
}
|
|
var request = api.GetDownloadAuthorizationRequest{
|
|
BucketID: bucketID,
|
|
FileNamePrefix: path.Join(f.root, remote),
|
|
ValidDurationInSeconds: validDurationInSeconds,
|
|
}
|
|
var response api.GetDownloadAuthorizationResponse
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err := f.srv.CallJSON(&opts, &request, &response)
|
|
return f.shouldRetry(resp, err)
|
|
})
|
|
if err != nil {
|
|
return "", errors.Wrap(err, "failed to get download authorization")
|
|
}
|
|
return response.AuthorizationToken, nil
|
|
}
|
|
|
|
// PublicLink returns a link for downloading without accout.
|
|
func (f *Fs) PublicLink(ctx context.Context, remote string) (link string, err error) {
|
|
var RootURL string
|
|
if f.opt.DownloadURL == "" {
|
|
RootURL = f.info.DownloadURL
|
|
} else {
|
|
RootURL = f.opt.DownloadURL
|
|
}
|
|
_, err = f.NewObject(ctx, remote)
|
|
if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile {
|
|
err = f.list(ctx, remote, false, "", 1, f.opt.Versions, func(remote string, object *api.File, isDirectory bool) error {
|
|
return errEndList
|
|
})
|
|
}
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
absPath := "/" + path.Join(f.root, remote)
|
|
link = RootURL + "/file/" + urlEncode(f.bucket) + absPath
|
|
bucketType, err := f.getbucketType()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if bucketType == "allPrivate" || bucketType == "snapshot" {
|
|
AuthorizationToken, err := f.getDownloadAuthorization(remote)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
link += "?Authorization=" + AuthorizationToken
|
|
}
|
|
return link, nil
|
|
}
|
|
|
|
// ------------------------------------------------------------
|
|
|
|
// Fs returns the parent Fs
|
|
func (o *Object) Fs() fs.Info {
|
|
return o.fs
|
|
}
|
|
|
|
// Return a string version
|
|
func (o *Object) String() string {
|
|
if o == nil {
|
|
return "<nil>"
|
|
}
|
|
return o.remote
|
|
}
|
|
|
|
// Remote returns the remote path
|
|
func (o *Object) Remote() string {
|
|
return o.remote
|
|
}
|
|
|
|
// Hash returns the Sha-1 of an object returning a lowercase hex string
|
|
func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
|
|
if t != hash.SHA1 {
|
|
return "", hash.ErrUnsupported
|
|
}
|
|
if o.sha1 == "" {
|
|
// Error is logged in readMetaData
|
|
err := o.readMetaData(ctx)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
return o.sha1, nil
|
|
}
|
|
|
|
// Size returns the size of an object in bytes
|
|
func (o *Object) Size() int64 {
|
|
return o.size
|
|
}
|
|
|
|
// decodeMetaDataRaw sets the metadata from the data passed in
|
|
//
|
|
// Sets
|
|
// o.id
|
|
// o.modTime
|
|
// o.size
|
|
// o.sha1
|
|
func (o *Object) decodeMetaDataRaw(ID, SHA1 string, Size int64, UploadTimestamp api.Timestamp, Info map[string]string, mimeType string) (err error) {
|
|
o.id = ID
|
|
o.sha1 = SHA1
|
|
o.mimeType = mimeType
|
|
// Read SHA1 from metadata if it exists and isn't set
|
|
if o.sha1 == "" || o.sha1 == "none" {
|
|
o.sha1 = Info[sha1Key]
|
|
}
|
|
o.size = Size
|
|
// Use the UploadTimestamp if can't get file info
|
|
o.modTime = time.Time(UploadTimestamp)
|
|
return o.parseTimeString(Info[timeKey])
|
|
}
|
|
|
|
// decodeMetaData sets the metadata in the object from an api.File
|
|
//
|
|
// Sets
|
|
// o.id
|
|
// o.modTime
|
|
// o.size
|
|
// o.sha1
|
|
func (o *Object) decodeMetaData(info *api.File) (err error) {
|
|
return o.decodeMetaDataRaw(info.ID, info.SHA1, info.Size, info.UploadTimestamp, info.Info, info.ContentType)
|
|
}
|
|
|
|
// decodeMetaDataFileInfo sets the metadata in the object from an api.FileInfo
|
|
//
|
|
// Sets
|
|
// o.id
|
|
// o.modTime
|
|
// o.size
|
|
// o.sha1
|
|
func (o *Object) decodeMetaDataFileInfo(info *api.FileInfo) (err error) {
|
|
return o.decodeMetaDataRaw(info.ID, info.SHA1, info.Size, info.UploadTimestamp, info.Info, info.ContentType)
|
|
}
|
|
|
|
// getMetaData gets the metadata from the object unconditionally
|
|
func (o *Object) getMetaData(ctx context.Context) (info *api.File, err error) {
|
|
maxSearched := 1
|
|
var timestamp api.Timestamp
|
|
baseRemote := o.remote
|
|
if o.fs.opt.Versions {
|
|
timestamp, baseRemote = api.RemoveVersion(baseRemote)
|
|
maxSearched = maxVersions
|
|
}
|
|
|
|
err = o.fs.list(ctx, "", true, baseRemote, maxSearched, o.fs.opt.Versions, func(remote string, object *api.File, isDirectory bool) error {
|
|
if isDirectory {
|
|
return nil
|
|
}
|
|
if remote == baseRemote {
|
|
if !timestamp.IsZero() && !timestamp.Equal(object.UploadTimestamp) {
|
|
return nil
|
|
}
|
|
info = object
|
|
}
|
|
return errEndList // read only 1 item
|
|
})
|
|
if err != nil {
|
|
if err == fs.ErrorDirNotFound {
|
|
return nil, fs.ErrorObjectNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
if info == nil {
|
|
return nil, fs.ErrorObjectNotFound
|
|
}
|
|
return info, nil
|
|
}
|
|
|
|
// readMetaData gets the metadata if it hasn't already been fetched
|
|
//
|
|
// Sets
|
|
// o.id
|
|
// o.modTime
|
|
// o.size
|
|
// o.sha1
|
|
func (o *Object) readMetaData(ctx context.Context) (err error) {
|
|
if o.id != "" {
|
|
return nil
|
|
}
|
|
info, err := o.getMetaData(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return o.decodeMetaData(info)
|
|
}
|
|
|
|
// timeString formats modTime as a base 10 string holding the number
// of milliseconds elapsed since January 1, 1970 UTC.
func timeString(modTime time.Time) string {
	millis := modTime.UnixNano() / int64(time.Millisecond)
	return strconv.FormatInt(millis, 10)
}
|
|
|
|
// parseTimeString converts a decimal string number of milliseconds
|
|
// elapsed since January 1, 1970 UTC into a time.Time and stores it in
|
|
// the modTime variable.
|
|
func (o *Object) parseTimeString(timeString string) (err error) {
|
|
if timeString == "" {
|
|
return nil
|
|
}
|
|
unixMilliseconds, err := strconv.ParseInt(timeString, 10, 64)
|
|
if err != nil {
|
|
fs.Debugf(o, "Failed to parse mod time string %q: %v", timeString, err)
|
|
return nil
|
|
}
|
|
o.modTime = time.Unix(unixMilliseconds/1E3, (unixMilliseconds%1E3)*1E6).UTC()
|
|
return nil
|
|
}
|
|
|
|
// ModTime returns the modification time of the object
|
|
//
|
|
// It attempts to read the objects mtime and if that isn't present the
|
|
// LastModified returned in the http headers
|
|
//
|
|
// SHA-1 will also be updated once the request has completed.
|
|
func (o *Object) ModTime(ctx context.Context) (result time.Time) {
|
|
// The error is logged in readMetaData
|
|
_ = o.readMetaData(ctx)
|
|
return o.modTime
|
|
}
|
|
|
|
// SetModTime sets the modification time of the Object
|
|
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
|
info, err := o.getMetaData(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
info.Info[timeKey] = timeString(modTime)
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_copy_file",
|
|
}
|
|
var request = api.CopyFileRequest{
|
|
SourceID: o.id,
|
|
Name: o.fs.root + o.remote, // copy to same name
|
|
MetadataDirective: "REPLACE",
|
|
ContentType: info.ContentType,
|
|
Info: info.Info,
|
|
}
|
|
var response api.FileInfo
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
|
resp, err := o.fs.srv.CallJSON(&opts, &request, &response)
|
|
return o.fs.shouldRetry(resp, err)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return o.decodeMetaDataFileInfo(&response)
|
|
}
|
|
|
|
// Storable returns if this object is storable
|
|
func (o *Object) Storable() bool {
|
|
return true
|
|
}
|
|
|
|
// openFile represents an Object open for reading
|
|
type openFile struct {
|
|
o *Object // Object we are reading for
|
|
resp *http.Response // response of the GET
|
|
body io.Reader // reading from here
|
|
hash gohash.Hash // currently accumulating SHA1
|
|
bytes int64 // number of bytes read on this connection
|
|
eof bool // whether we have read end of file
|
|
}
|
|
|
|
// newOpenFile wraps an io.ReadCloser and checks the sha1sum
|
|
func newOpenFile(o *Object, resp *http.Response) *openFile {
|
|
file := &openFile{
|
|
o: o,
|
|
resp: resp,
|
|
hash: sha1.New(),
|
|
}
|
|
file.body = io.TeeReader(resp.Body, file.hash)
|
|
return file
|
|
}
|
|
|
|
// Read bytes from the object - see io.Reader
|
|
func (file *openFile) Read(p []byte) (n int, err error) {
|
|
n, err = file.body.Read(p)
|
|
file.bytes += int64(n)
|
|
if err == io.EOF {
|
|
file.eof = true
|
|
}
|
|
return
|
|
}
|
|
|
|
// Close the object and checks the length and SHA1 if all the object
|
|
// was read
|
|
func (file *openFile) Close() (err error) {
|
|
// Close the body at the end
|
|
defer fs.CheckClose(file.resp.Body, &err)
|
|
|
|
// If not end of file then can't check SHA1
|
|
if !file.eof {
|
|
return nil
|
|
}
|
|
|
|
// Check to see we read the correct number of bytes
|
|
if file.o.Size() != file.bytes {
|
|
return errors.Errorf("object corrupted on transfer - length mismatch (want %d got %d)", file.o.Size(), file.bytes)
|
|
}
|
|
|
|
// Check the SHA1
|
|
receivedSHA1 := file.o.sha1
|
|
calculatedSHA1 := fmt.Sprintf("%x", file.hash.Sum(nil))
|
|
if receivedSHA1 != "" && receivedSHA1 != calculatedSHA1 {
|
|
return errors.Errorf("object corrupted on transfer - SHA1 mismatch (want %q got %q)", receivedSHA1, calculatedSHA1)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Check it satisfies the interfaces
|
|
var _ io.ReadCloser = &openFile{}
|
|
|
|
// Open an object for read
|
|
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
|
opts := rest.Opts{
|
|
Method: "GET",
|
|
Options: options,
|
|
}
|
|
|
|
// Use downloadUrl from backblaze if downloadUrl is not set
|
|
// otherwise use the custom downloadUrl
|
|
if o.fs.opt.DownloadURL == "" {
|
|
opts.RootURL = o.fs.info.DownloadURL
|
|
} else {
|
|
opts.RootURL = o.fs.opt.DownloadURL
|
|
}
|
|
|
|
// Download by id if set otherwise by name
|
|
if o.id != "" {
|
|
opts.Path += "/b2api/v1/b2_download_file_by_id?fileId=" + urlEncode(o.id)
|
|
} else {
|
|
opts.Path += "/file/" + urlEncode(o.fs.bucket) + "/" + urlEncode(o.fs.root+o.remote)
|
|
}
|
|
var resp *http.Response
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
|
resp, err = o.fs.srv.Call(&opts)
|
|
return o.fs.shouldRetry(resp, err)
|
|
})
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to open for download")
|
|
}
|
|
|
|
// Parse the time out of the headers if possible
|
|
err = o.parseTimeString(resp.Header.Get(timeHeader))
|
|
if err != nil {
|
|
_ = resp.Body.Close()
|
|
return nil, err
|
|
}
|
|
// Read sha1 from header if it isn't set
|
|
if o.sha1 == "" {
|
|
o.sha1 = resp.Header.Get(sha1Header)
|
|
fs.Debugf(o, "Reading sha1 from header - %q", o.sha1)
|
|
// if sha1 header is "none" (in big files), then need
|
|
// to read it from the metadata
|
|
if o.sha1 == "none" {
|
|
o.sha1 = resp.Header.Get(sha1InfoHeader)
|
|
fs.Debugf(o, "Reading sha1 from info - %q", o.sha1)
|
|
}
|
|
}
|
|
// Don't check length or hash on partial content
|
|
if resp.StatusCode == http.StatusPartialContent {
|
|
return resp.Body, nil
|
|
}
|
|
return newOpenFile(o, resp), nil
|
|
}
|
|
|
|
// dontEncode is the characters that do not need percent-encoding
//
// The characters that do not need percent-encoding are a subset of
// the printable ASCII characters: upper-case letters, lower-case
// letters, digits, ".", "_", "-", "/", "~", "!", "$", "'", "(", ")",
// "*", ";", "=", ":", and "@". All other byte values in a UTF-8 must
// be replaced with "%" and the two-digit hex value of the byte.
const dontEncode = (`abcdefghijklmnopqrstuvwxyz` +
	`ABCDEFGHIJKLMNOPQRSTUVWXYZ` +
	`0123456789` +
	`._-/~!$'()*;=:@`)

// noNeedToEncode is a bitmap of characters which don't need % encoding
var noNeedToEncode [256]bool

// init fills in noNeedToEncode from dontEncode
func init() {
	for _, c := range dontEncode {
		noNeedToEncode[c] = true
	}
}

// urlEncode encodes in with % encoding
//
// Bytes listed in dontEncode are copied through unchanged. Every
// other byte is written as "%" followed by exactly two upper case hex
// digits, so bytes below 0x10 get a leading zero ("\n" becomes "%0A").
func urlEncode(in string) string {
	const upperHex = "0123456789ABCDEF"
	var out bytes.Buffer
	out.Grow(len(in))
	for i := 0; i < len(in); i++ {
		c := in[i]
		if noNeedToEncode[c] {
			_ = out.WriteByte(c)
			continue
		}
		// Emit both nibbles explicitly so the result is always two
		// digits wide
		_ = out.WriteByte('%')
		_ = out.WriteByte(upperHex[c>>4])
		_ = out.WriteByte(upperHex[c&0x0F])
	}
	return out.String()
}
|
|
|
|
// Update the object with the contents of the io.Reader, modTime and size
|
|
//
|
|
// The new object may have been created if an error is returned
|
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
|
|
if o.fs.opt.Versions {
|
|
return errNotWithVersions
|
|
}
|
|
err = o.fs.Mkdir(ctx, "")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
size := src.Size()
|
|
|
|
if size == -1 {
|
|
// Check if the file is large enough for a chunked upload (needs to be at least two chunks)
|
|
buf := o.fs.getUploadBlock()
|
|
n, err := io.ReadFull(in, buf)
|
|
if err == nil {
|
|
bufReader := bufio.NewReader(in)
|
|
in = bufReader
|
|
_, err = bufReader.Peek(1)
|
|
}
|
|
|
|
if err == nil {
|
|
fs.Debugf(o, "File is big enough for chunked streaming")
|
|
up, err := o.fs.newLargeUpload(ctx, o, in, src)
|
|
if err != nil {
|
|
o.fs.putUploadBlock(buf)
|
|
return err
|
|
}
|
|
return up.Stream(buf)
|
|
} else if err == io.EOF || err == io.ErrUnexpectedEOF {
|
|
fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n)
|
|
defer o.fs.putUploadBlock(buf)
|
|
size = int64(n)
|
|
in = bytes.NewReader(buf[:n])
|
|
} else {
|
|
return err
|
|
}
|
|
} else if size > int64(o.fs.opt.UploadCutoff) {
|
|
up, err := o.fs.newLargeUpload(ctx, o, in, src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return up.Upload()
|
|
}
|
|
|
|
modTime := src.ModTime(ctx)
|
|
|
|
calculatedSha1, _ := src.Hash(ctx, hash.SHA1)
|
|
if calculatedSha1 == "" {
|
|
calculatedSha1 = "hex_digits_at_end"
|
|
har := newHashAppendingReader(in, sha1.New())
|
|
size += int64(har.AdditionalLength())
|
|
in = har
|
|
}
|
|
|
|
// Get upload URL
|
|
upload, err := o.fs.getUploadURL()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
// return it like this because we might nil it out
|
|
o.fs.returnUploadURL(upload)
|
|
}()
|
|
|
|
// Headers for upload file
|
|
//
|
|
// Authorization
|
|
// required
|
|
// An upload authorization token, from b2_get_upload_url.
|
|
//
|
|
// X-Bz-File-Name
|
|
// required
|
|
//
|
|
// The name of the file, in percent-encoded UTF-8. See Files for requirements on file names. See String Encoding.
|
|
//
|
|
// Content-Type
|
|
// required
|
|
//
|
|
// The MIME type of the content of the file, which will be returned in
|
|
// the Content-Type header when downloading the file. Use the
|
|
// Content-Type b2/x-auto to automatically set the stored Content-Type
|
|
// post upload. In the case where a file extension is absent or the
|
|
// lookup fails, the Content-Type is set to application/octet-stream. The
|
|
// Content-Type mappings can be pursued here.
|
|
//
|
|
// X-Bz-Content-Sha1
|
|
// required
|
|
//
|
|
// The SHA1 checksum of the content of the file. B2 will check this when
|
|
// the file is uploaded, to make sure that the file arrived correctly. It
|
|
// will be returned in the X-Bz-Content-Sha1 header when the file is
|
|
// downloaded.
|
|
//
|
|
// X-Bz-Info-src_last_modified_millis
|
|
// optional
|
|
//
|
|
// If the original source of the file being uploaded has a last modified
|
|
// time concept, Backblaze recommends using this spelling of one of your
|
|
// ten X-Bz-Info-* headers (see below). Using a standard spelling allows
|
|
// different B2 clients and the B2 web user interface to interoperate
|
|
// correctly. The value should be a base 10 number which represents a UTC
|
|
// time when the original source file was last modified. It is a base 10
|
|
// number of milliseconds since midnight, January 1, 1970 UTC. This fits
|
|
// in a 64 bit integer such as the type "long" in the programming
|
|
// language Java. It is intended to be compatible with Java's time
|
|
// long. For example, it can be passed directly into the Java call
|
|
// Date.setTime(long time).
|
|
//
|
|
// X-Bz-Info-*
|
|
// optional
|
|
//
|
|
// Up to 10 of these headers may be present. The * part of the header
|
|
// name is replace with the name of a custom field in the file
|
|
// information stored with the file, and the value is an arbitrary UTF-8
|
|
// string, percent-encoded. The same info headers sent with the upload
|
|
// will be returned with the download.
|
|
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
RootURL: upload.UploadURL,
|
|
Body: in,
|
|
ExtraHeaders: map[string]string{
|
|
"Authorization": upload.AuthorizationToken,
|
|
"X-Bz-File-Name": urlEncode(o.fs.root + o.remote),
|
|
"Content-Type": fs.MimeType(ctx, src),
|
|
sha1Header: calculatedSha1,
|
|
timeHeader: timeString(modTime),
|
|
},
|
|
ContentLength: &size,
|
|
}
|
|
var response api.FileInfo
|
|
// Don't retry, return a retry error instead
|
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
|
resp, err := o.fs.srv.CallJSON(&opts, nil, &response)
|
|
retry, err := o.fs.shouldRetry(resp, err)
|
|
// On retryable error clear UploadURL
|
|
if retry {
|
|
fs.Debugf(o, "Clearing upload URL because of error: %v", err)
|
|
upload = nil
|
|
}
|
|
return retry, err
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return o.decodeMetaDataFileInfo(&response)
|
|
}
|
|
|
|
// Remove an object
|
|
func (o *Object) Remove(ctx context.Context) error {
|
|
if o.fs.opt.Versions {
|
|
return errNotWithVersions
|
|
}
|
|
if o.fs.opt.HardDelete {
|
|
return o.fs.deleteByID(o.id, o.fs.root+o.remote)
|
|
}
|
|
return o.fs.hide(o.fs.root + o.remote)
|
|
}
|
|
|
|
// MimeType of an Object if known, "" otherwise
|
|
func (o *Object) MimeType(ctx context.Context) string {
|
|
return o.mimeType
|
|
}
|
|
|
|
// ID returns the ID of the Object if known, or "" if not
|
|
func (o *Object) ID() string {
|
|
return o.id
|
|
}
|
|
|
|
// Check the interfaces are satisfied
|
|
var (
|
|
_ fs.Fs = &Fs{}
|
|
_ fs.Purger = &Fs{}
|
|
_ fs.Copier = &Fs{}
|
|
_ fs.PutStreamer = &Fs{}
|
|
_ fs.CleanUpper = &Fs{}
|
|
_ fs.ListRer = &Fs{}
|
|
_ fs.PublicLinker = &Fs{}
|
|
_ fs.Object = &Object{}
|
|
_ fs.MimeTyper = &Object{}
|
|
_ fs.IDer = &Object{}
|
|
)
|