forked from TrueCloudLab/restic
This is the first commit that removes the (redundant) Get() method of the backend interface. Get(x, y) is equivalent to GetReader(x, y, 0, 0).
258 lines
5.8 KiB
Go
258 lines
5.8 KiB
Go
package s3
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"io"
|
|
"strings"
|
|
|
|
"github.com/minio/minio-go"
|
|
|
|
"github.com/restic/restic/backend"
|
|
"github.com/restic/restic/debug"
|
|
)
|
|
|
|
const (
	// maxKeysInList is not referenced in the visible part of this file.
	// NOTE(review): presumably the page size for list requests — confirm
	// before relying on it.
	maxKeysInList = 1000

	// connLimit is the number of tokens placed in S3Backend.connChan, i.e.
	// the maximum number of uploads running at the same time.
	connLimit = 10

	// backendPrefix is the first path component of every object key
	// produced by s3path.
	backendPrefix = "restic"
)
|
|
|
|
func s3path(t backend.Type, name string) string {
|
|
if t == backend.Config {
|
|
return backendPrefix + "/" + string(t)
|
|
}
|
|
return backendPrefix + "/" + string(t) + "/" + name
|
|
}
|
|
|
|
type S3Backend struct {
|
|
client minio.CloudStorageClient
|
|
connChan chan struct{}
|
|
bucketname string
|
|
}
|
|
|
|
// Open opens the S3 backend at bucket and region. The bucket is created if it
|
|
// does not exist yet.
|
|
func Open(cfg Config) (backend.Backend, error) {
|
|
debug.Log("s3.Open", "open, config %#v", cfg)
|
|
|
|
client, err := minio.New(cfg.Endpoint, cfg.KeyID, cfg.Secret, cfg.UseHTTP)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
be := &S3Backend{client: client, bucketname: cfg.Bucket}
|
|
be.createConnections()
|
|
|
|
if err := client.BucketExists(cfg.Bucket); err != nil {
|
|
debug.Log("s3.Open", "BucketExists(%v) returned err %v, trying to create the bucket", cfg.Bucket, err)
|
|
|
|
// create new bucket with default ACL in default region
|
|
err = client.MakeBucket(cfg.Bucket, "", "")
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return be, nil
|
|
}
|
|
|
|
func (be *S3Backend) createConnections() {
|
|
be.connChan = make(chan struct{}, connLimit)
|
|
for i := 0; i < connLimit; i++ {
|
|
be.connChan <- struct{}{}
|
|
}
|
|
}
|
|
|
|
// Location returns this backend's location (the bucket name).
|
|
func (be *S3Backend) Location() string {
|
|
return be.bucketname
|
|
}
|
|
|
|
type s3Blob struct {
|
|
b *S3Backend
|
|
buf *bytes.Buffer
|
|
final bool
|
|
}
|
|
|
|
func (bb *s3Blob) Write(p []byte) (int, error) {
|
|
if bb.final {
|
|
return 0, errors.New("blob already closed")
|
|
}
|
|
|
|
n, err := bb.buf.Write(p)
|
|
return n, err
|
|
}
|
|
|
|
func (bb *s3Blob) Read(p []byte) (int, error) {
|
|
return bb.buf.Read(p)
|
|
}
|
|
|
|
func (bb *s3Blob) Close() error {
|
|
bb.final = true
|
|
bb.buf.Reset()
|
|
return nil
|
|
}
|
|
|
|
func (bb *s3Blob) Size() uint {
|
|
return uint(bb.buf.Len())
|
|
}
|
|
|
|
func (bb *s3Blob) Finalize(t backend.Type, name string) error {
|
|
debug.Log("s3.blob.Finalize()", "bucket %v, finalize %v, %d bytes", bb.b.bucketname, name, bb.buf.Len())
|
|
if bb.final {
|
|
return errors.New("Already finalized")
|
|
}
|
|
|
|
bb.final = true
|
|
|
|
path := s3path(t, name)
|
|
|
|
// Check key does not already exist
|
|
_, err := bb.b.client.StatObject(bb.b.bucketname, path)
|
|
if err == nil {
|
|
debug.Log("s3.blob.Finalize()", "%v already exists", name)
|
|
return errors.New("key already exists")
|
|
}
|
|
|
|
expectedBytes := bb.buf.Len()
|
|
|
|
<-bb.b.connChan
|
|
debug.Log("s3.Finalize", "PutObject(%v, %v, %v, %v)",
|
|
bb.b.bucketname, path, int64(bb.buf.Len()), "binary/octet-stream")
|
|
n, err := bb.b.client.PutObject(bb.b.bucketname, path, bb.buf, "binary/octet-stream")
|
|
debug.Log("s3.Finalize", "finalized %v -> n %v, err %#v", path, n, err)
|
|
bb.b.connChan <- struct{}{}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if n != int64(expectedBytes) {
|
|
return errors.New("could not store all bytes")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Create creates a new Blob. The data is available only after Finalize()
|
|
// has been called on the returned Blob.
|
|
func (be *S3Backend) Create() (backend.Blob, error) {
|
|
blob := s3Blob{
|
|
b: be,
|
|
buf: &bytes.Buffer{},
|
|
}
|
|
|
|
return &blob, nil
|
|
}
|
|
|
|
// GetReader returns an io.ReadCloser for the Blob with the given name of
|
|
// type t at offset and length. If length is 0, the reader reads until EOF.
|
|
func (be *S3Backend) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) {
|
|
debug.Log("s3.GetReader", "%v %v, offset %v len %v", t, name, offset, length)
|
|
path := s3path(t, name)
|
|
obj, err := be.client.GetObject(be.bucketname, path)
|
|
if err != nil {
|
|
debug.Log("s3.GetReader", " err %v", err)
|
|
return nil, err
|
|
}
|
|
|
|
if offset > 0 {
|
|
_, err = obj.Seek(int64(offset), 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if length == 0 {
|
|
return obj, nil
|
|
}
|
|
|
|
return backend.LimitReadCloser(obj, int64(length)), nil
|
|
}
|
|
|
|
// Test returns true if a blob of the given type and name exists in the backend.
|
|
func (be *S3Backend) Test(t backend.Type, name string) (bool, error) {
|
|
found := false
|
|
path := s3path(t, name)
|
|
_, err := be.client.StatObject(be.bucketname, path)
|
|
if err == nil {
|
|
found = true
|
|
}
|
|
|
|
// If error, then not found
|
|
return found, nil
|
|
}
|
|
|
|
// Remove removes the blob with the given name and type.
|
|
func (be *S3Backend) Remove(t backend.Type, name string) error {
|
|
path := s3path(t, name)
|
|
err := be.client.RemoveObject(be.bucketname, path)
|
|
debug.Log("s3.Remove", "%v %v -> err %v", t, name, err)
|
|
return err
|
|
}
|
|
|
|
// List returns a channel that yields all names of blobs of type t. A
|
|
// goroutine is started for this. If the channel done is closed, sending
|
|
// stops.
|
|
func (be *S3Backend) List(t backend.Type, done <-chan struct{}) <-chan string {
|
|
debug.Log("s3.List", "listing %v", t)
|
|
ch := make(chan string)
|
|
|
|
prefix := s3path(t, "")
|
|
|
|
listresp := be.client.ListObjects(be.bucketname, prefix, true, done)
|
|
|
|
go func() {
|
|
defer close(ch)
|
|
for obj := range listresp {
|
|
m := strings.TrimPrefix(obj.Key, prefix)
|
|
if m == "" {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case ch <- m:
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return ch
|
|
}
|
|
|
|
// Remove keys for a specified backend type.
|
|
func (be *S3Backend) removeKeys(t backend.Type) error {
|
|
done := make(chan struct{})
|
|
defer close(done)
|
|
for key := range be.List(backend.Data, done) {
|
|
err := be.Remove(backend.Data, key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Delete removes all restic keys in the bucket. It will not remove the bucket itself.
|
|
func (be *S3Backend) Delete() error {
|
|
alltypes := []backend.Type{
|
|
backend.Data,
|
|
backend.Key,
|
|
backend.Lock,
|
|
backend.Snapshot,
|
|
backend.Index}
|
|
|
|
for _, t := range alltypes {
|
|
err := be.removeKeys(t)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return be.Remove(backend.Config, "")
|
|
}
|
|
|
|
// Close does nothing
|
|
func (be *S3Backend) Close() error { return nil }
|