Add Backblaze B2 backend

This is based on prior work by Joe Turgeon <arithmetric@gmail.com>
@arithmetric.
This commit is contained in:
Alexander Neumann 2017-05-28 10:19:01 +02:00
parent 2217b9277e
commit 122462b9b1
10 changed files with 751 additions and 0 deletions

View file

@ -282,6 +282,35 @@ this command.
Please note that knowledge of your password is required to access
the repository. Losing your password means that your data is irrecoverably lost.
Backblaze B2
~~~~~~~~~~~~
Restic can backup data to any Backblaze B2 bucket. You need to first setup the
following environment variables with the credentials you obtained when signed
into your B2 account:
.. code-block:: console
$ export B2_ACCOUNT_ID=<MY_ACCOUNT_ID>
$ export B2_ACCOUNT_KEY=<MY_SECRET_ACCOUNT_KEY>
You can then easily initialize a repository stored at Backblaze B2. If the
bucket does not exist yet, it will be created:
.. code-block:: console
$ restic -r b2:bucketname:path/to/repo init
enter password for new backend:
enter password again:
created restic backend eefee03bbd at b2:bucketname:path/to/repo
Please note that knowledge of your password is required to access the repository.
Losing your password means that your data is irrecoverably lost.
The number of concurrent connections to the B2 service can be set with the `-o
b2.connections=10`. By default, at most five parallel connections are
established.
Password prompt on Windows
~~~~~~~~~~~~~~~~~~~~~~~~~~

View file

