forked from TrueCloudLab/restic
8f5ff379b7
Prepends the object path prefix to all s3 paths, which allows multiple independent restic backup repositories to live in a single s3 bucket. Removed the hardcoded "restic" prefix from s3 paths; "restic" is now used as the default object path prefix for s3 when no other prefix is specified, which retains backward compatibility with existing s3 repository configurations. Simplified the parse flow so that there is a single point where the bucket name and the prefix within the bucket are parsed. Added tests for the s3 object path prefix and the new default prefix to config_test and location_test.
230 lines
5.3 KiB
Go
230 lines
5.3 KiB
Go
package s3
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"io"
|
|
"strings"
|
|
|
|
"github.com/minio/minio-go"
|
|
|
|
"github.com/restic/restic/backend"
|
|
"github.com/restic/restic/debug"
|
|
)
|
|
|
|
// connLimit is the capacity of the backend's connChan and the number of
// tokens createConnections puts into it. Load and Save each take one token
// while transferring data, so this bounds the number of concurrent transfers.
const connLimit = 10
|
|
|
|
func s3path(prefix string, t backend.Type, name string) string {
|
|
if t == backend.Config {
|
|
return prefix + "/" + string(t)
|
|
}
|
|
return prefix + "/" + string(t) + "/" + name
|
|
}
|
|
|
|
// s3 is a backend which stores the data on an S3 endpoint.
|
|
type s3 struct {
|
|
client minio.CloudStorageClient
|
|
connChan chan struct{}
|
|
bucketname string
|
|
prefix string
|
|
}
|
|
|
|
// Open opens the S3 backend at bucket and region. The bucket is created if it
|
|
// does not exist yet.
|
|
func Open(cfg Config) (backend.Backend, error) {
|
|
debug.Log("s3.Open", "open, config %#v", cfg)
|
|
|
|
client, err := minio.New(cfg.Endpoint, cfg.KeyID, cfg.Secret, cfg.UseHTTP)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
be := &s3{client: client, bucketname: cfg.Bucket, prefix: cfg.Prefix}
|
|
be.createConnections()
|
|
|
|
if err := client.BucketExists(cfg.Bucket); err != nil {
|
|
debug.Log("s3.Open", "BucketExists(%v) returned err %v, trying to create the bucket", cfg.Bucket, err)
|
|
|
|
// create new bucket with default ACL in default region
|
|
err = client.MakeBucket(cfg.Bucket, "", "")
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return be, nil
|
|
}
|
|
|
|
func (be *s3) createConnections() {
|
|
be.connChan = make(chan struct{}, connLimit)
|
|
for i := 0; i < connLimit; i++ {
|
|
be.connChan <- struct{}{}
|
|
}
|
|
}
|
|
|
|
// Location returns this backend's location (the bucket name).
|
|
func (be *s3) Location() string {
|
|
return be.bucketname
|
|
}
|
|
|
|
// Load returns the data stored in the backend for h at the given offset
|
|
// and saves it in p. Load has the same semantics as io.ReaderAt.
|
|
func (be s3) Load(h backend.Handle, p []byte, off int64) (int, error) {
|
|
debug.Log("s3.Load", "%v, offset %v, len %v", h, off, len(p))
|
|
path := s3path(be.prefix, h.Type, h.Name)
|
|
obj, err := be.client.GetObject(be.bucketname, path)
|
|
if err != nil {
|
|
debug.Log("s3.GetReader", " err %v", err)
|
|
return 0, err
|
|
}
|
|
|
|
if off > 0 {
|
|
_, err = obj.Seek(off, 0)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
<-be.connChan
|
|
defer func() {
|
|
be.connChan <- struct{}{}
|
|
}()
|
|
return io.ReadFull(obj, p)
|
|
}
|
|
|
|
// Save stores data in the backend at the handle.
|
|
func (be s3) Save(h backend.Handle, p []byte) (err error) {
|
|
if err := h.Valid(); err != nil {
|
|
return err
|
|
}
|
|
|
|
debug.Log("s3.Save", "%v bytes at %d", len(p), h)
|
|
|
|
path := s3path(be.prefix, h.Type, h.Name)
|
|
|
|
// Check key does not already exist
|
|
_, err = be.client.StatObject(be.bucketname, path)
|
|
if err == nil {
|
|
debug.Log("s3.blob.Finalize()", "%v already exists", h)
|
|
return errors.New("key already exists")
|
|
}
|
|
|
|
<-be.connChan
|
|
defer func() {
|
|
be.connChan <- struct{}{}
|
|
}()
|
|
|
|
debug.Log("s3.Save", "PutObject(%v, %v, %v, %v)",
|
|
be.bucketname, path, int64(len(p)), "binary/octet-stream")
|
|
n, err := be.client.PutObject(be.bucketname, path, bytes.NewReader(p), "binary/octet-stream")
|
|
debug.Log("s3.Save", "%v -> %v bytes, err %#v", path, n, err)
|
|
|
|
return err
|
|
}
|
|
|
|
// Stat returns information about a blob.
|
|
func (be s3) Stat(h backend.Handle) (backend.BlobInfo, error) {
|
|
debug.Log("s3.Stat", "%v")
|
|
path := s3path(be.prefix, h.Type, h.Name)
|
|
obj, err := be.client.GetObject(be.bucketname, path)
|
|
if err != nil {
|
|
debug.Log("s3.Stat", "GetObject() err %v", err)
|
|
return backend.BlobInfo{}, err
|
|
}
|
|
|
|
fi, err := obj.Stat()
|
|
if err != nil {
|
|
debug.Log("s3.Stat", "Stat() err %v", err)
|
|
return backend.BlobInfo{}, err
|
|
}
|
|
|
|
return backend.BlobInfo{Size: fi.Size}, nil
|
|
}
|
|
|
|
// Test returns true if a blob of the given type and name exists in the backend.
|
|
func (be *s3) Test(t backend.Type, name string) (bool, error) {
|
|
found := false
|
|
path := s3path(be.prefix, t, name)
|
|
_, err := be.client.StatObject(be.bucketname, path)
|
|
if err == nil {
|
|
found = true
|
|
}
|
|
|
|
// If error, then not found
|
|
return found, nil
|
|
}
|
|
|
|
// Remove removes the blob with the given name and type.
|
|
func (be *s3) Remove(t backend.Type, name string) error {
|
|
path := s3path(be.prefix, t, name)
|
|
err := be.client.RemoveObject(be.bucketname, path)
|
|
debug.Log("s3.Remove", "%v %v -> err %v", t, name, err)
|
|
return err
|
|
}
|
|
|
|
// List returns a channel that yields all names of blobs of type t. A
|
|
// goroutine is started for this. If the channel done is closed, sending
|
|
// stops.
|
|
func (be *s3) List(t backend.Type, done <-chan struct{}) <-chan string {
|
|
debug.Log("s3.List", "listing %v", t)
|
|
ch := make(chan string)
|
|
|
|
prefix := s3path(be.prefix, t, "")
|
|
|
|
listresp := be.client.ListObjects(be.bucketname, prefix, true, done)
|
|
|
|
go func() {
|
|
defer close(ch)
|
|
for obj := range listresp {
|
|
m := strings.TrimPrefix(obj.Key, prefix)
|
|
if m == "" {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case ch <- m:
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return ch
|
|
}
|
|
|
|
// Remove keys for a specified backend type.
|
|
func (be *s3) removeKeys(t backend.Type) error {
|
|
done := make(chan struct{})
|
|
defer close(done)
|
|
for key := range be.List(backend.Data, done) {
|
|
err := be.Remove(backend.Data, key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Delete removes all restic keys in the bucket. It will not remove the bucket itself.
|
|
func (be *s3) Delete() error {
|
|
alltypes := []backend.Type{
|
|
backend.Data,
|
|
backend.Key,
|
|
backend.Lock,
|
|
backend.Snapshot,
|
|
backend.Index}
|
|
|
|
for _, t := range alltypes {
|
|
err := be.removeKeys(t)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return be.Remove(backend.Config, "")
|
|
}
|
|
|
|
// Close does nothing
|
|
func (be *s3) Close() error { return nil }
|