s3: properly integrate minio-go lib

This commit is contained in:
Alexander Neumann 2015-12-29 00:27:29 +01:00
parent 2c15597e24
commit 407819e5a9
5 changed files with 81 additions and 43 deletions

View file

@ -9,8 +9,8 @@ import (
// Config contains all configuration necessary to connect to an s3 compatible
// server.
type Config struct {
Region string
URL string
Endpoint string
UseHTTP bool
KeyID, Secret string
Bucket string
}
@ -28,7 +28,7 @@ func ParseConfig(s string) (interface{}, error) {
}
cfg := Config{
Region: data[0],
Endpoint: data[0],
Bucket: data[1],
}
@ -55,7 +55,7 @@ func ParseConfig(s string) (interface{}, error) {
if len(rest) == 2 {
// assume that just a region name and a bucket has been specified, in
// the format region/bucket
cfg.Region = rest[0]
cfg.Endpoint = rest[0]
cfg.Bucket = rest[1]
} else {
// assume that a URL has been specified, parse it and use the path as
@ -69,10 +69,12 @@ func ParseConfig(s string) (interface{}, error) {
return nil, errors.New("s3: bucket name not found")
}
cfg.Bucket = url.Path[1:]
url.Path = ""
cfg.Endpoint = url.Host
if url.Scheme == "http" {
cfg.UseHTTP = true
}
cfg.URL = url.String()
cfg.Bucket = url.Path[1:]
}
return cfg, nil

View file

@ -7,11 +7,11 @@ var configTests = []struct {
cfg Config
}{
{"s3://eu-central-1/bucketname", Config{
Region: "eu-central-1",
URL: "eu-central-1",
Bucket: "bucketname",
}},
{"s3:eu-central-1/foobar", Config{
Region: "eu-central-1",
URL: "eu-central-1",
Bucket: "foobar",
}},
{"s3:https://hostname:9999/foobar", Config{

View file

@ -24,7 +24,7 @@ func s3path(t backend.Type, name string) string {
}
type S3Backend struct {
s3api minio.API
client minio.CloudStorageClient
connChan chan struct{}
bucketname string
}
@ -32,30 +32,18 @@ type S3Backend struct {
// Open opens the S3 backend at bucket and region. The bucket is created if it
// does not exist yet.
func Open(cfg Config) (backend.Backend, error) {
mcfg := minio.Config{
AccessKeyID: cfg.KeyID,
SecretAccessKey: cfg.Secret,
}
debug.Log("s3.Open", "open, config %#v", cfg)
if cfg.URL != "" {
mcfg.Endpoint = cfg.URL
} else {
mcfg.Region = cfg.Region
}
if mcfg.Region == "" {
mcfg.Region = "us-east-1"
}
s3api, err := minio.New(mcfg)
client, err := minio.New(cfg.Endpoint, cfg.KeyID, cfg.Secret, cfg.UseHTTP)
if err != nil {
return nil, err
}
be := &S3Backend{s3api: s3api, bucketname: cfg.Bucket}
be := &S3Backend{client: client, bucketname: cfg.Bucket}
be.createConnections()
err = s3api.MakeBucket(cfg.Bucket, "")
// create new bucket with default ACL in default region
err = client.MakeBucket(cfg.Bucket, "", "")
if err != nil {
e, ok := err.(minio.ErrorResponse)
@ -123,15 +111,18 @@ func (bb *s3Blob) Finalize(t backend.Type, name string) error {
path := s3path(t, name)
// Check key does not already exist
_, err := bb.b.s3api.StatObject(bb.b.bucketname, path)
_, err := bb.b.client.StatObject(bb.b.bucketname, path)
if err == nil {
return errors.New("key already exists")
}
<-bb.b.connChan
err = bb.b.s3api.PutObject(bb.b.bucketname, path, "binary/octet-stream", int64(bb.buf.Len()), bb.buf)
_, err = bb.b.client.PutObject(bb.b.bucketname, path, bb.buf, int64(bb.buf.Len()), "binary/octet-stream")
bb.b.connChan <- struct{}{}
bb.buf.Reset()
debug.Log("s3.Finalize", "finalized %v -> err %v", path, err)
return err
}
@ -150,23 +141,50 @@ func (be *S3Backend) Create() (backend.Blob, error) {
// name. The reader should be closed after draining it.
func (be *S3Backend) Get(t backend.Type, name string) (io.ReadCloser, error) {
path := s3path(t, name)
rc, _, err := be.s3api.GetObject(be.bucketname, path)
rc, _, err := be.client.GetObject(be.bucketname, path)
debug.Log("s3.Get", "%v %v -> err %v", t, name, err)
return rc, err
}
// GetReader returns an io.ReadCloser for the Blob with the given name of
// type t at offset and length. If length is 0, the reader reads until EOF.
func (be *S3Backend) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) {
debug.Log("s3.GetReader", "%v %v, offset %v len %v", t, name, offset, length)
path := s3path(t, name)
rc, _, err := be.s3api.GetPartialObject(be.bucketname, path, int64(offset), int64(length))
return rc, err
rd, stat, err := be.client.GetObjectPartial(be.bucketname, path)
debug.Log("s3.GetReader", " stat %v, err %v", stat, err)
if err != nil {
return nil, err
}
l, o := int64(length), int64(offset)
if l == 0 {
l = stat.Size - o
}
if l > stat.Size-o {
l = stat.Size - o
}
debug.Log("s3.GetReader", "%v %v, o %v l %v", t, name, o, l)
buf := make([]byte, l)
n, err := rd.ReadAt(buf, o)
debug.Log("s3.GetReader", " -> n %v err %v", n, err)
if err == io.EOF && int64(n) == l {
debug.Log("s3.GetReader", " ignoring EOF error")
err = nil
}
return backend.ReadCloser(bytes.NewReader(buf[:n])), err
}
// Test returns true if a blob of the given type and name exists in the backend.
func (be *S3Backend) Test(t backend.Type, name string) (bool, error) {
found := false
path := s3path(t, name)
_, err := be.s3api.StatObject(be.bucketname, path)
_, err := be.client.StatObject(be.bucketname, path)
if err == nil {
found = true
}
@ -178,7 +196,9 @@ func (be *S3Backend) Test(t backend.Type, name string) (bool, error) {
// Remove removes the blob with the given name and type.
func (be *S3Backend) Remove(t backend.Type, name string) error {
path := s3path(t, name)
return be.s3api.RemoveObject(be.bucketname, path)
err := be.client.RemoveObject(be.bucketname, path)
debug.Log("s3.Remove", "%v %v -> err %v", t, name, err)
return err
}
// List returns a channel that yields all names of blobs of type t. A
@ -189,12 +209,12 @@ func (be *S3Backend) List(t backend.Type, done <-chan struct{}) <-chan string {
prefix := s3path(t, "")
listresp := be.s3api.ListObjects(be.bucketname, prefix, true)
listresp := be.client.ListObjects(be.bucketname, prefix, true, done)
go func() {
defer close(ch)
for obj := range listresp {
m := strings.TrimPrefix(obj.Stat.Key, prefix)
m := strings.TrimPrefix(obj.Key, prefix)
if m == "" {
continue
}

View file

@ -1,6 +1,7 @@
package backend_test
import (
"net/url"
"os"
"testing"
@ -17,12 +18,21 @@ func TestS3Backend(t *testing.T) {
t.Skip("s3 test server not available")
}
be, err := s3.Open(s3.Config{
URL: TestS3Server,
url, err := url.Parse(TestS3Server)
OK(t, err)
cfg := s3.Config{
Endpoint: url.Host,
Bucket: "restictestbucket",
KeyID: os.Getenv("AWS_ACCESS_KEY_ID"),
Secret: os.Getenv("AWS_SECRET_ACCESS_KEY"),
})
}
if url.Scheme == "http" {
cfg.UseHTTP = true
}
be, err := s3.Open(cfg)
OK(t, err)
testBackend(be, t)

View file

@ -56,6 +56,12 @@ var parseTests = []struct {
Bucket: "bucketname",
}},
},
{"s3:eu-central-1/repo", Location{Scheme: "s3",
Config: s3.Config{
Region: "eu-central-1",
Bucket: "repo",
}},
},
{"s3:https://hostname.foo/repo", Location{Scheme: "s3",
Config: s3.Config{
URL: "https://hostname.foo",