@ -164,6 +164,13 @@ func (env *TravisEnvironment) RunTests() error {
msg("S3 repository not available\n")
}
// if the test b2 repository is available, make sure that the test is not skipped
if os.Getenv("RESTIC_TEST_B2_REPOSITORY") != "" {
ensureTests = append(ensureTests, "restic/backend/b2.TestBackendB2")
} else {
msg("B2 repository not available\n")
}
env.env["RESTIC_TEST_DISALLOW_SKIP"] = strings.Join(ensureTests, ",")
if *runCrossCompile {

View file

@ -11,6 +11,7 @@ import (
"strings"
"syscall"
"restic/backend/b2"
"restic/backend/local"
"restic/backend/location"
"restic/backend/rest"
@ -356,6 +357,23 @@ func parseConfig(loc location.Location, opts options.Options) (interface{}, erro
debug.Log("opening s3 repository at %#v", cfg)
return cfg, nil
case "b2":
cfg := loc.Config.(b2.Config)
if cfg.AccountID == "" {
cfg.AccountID = os.Getenv("B2_ACCOUNT_ID")
}
if cfg.Key == "" {
cfg.Key = os.Getenv("B2_ACCOUNT_KEY")
}
if err := opts.Apply(loc.Scheme, &cfg); err != nil {
return nil, err
}
debug.Log("opening b2 repository at %#v", cfg)
return cfg, nil
case "rest":
cfg := loc.Config.(rest.Config)
if err := opts.Apply(loc.Scheme, &cfg); err != nil {
@ -391,6 +409,8 @@ func open(s string, opts options.Options) (restic.Backend, error) {
be, err = sftp.Open(cfg.(sftp.Config))
case "s3":
be, err = s3.Open(cfg.(s3.Config))
case "b2":
be, err = b2.Open(cfg.(b2.Config))
case "rest":
be, err = rest.Open(cfg.(rest.Config))
@ -435,6 +455,8 @@ func create(s string, opts options.Options) (restic.Backend, error) {
return sftp.Create(cfg.(sftp.Config))
case "s3":
return s3.Open(cfg.(s3.Config))
case "b2":
return b2.Create(cfg.(b2.Config))
case "rest":
return rest.Create(cfg.(rest.Config))
}

371
src/restic/backend/b2/b2.go Normal file
View file

@ -0,0 +1,371 @@
package b2
import (
"context"
"io"
"path"
"restic"
"strings"
"restic/backend"
"restic/debug"
"restic/errors"
"github.com/kurin/blazer/b2"
)
// b2Backend is a backend which stores its data on Backblaze B2.
type b2Backend struct {
client *b2.Client
bucket *b2.Bucket
cfg Config
backend.Layout
sem *backend.Semaphore
}
func newClient(ctx context.Context, cfg Config) (*b2.Client, error) {
opts := []b2.ClientOption{b2.Transport(backend.Transport())}
c, err := b2.NewClient(ctx, cfg.AccountID, cfg.Key, opts...)
if err != nil {
return nil, errors.Wrap(err, "b2.NewClient")
}
return c, nil
}
// Open opens a connection to the B2 service.
func Open(cfg Config) (restic.Backend, error) {
debug.Log("cfg %#v", cfg)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
client, err := newClient(ctx, cfg)
if err != nil {
return nil, err
}
bucket, err := client.Bucket(ctx, cfg.Bucket)
if err != nil {
return nil, errors.Wrap(err, "Bucket")
}
be := &b2Backend{
client: client,
bucket: bucket,
cfg: cfg,
Layout: &backend.DefaultLayout{
Join: path.Join,
Path: cfg.Prefix,
},
sem: backend.NewSemaphore(cfg.Connections),
}
return be, nil
}
// Create opens a connection to the B2 service. If the bucket does not exist yet,
// it is created.
func Create(cfg Config) (restic.Backend, error) {
debug.Log("cfg %#v", cfg)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
client, err := newClient(ctx, cfg)
if err != nil {
return nil, err
}
attr := b2.BucketAttrs{
Type: b2.Private,
}
bucket, err := client.NewBucket(ctx, cfg.Bucket, &attr)
if err != nil {
return nil, errors.Wrap(err, "NewBucket")
}
be := &b2Backend{
client: client,
bucket: bucket,
cfg: cfg,
Layout: &backend.DefaultLayout{
Join: path.Join,
Path: cfg.Prefix,
},
sem: backend.NewSemaphore(cfg.Connections),
}
present, err := be.Test(restic.Handle{Type: restic.ConfigFile})
if err != nil {
return nil, err
}
if present {
return nil, errors.New("config already exists")
}
return be, nil
}
// Location returns the location for the backend.
func (be *b2Backend) Location() string {
return be.cfg.Bucket
}
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
type wrapReader struct {
io.ReadCloser
eofSeen bool
f func()
}
func (wr *wrapReader) Read(p []byte) (int, error) {
if wr.eofSeen {
return 0, io.EOF
}
n, err := wr.ReadCloser.Read(p)
if err == io.EOF {
wr.eofSeen = true
}
return n, err
}
func (wr *wrapReader) Close() error {
err := wr.ReadCloser.Close()
wr.f()
return err
}
// Load returns the data stored in the backend for h at the given offset
// and saves it in p. Load has the same semantics as io.ReaderAt.
func (be *b2Backend) Load(h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
if err := h.Valid(); err != nil {
return nil, err
}
if offset < 0 {
return nil, errors.New("offset is negative")
}
if length < 0 {
return nil, errors.Errorf("invalid length %d", length)
}
ctx, cancel := context.WithCancel(context.TODO())
be.sem.GetToken()
name := be.Layout.Filename(h)
obj := be.bucket.Object(name)
if offset == 0 && length == 0 {
rd := obj.NewReader(ctx)
wrapper := &wrapReader{
ReadCloser: rd,
f: func() {
cancel()
be.sem.ReleaseToken()
},
}
return wrapper, nil
}
// pass a negative length to NewRangeReader so that the remainder of the
// file is read.
if length == 0 {
length = -1
}
rd := obj.NewRangeReader(ctx, offset, int64(length))
wrapper := &wrapReader{
ReadCloser: rd,
f: func() {
cancel()
be.sem.ReleaseToken()
},
}
return wrapper, nil
}
// Save stores data in the backend at the handle.
func (be *b2Backend) Save(h restic.Handle, rd io.Reader) (err error) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
if err := h.Valid(); err != nil {
return err
}
be.sem.GetToken()
defer be.sem.ReleaseToken()
name := be.Filename(h)
debug.Log("Save %v, name %v", h, name)
obj := be.bucket.Object(name)
_, err = obj.Attrs(ctx)
if err == nil {
debug.Log(" %v already exists", h)
return errors.New("key already exists")
}
w := obj.NewWriter(ctx)
n, err := io.Copy(w, rd)
debug.Log(" saved %d bytes, err %v", n, err)
if err != nil {
_ = w.Close()
return errors.Wrap(err, "Copy")
}
return errors.Wrap(w.Close(), "Close")
}
// Stat returns information about a blob.
func (be *b2Backend) Stat(h restic.Handle) (bi restic.FileInfo, err error) {
debug.Log("Stat %v", h)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
be.sem.GetToken()
defer be.sem.ReleaseToken()
name := be.Filename(h)
obj := be.bucket.Object(name)
info, err := obj.Attrs(ctx)
if err != nil {
debug.Log("Attrs() err %v", err)
return restic.FileInfo{}, errors.Wrap(err, "Stat")
}
return restic.FileInfo{Size: info.Size}, nil
}
// Test returns true if a blob of the given type and name exists in the backend.
func (be *b2Backend) Test(h restic.Handle) (bool, error) {
debug.Log("Test %v", h)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
be.sem.GetToken()
defer be.sem.ReleaseToken()
found := false
name := be.Filename(h)
obj := be.bucket.Object(name)
info, err := obj.Attrs(ctx)
if err == nil && info != nil && info.Status == b2.Uploaded {
found = true
}
return found, nil
}
// Remove removes the blob with the given name and type.
func (be *b2Backend) Remove(h restic.Handle) error {
debug.Log("Remove %v", h)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
be.sem.GetToken()
defer be.sem.ReleaseToken()
obj := be.bucket.Object(be.Filename(h))
return errors.Wrap(obj.Delete(ctx), "Delete")
}
// List returns a channel that yields all names of blobs of type t. A
// goroutine is started for this. If the channel done is closed, sending
// stops.
func (be *b2Backend) List(t restic.FileType, done <-chan struct{}) <-chan string {
debug.Log("List %v", t)
ch := make(chan string)
ctx, cancel := context.WithCancel(context.TODO())
be.sem.GetToken()
go func() {
defer close(ch)
defer cancel()
defer be.sem.ReleaseToken()
prefix := be.Dirname(restic.Handle{Type: t})
cur := &b2.Cursor{Prefix: prefix}
for {
objs, c, err := be.bucket.ListCurrentObjects(ctx, 1000, cur)
if err != nil && err != io.EOF {
return
}
for _, obj := range objs {
// Skip objects returned that do not have the specified prefix.
if !strings.HasPrefix(obj.Name(), prefix) {
continue
}
m := path.Base(obj.Name())
if m == "" {
continue
}
select {
case ch <- m:
case <-done:
return
}
}
if err == io.EOF {
return
}
cur = c
}
}()
return ch
}
// Remove keys for a specified backend type.
func (be *b2Backend) removeKeys(t restic.FileType) error {
debug.Log("removeKeys %v", t)
done := make(chan struct{})
defer close(done)
for key := range be.List(t, done) {
err := be.Remove(restic.Handle{Type: t, Name: key})
if err != nil {
return err
}
}
return nil
}
// Delete removes all restic keys in the bucket. It will not remove the bucket itself.
func (be *b2Backend) Delete() error {
alltypes := []restic.FileType{
restic.DataFile,
restic.KeyFile,
restic.LockFile,
restic.SnapshotFile,
restic.IndexFile}
for _, t := range alltypes {
err := be.removeKeys(t)
if err != nil {
return nil
}
}
err := be.Remove(restic.Handle{Type: restic.ConfigFile})
if err != nil && b2.IsNotExist(errors.Cause(err)) {
err = nil
}
return err
}
// Close does nothing
func (be *b2Backend) Close() error { return nil }

View file

@ -0,0 +1,93 @@
package b2_test
import (
"fmt"
"os"
"testing"
"time"
"restic"
"restic/backend/b2"
"restic/backend/test"
. "restic/test"
)
func newB2TestSuite(t testing.TB) *test.Suite {
return &test.Suite{
// do not use excessive data
MinimalData: true,
// NewConfig returns a config for a new temporary backend that will be used in tests.
NewConfig: func() (interface{}, error) {
b2cfg, err := b2.ParseConfig(os.Getenv("RESTIC_TEST_B2_REPOSITORY"))
if err != nil {
return nil, err
}
cfg := b2cfg.(b2.Config)
cfg.AccountID = os.Getenv("RESTIC_TEST_B2_ACCOUNT_ID")
cfg.Key = os.Getenv("RESTIC_TEST_B2_ACCOUNT_KEY")
cfg.Prefix = fmt.Sprintf("test-%d", time.Now().UnixNano())
return cfg, nil
},
// CreateFn is a function that creates a temporary repository for the tests.
Create: func(config interface{}) (restic.Backend, error) {
cfg := config.(b2.Config)
return b2.Create(cfg)
},
// OpenFn is a function that opens a previously created temporary repository.
Open: func(config interface{}) (restic.Backend, error) {
cfg := config.(b2.Config)
return b2.Open(cfg)
},
// CleanupFn removes data created during the tests.
Cleanup: func(config interface{}) error {
cfg := config.(b2.Config)
be, err := b2.Open(cfg)
if err != nil {
return err
}
if err := be.(restic.Deleter).Delete(); err != nil {
return err
}
return nil
},
}
}
func testVars(t testing.TB) {
vars := []string{
"RESTIC_TEST_B2_ACCOUNT_ID",
"RESTIC_TEST_B2_ACCOUNT_KEY",
"RESTIC_TEST_B2_REPOSITORY",
}
for _, v := range vars {
if os.Getenv(v) == "" {
t.Skipf("environment variable %v not set", v)
return
}
}
}
func TestBackendB2(t *testing.T) {
defer func() {
if t.Skipped() {
SkipDisallowed(t, "restic/backend/b2.TestBackendB2")
}
}()
testVars(t)
newB2TestSuite(t).RunTests(t)
}
func BenchmarkBackendb2(t *testing.B) {
testVars(t)
newB2TestSuite(t).RunBenchmarks(t)
}

View file

@ -0,0 +1,93 @@
package b2
import (
"path"
"regexp"
"strings"
"restic/errors"
"restic/options"
)
// Config contains all configuration necessary to connect to an b2 compatible
// server.
type Config struct {
AccountID string
Key string
Bucket string
Prefix string
Connections int `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
}
// NewConfig returns a new config with default options applied.
func NewConfig() Config {
return Config{
Connections: 5,
}
}
func init() {
options.Register("b2", Config{})
}
var bucketName = regexp.MustCompile("^[a-zA-Z0-9-]+$")
// checkBucketName tests the bucket name against the rules at
// https://help.backblaze.com/hc/en-us/articles/217666908-What-you-need-to-know-about-B2-Bucket-names
func checkBucketName(name string) error {
if name == "" {
return errors.New("bucket name is empty")
}
if len(name) < 6 {
return errors.New("bucket name is too short")
}
if len(name) > 50 {
return errors.New("bucket name is too long")
}
if !bucketName.MatchString(name) {
return errors.New("bucket name contains invalid characters, allowed are: a-z, 0-9, dash (-)")
}
return nil
}
// ParseConfig parses the string s and extracts the b2 config. The supported
// configuration format is b2:bucketname/prefix. If no prefix is given the
// prefix "restic" will be used.
func ParseConfig(s string) (interface{}, error) {
if !strings.HasPrefix(s, "b2:") {
return nil, errors.New("invalid format, want: b2:bucket-name[:path]")
}
s = s[3:]
data := strings.SplitN(s, ":", 2)
if len(data) == 0 || len(data[0]) == 0 {
return nil, errors.New("bucket name not found")
}
cfg := NewConfig()
cfg.Bucket = data[0]
if err := checkBucketName(cfg.Bucket); err != nil {
return nil, err
}
if len(data) == 2 {
p := data[1]
if len(p) > 0 {
p = path.Clean(p)
}
if len(p) > 0 && path.IsAbs(p) {
p = p[1:]
}
cfg.Prefix = p
}
return cfg, nil
}

View file

@ -0,0 +1,92 @@
package b2
import "testing"
var configTests = []struct {
s string
cfg Config
}{
{"b2:bucketname", Config{
Bucket: "bucketname",
Prefix: "",
Connections: 5,
}},
{"b2:bucketname:", Config{
Bucket: "bucketname",
Prefix: "",
Connections: 5,
}},
{"b2:bucketname:/prefix/directory", Config{
Bucket: "bucketname",
Prefix: "prefix/directory",
Connections: 5,
}},
{"b2:foobar", Config{
Bucket: "foobar",
Prefix: "",
Connections: 5,
}},
{"b2:foobar:", Config{
Bucket: "foobar",
Prefix: "",
Connections: 5,
}},
{"b2:foobar:/", Config{
Bucket: "foobar",
Prefix: "",
Connections: 5,
}},
}
func TestParseConfig(t *testing.T) {
for _, test := range configTests {
t.Run("", func(t *testing.T) {
cfg, err := ParseConfig(test.s)
if err != nil {
t.Fatalf("%s failed: %v", test.s, err)
}
if cfg != test.cfg {
t.Fatalf("input: %s\n wrong config, want:\n %#v\ngot:\n %#v",
test.s, test.cfg, cfg)
}
})
}
}
var invalidConfigTests = []struct {
s string
err string
}{
{
"b2",
"invalid format, want: b2:bucket-name[:path]",
},
{
"b2:",
"bucket name not found",
},
{
"b2:bucket_name",
"bucket name contains invalid characters, allowed are: a-z, 0-9, dash (-)",
},
{
"b2:bucketname/prefix/directory/",
"bucket name contains invalid characters, allowed are: a-z, 0-9, dash (-)",
},
}
func TestInvalidConfig(t *testing.T) {
for _, test := range invalidConfigTests {
t.Run("", func(t *testing.T) {
cfg, err := ParseConfig(test.s)
if err == nil {
t.Fatalf("expected error not found for invalid config: %v, cfg is:\n%#v", test.s, cfg)
}
if err.Error() != test.err {
t.Fatalf("unexpected error found, want:\n %v\ngot:\n %v", test.err, err.Error())
}
})
}
}

View file

@ -4,6 +4,7 @@ package location
import (
"strings"
"restic/backend/b2"
"restic/backend/local"
"restic/backend/rest"
"restic/backend/s3"
@ -25,6 +26,7 @@ type parser struct {
// parsers is a list of valid config parsers for the backends. The first parser
// is the fallback and should always be set to the local backend.
var parsers = []parser{
{"b2", b2.ParseConfig},
{"local", local.ParseConfig},
{"sftp", sftp.ParseConfig},
{"s3", s3.ParseConfig},

View file

@ -5,6 +5,7 @@ import (
"reflect"
"testing"
"restic/backend/b2"
"restic/backend/local"
"restic/backend/rest"
"restic/backend/s3"
@ -203,6 +204,24 @@ var parseTests = []struct {
},
},
},
{
"b2:bucketname:/prefix", Location{Scheme: "b2",
Config: b2.Config{
Bucket: "bucketname",
Prefix: "prefix",
Connections: 5,
},
},
},
{
"b2:bucketname", Location{Scheme: "b2",
Config: b2.Config{
Bucket: "bucketname",
Prefix: "",
Connections: 5,
},
},
},
}
func TestParse(t *testing.T) {

View file

@ -0,0 +1,23 @@
package backend
// Semaphore limits access to a restricted resource.
type Semaphore struct {
ch chan struct{}
}
// NewSemaphore returns a new semaphore with capacity n.
func NewSemaphore(n int) *Semaphore {
return &Semaphore{
ch: make(chan struct{}, n),
}
}
// GetToken blocks until a Token is available.
func (s *Semaphore) GetToken() {
s.ch <- struct{}{}
}
// ReleaseToken returns a token.
func (s *Semaphore) ReleaseToken() {
<-s.ch
}