Switch s3 library to allow for s3 compatible backends. Fixes #315
This commit is contained in:
parent
c969de7fad
commit
6d1552af51
68 changed files with 5994 additions and 4655 deletions
120
backend/s3/s3.go
120
backend/s3/s3.go
|
@ -3,13 +3,12 @@ package s3
|
|||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"gopkg.in/amz.v3/aws"
|
||||
"gopkg.in/amz.v3/s3"
|
||||
"github.com/minio/minio-go"
|
||||
|
||||
"github.com/restic/restic/backend"
|
||||
)
|
||||
|
@ -26,41 +25,47 @@ func s3path(t backend.Type, name string) string {
|
|||
}
|
||||
|
||||
type S3Backend struct {
|
||||
bucket *s3.Bucket
|
||||
connChan chan struct{}
|
||||
path string
|
||||
s3api minio.API
|
||||
connChan chan struct{}
|
||||
bucketname string
|
||||
}
|
||||
|
||||
// Open a backend using an S3 bucket object
|
||||
func OpenS3Bucket(bucket *s3.Bucket, bucketname string) *S3Backend {
|
||||
// Open opens the S3 backend at bucket and region.
|
||||
func Open(regionname, bucketname string) (backend.Backend, error) {
|
||||
config := minio.Config{
|
||||
AccessKeyID: os.Getenv("AWS_ACCESS_KEY_ID"),
|
||||
SecretAccessKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
|
||||
}
|
||||
|
||||
if !strings.Contains(regionname, ".") {
|
||||
// Amazon region name
|
||||
switch regionname {
|
||||
case "us-east-1":
|
||||
config.Endpoint = "https://s3.amazonaws.com"
|
||||
default:
|
||||
config.Endpoint = "https://s3-" + regionname + ".amazonaws.com"
|
||||
}
|
||||
} else {
|
||||
// S3 compatible endpoint
|
||||
config.Endpoint = "https://" + regionname
|
||||
}
|
||||
|
||||
s3api, s3err := minio.New(config)
|
||||
if s3err != nil {
|
||||
return nil, s3err
|
||||
}
|
||||
|
||||
connChan := make(chan struct{}, connLimit)
|
||||
for i := 0; i < connLimit; i++ {
|
||||
connChan <- struct{}{}
|
||||
}
|
||||
|
||||
return &S3Backend{bucket: bucket, path: bucketname, connChan: connChan}
|
||||
}
|
||||
|
||||
// Open opens the S3 backend at bucket and region.
|
||||
func Open(regionname, bucketname string) (backend.Backend, error) {
|
||||
auth, err := aws.EnvAuth()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := s3.New(auth, aws.Regions[regionname])
|
||||
|
||||
s3bucket, s3err := client.Bucket(bucketname)
|
||||
if s3err != nil {
|
||||
return nil, s3err
|
||||
}
|
||||
|
||||
return OpenS3Bucket(s3bucket, bucketname), nil
|
||||
return &S3Backend{s3api: s3api, bucketname: bucketname, connChan: connChan}, nil
|
||||
}
|
||||
|
||||
// Location returns this backend's location (the bucket name).
|
||||
func (be *S3Backend) Location() string {
|
||||
return be.path
|
||||
return be.bucketname
|
||||
}
|
||||
|
||||
type s3Blob struct {
|
||||
|
@ -102,13 +107,13 @@ func (bb *s3Blob) Finalize(t backend.Type, name string) error {
|
|||
path := s3path(t, name)
|
||||
|
||||
// Check key does not already exist
|
||||
_, err := bb.b.bucket.GetReader(path)
|
||||
_, err := bb.b.s3api.StatObject(bb.b.bucketname, path)
|
||||
if err == nil {
|
||||
return errors.New("key already exists!")
|
||||
}
|
||||
|
||||
<-bb.b.connChan
|
||||
err = bb.b.bucket.PutReader(path, bb.buf, int64(bb.buf.Len()), "binary/octet-stream", "private")
|
||||
err = bb.b.s3api.PutObject(bb.b.bucketname, path, "application/octet-stream", int64(bb.buf.Len()), bb.buf)
|
||||
bb.b.connChan <- struct{}{}
|
||||
bb.buf.Reset()
|
||||
return err
|
||||
|
@ -129,36 +134,25 @@ func (be *S3Backend) Create() (backend.Blob, error) {
|
|||
// name. The reader should be closed after draining it.
|
||||
func (be *S3Backend) Get(t backend.Type, name string) (io.ReadCloser, error) {
|
||||
path := s3path(t, name)
|
||||
return be.bucket.GetReader(path)
|
||||
r, _, err := be.s3api.GetObject(be.bucketname, path)
|
||||
rc := ioutil.NopCloser(r)
|
||||
return rc, err
|
||||
}
|
||||
|
||||
// GetReader returns an io.ReadCloser for the Blob with the given name of
|
||||
// type t at offset and length. If length is 0, the reader reads until EOF.
|
||||
func (be *S3Backend) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) {
|
||||
rc, err := be.Get(t, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
n, errc := io.CopyN(ioutil.Discard, rc, int64(offset))
|
||||
if errc != nil {
|
||||
return nil, errc
|
||||
} else if n != int64(offset) {
|
||||
return nil, fmt.Errorf("less bytes read than expected, read: %d, expected: %d", n, offset)
|
||||
}
|
||||
|
||||
if length == 0 {
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
return backend.LimitReadCloser(rc, int64(length)), nil
|
||||
path := s3path(t, name)
|
||||
r, _, err := be.s3api.GetPartialObject(be.bucketname, path, int64(offset), int64(length))
|
||||
rc := ioutil.NopCloser(r)
|
||||
return rc, err
|
||||
}
|
||||
|
||||
// Test returns true if a blob of the given type and name exists in the backend.
|
||||
func (be *S3Backend) Test(t backend.Type, name string) (bool, error) {
|
||||
found := false
|
||||
path := s3path(t, name)
|
||||
_, err := be.bucket.GetReader(path)
|
||||
_, err := be.s3api.StatObject(be.bucketname, path)
|
||||
if err == nil {
|
||||
found = true
|
||||
}
|
||||
|
@ -170,7 +164,7 @@ func (be *S3Backend) Test(t backend.Type, name string) (bool, error) {
|
|||
// Remove removes the blob with the given name and type.
|
||||
func (be *S3Backend) Remove(t backend.Type, name string) error {
|
||||
path := s3path(t, name)
|
||||
return be.bucket.Del(path)
|
||||
return be.s3api.RemoveObject(be.bucketname, path)
|
||||
}
|
||||
|
||||
// List returns a channel that yields all names of blobs of type t. A
|
||||
|
@ -181,34 +175,12 @@ func (be *S3Backend) List(t backend.Type, done <-chan struct{}) <-chan string {
|
|||
|
||||
prefix := s3path(t, "")
|
||||
|
||||
listresp, err := be.bucket.List(prefix, "/", "", maxKeysInList)
|
||||
|
||||
if err != nil {
|
||||
close(ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
matches := make([]string, len(listresp.Contents))
|
||||
for idx, key := range listresp.Contents {
|
||||
matches[idx] = strings.TrimPrefix(key.Key, prefix)
|
||||
}
|
||||
|
||||
// Continue making requests to get full list.
|
||||
for listresp.IsTruncated {
|
||||
listresp, err = be.bucket.List(prefix, "/", listresp.NextMarker, maxKeysInList)
|
||||
if err != nil {
|
||||
close(ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
for _, key := range listresp.Contents {
|
||||
matches = append(matches, strings.TrimPrefix(key.Key, prefix))
|
||||
}
|
||||
}
|
||||
listresp := be.s3api.ListObjects(be.bucketname, prefix, true)
|
||||
|
||||
go func() {
|
||||
defer close(ch)
|
||||
for _, m := range matches {
|
||||
for obj := range listresp {
|
||||
m := strings.TrimPrefix(obj.Stat.Key, prefix)
|
||||
if m == "" {
|
||||
continue
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue