restic/internal/backend/b2/b2.go
Alexander Neumann b5062959c8 backend: Relax requirement for new files
Before, all backend implementations were required to return an error if
the file that is to be written already exists in the backend. For most
backends, that means making a request (e.g. via HTTP) and returning an
error when the file already exists.

This is not accurate, the file could have been created between the HTTP
request testing for it, and when writing starts. In addition, apart from
the `config` file in the repo, all other file names have pseudo-random
names with a very very low probability of a collision. And even if a
file name is written again, the way the restic repo is structured this
just means that the same content is placed there again. Which is not a
problem, just not very efficient.

So, this commit relaxes the requirement to return an error when the file
in the backend already exists, which allows reducing the number of API
requests and thereby the latency for remote backends.
2018-02-17 22:39:18 +01:00

359 lines
7.9 KiB
Go

package b2
import (
"context"
"io"
"net/http"
"path"
"strings"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
"github.com/kurin/blazer/b2"
)
// b2Backend is a backend which stores its data on Backblaze B2.
type b2Backend struct {
client *b2.Client
bucket *b2.Bucket
cfg Config
listMaxItems int
backend.Layout
sem *backend.Semaphore
}
const defaultListMaxItems = 1000
// ensure statically that *b2Backend implements restic.Backend.
var _ restic.Backend = &b2Backend{}
func newClient(ctx context.Context, cfg Config, rt http.RoundTripper) (*b2.Client, error) {
opts := []b2.ClientOption{b2.Transport(rt)}
c, err := b2.NewClient(ctx, cfg.AccountID, cfg.Key, opts...)
if err != nil {
return nil, errors.Wrap(err, "b2.NewClient")
}
return c, nil
}
// Open opens a connection to the B2 service.
func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) {
debug.Log("cfg %#v", cfg)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
client, err := newClient(ctx, cfg, rt)
if err != nil {
return nil, err
}
bucket, err := client.Bucket(ctx, cfg.Bucket)
if err != nil {
return nil, errors.Wrap(err, "Bucket")
}
sem, err := backend.NewSemaphore(cfg.Connections)
if err != nil {
return nil, err
}
be := &b2Backend{
client: client,
bucket: bucket,
cfg: cfg,
Layout: &backend.DefaultLayout{
Join: path.Join,
Path: cfg.Prefix,
},
listMaxItems: defaultListMaxItems,
sem: sem,
}
return be, nil
}
// Create opens a connection to the B2 service. If the bucket does not exist yet,
// it is created.
func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) {
debug.Log("cfg %#v", cfg)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
client, err := newClient(ctx, cfg, rt)
if err != nil {
return nil, err
}
attr := b2.BucketAttrs{
Type: b2.Private,
}
bucket, err := client.NewBucket(ctx, cfg.Bucket, &attr)
if err != nil {
return nil, errors.Wrap(err, "NewBucket")
}
sem, err := backend.NewSemaphore(cfg.Connections)
if err != nil {
return nil, err
}
be := &b2Backend{
client: client,
bucket: bucket,
cfg: cfg,
Layout: &backend.DefaultLayout{
Join: path.Join,
Path: cfg.Prefix,
},
listMaxItems: defaultListMaxItems,
sem: sem,
}
present, err := be.Test(ctx, restic.Handle{Type: restic.ConfigFile})
if err != nil {
return nil, err
}
if present {
return nil, errors.New("config already exists")
}
return be, nil
}
// SetListMaxItems sets the number of list items to load per request.
func (be *b2Backend) SetListMaxItems(i int) {
be.listMaxItems = i
}
// Location returns the location for the backend.
func (be *b2Backend) Location() string {
return be.cfg.Bucket
}
// IsNotExist returns true if the error is caused by a non-existing file.
func (be *b2Backend) IsNotExist(err error) bool {
return b2.IsNotExist(errors.Cause(err))
}
// Load returns the data stored in the backend for h at the given offset
// and saves it in p. Load has the same semantics as io.ReaderAt.
func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
if err := h.Valid(); err != nil {
return nil, err
}
if offset < 0 {
return nil, errors.New("offset is negative")
}
if length < 0 {
return nil, errors.Errorf("invalid length %d", length)
}
ctx, cancel := context.WithCancel(ctx)
be.sem.GetToken()
name := be.Layout.Filename(h)
obj := be.bucket.Object(name)
if offset == 0 && length == 0 {
rd := obj.NewReader(ctx)
return be.sem.ReleaseTokenOnClose(rd, cancel), nil
}
// pass a negative length to NewRangeReader so that the remainder of the
// file is read.
if length == 0 {
length = -1
}
rd := obj.NewRangeReader(ctx, offset, int64(length))
return be.sem.ReleaseTokenOnClose(rd, cancel), nil
}
// Save stores data in the backend at the handle.
func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := h.Valid(); err != nil {
return err
}
be.sem.GetToken()
defer be.sem.ReleaseToken()
name := be.Filename(h)
debug.Log("Save %v, name %v", h, name)
obj := be.bucket.Object(name)
w := obj.NewWriter(ctx)
n, err := io.Copy(w, rd)
debug.Log(" saved %d bytes, err %v", n, err)
if err != nil {
_ = w.Close()
return errors.Wrap(err, "Copy")
}
return errors.Wrap(w.Close(), "Close")
}
// Stat returns information about a blob.
func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) {
debug.Log("Stat %v", h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
name := be.Filename(h)
obj := be.bucket.Object(name)
info, err := obj.Attrs(ctx)
if err != nil {
debug.Log("Attrs() err %v", err)
return restic.FileInfo{}, errors.Wrap(err, "Stat")
}
return restic.FileInfo{Size: info.Size, Name: h.Name}, nil
}
// Test returns true if a blob of the given type and name exists in the backend.
func (be *b2Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
debug.Log("Test %v", h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
found := false
name := be.Filename(h)
obj := be.bucket.Object(name)
info, err := obj.Attrs(ctx)
if err == nil && info != nil && info.Status == b2.Uploaded {
found = true
}
return found, nil
}
// Remove removes the blob with the given name and type.
func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error {
debug.Log("Remove %v", h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
obj := be.bucket.Object(be.Filename(h))
return errors.Wrap(obj.Delete(ctx), "Delete")
}
// List returns a channel that yields all names of blobs of type t. A
// goroutine is started for this. If the channel done is closed, sending
// stops.
func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
debug.Log("List %v", t)
prefix, _ := be.Basedir(t)
cur := &b2.Cursor{Prefix: prefix}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
be.sem.GetToken()
objs, c, err := be.bucket.ListCurrentObjects(ctx, be.listMaxItems, cur)
be.sem.ReleaseToken()
if err != nil && err != io.EOF {
debug.Log("List: %v", err)
return err
}
debug.Log("returned %v items", len(objs))
for _, obj := range objs {
// Skip objects returned that do not have the specified prefix.
if !strings.HasPrefix(obj.Name(), prefix) {
continue
}
m := path.Base(obj.Name())
if m == "" {
continue
}
if ctx.Err() != nil {
return ctx.Err()
}
if ctx.Err() != nil {
return ctx.Err()
}
attrs, err := obj.Attrs(ctx)
if err != nil {
return err
}
fi := restic.FileInfo{
Name: m,
Size: attrs.Size,
}
err = fn(fi)
if err != nil {
return err
}
if ctx.Err() != nil {
return ctx.Err()
}
}
if err == io.EOF {
return ctx.Err()
}
cur = c
}
return ctx.Err()
}
// Remove keys for a specified backend type.
func (be *b2Backend) removeKeys(ctx context.Context, t restic.FileType) error {
debug.Log("removeKeys %v", t)
return be.List(ctx, t, func(fi restic.FileInfo) error {
return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name})
})
}
// Delete removes all restic keys in the bucket. It will not remove the bucket itself.
func (be *b2Backend) Delete(ctx context.Context) error {
alltypes := []restic.FileType{
restic.DataFile,
restic.KeyFile,
restic.LockFile,
restic.SnapshotFile,
restic.IndexFile}
for _, t := range alltypes {
err := be.removeKeys(ctx, t)
if err != nil {
return nil
}
}
err := be.Remove(ctx, restic.Handle{Type: restic.ConfigFile})
if err != nil && b2.IsNotExist(errors.Cause(err)) {
err = nil
}
return err
}
// Close does nothing
func (be *b2Backend) Close() error { return nil }