From 407819e5a93b344b79b6fc8bc429b79cf701555c Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Tue, 29 Dec 2015 00:27:29 +0100 Subject: [PATCH] s3: properly integrate minio-go lib --- backend/s3/config.go | 18 +++++----- backend/s3/config_test.go | 4 +-- backend/s3/s3.go | 74 +++++++++++++++++++++++++-------------- backend/s3_test.go | 22 ++++++++---- location/location_test.go | 6 ++++ 5 files changed, 81 insertions(+), 43 deletions(-) diff --git a/backend/s3/config.go b/backend/s3/config.go index cd4d77b4f..b0224925a 100644 --- a/backend/s3/config.go +++ b/backend/s3/config.go @@ -9,8 +9,8 @@ import ( // Config contains all configuration necessary to connect to an s3 compatible // server. type Config struct { - Region string - URL string + Endpoint string + UseHTTP bool KeyID, Secret string Bucket string } @@ -28,8 +28,8 @@ func ParseConfig(s string) (interface{}, error) { } cfg := Config{ - Region: data[0], - Bucket: data[1], + Endpoint: data[0], + Bucket: data[1], } return cfg, nil @@ -55,7 +55,7 @@ func ParseConfig(s string) (interface{}, error) { if len(rest) == 2 { // assume that just a region name and a bucket has been specified, in // the format region/bucket - cfg.Region = rest[0] + cfg.Endpoint = rest[0] cfg.Bucket = rest[1] } else { // assume that a URL has been specified, parse it and use the path as @@ -69,10 +69,12 @@ func ParseConfig(s string) (interface{}, error) { return nil, errors.New("s3: bucket name not found") } - cfg.Bucket = url.Path[1:] - url.Path = "" + cfg.Endpoint = url.Host + if url.Scheme == "http" { + cfg.UseHTTP = true + } - cfg.URL = url.String() + cfg.Bucket = url.Path[1:] } return cfg, nil diff --git a/backend/s3/config_test.go b/backend/s3/config_test.go index ca71a589f..6b3962c94 100644 --- a/backend/s3/config_test.go +++ b/backend/s3/config_test.go @@ -7,11 +7,11 @@ var configTests = []struct { cfg Config }{ {"s3://eu-central-1/bucketname", Config{ - Region: "eu-central-1", + URL: "eu-central-1", Bucket: "bucketname", }}, {"s3:eu-central-1/foobar", Config{ - Region: "eu-central-1", + URL: "eu-central-1", Bucket: "foobar", }}, {"s3:https://hostname:9999/foobar", Config{ diff --git a/backend/s3/s3.go b/backend/s3/s3.go index 62616a993..6ee309112 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -24,7 +24,7 @@ func s3path(t backend.Type, name string) string { } type S3Backend struct { - s3api minio.API + client minio.CloudStorageClient connChan chan struct{} bucketname string } @@ -32,30 +32,18 @@ type S3Backend struct { // Open opens the S3 backend at bucket and region. The bucket is created if it // does not exist yet. func Open(cfg Config) (backend.Backend, error) { - mcfg := minio.Config{ - AccessKeyID: cfg.KeyID, - SecretAccessKey: cfg.Secret, - } + debug.Log("s3.Open", "open, config %#v", cfg) - if cfg.URL != "" { - mcfg.Endpoint = cfg.URL - } else { - mcfg.Region = cfg.Region - } - - if mcfg.Region == "" { - mcfg.Region = "us-east-1" - } - - s3api, err := minio.New(mcfg) + client, err := minio.New(cfg.Endpoint, cfg.KeyID, cfg.Secret, cfg.UseHTTP) if err != nil { return nil, err } - be := &S3Backend{s3api: s3api, bucketname: cfg.Bucket} + be := &S3Backend{client: client, bucketname: cfg.Bucket} be.createConnections() - err = s3api.MakeBucket(cfg.Bucket, "") + // create new bucket with default ACL in default region + err = client.MakeBucket(cfg.Bucket, "", "") if err != nil { e, ok := err.(minio.ErrorResponse) @@ -123,15 +111,18 @@ func (bb *s3Blob) Finalize(t backend.Type, name string) error { path := s3path(t, name) // Check key does not already exist - _, err := bb.b.s3api.StatObject(bb.b.bucketname, path) + _, err := bb.b.client.StatObject(bb.b.bucketname, path) if err == nil { return errors.New("key already exists") } <-bb.b.connChan - err = bb.b.s3api.PutObject(bb.b.bucketname, path, "binary/octet-stream", int64(bb.buf.Len()), bb.buf) + _, err = bb.b.client.PutObject(bb.b.bucketname, path, bb.buf, int64(bb.buf.Len()), "binary/octet-stream") bb.b.connChan <- struct{}{} bb.buf.Reset() + + debug.Log("s3.Finalize", "finalized %v -> err %v", path, err) + return err } @@ -150,23 +141,50 @@ func (be *S3Backend) Create() (backend.Blob, error) { // name. The reader should be closed after draining it. func (be *S3Backend) Get(t backend.Type, name string) (io.ReadCloser, error) { path := s3path(t, name) - rc, _, err := be.s3api.GetObject(be.bucketname, path) + rc, _, err := be.client.GetObject(be.bucketname, path) + debug.Log("s3.Get", "%v %v -> err %v", t, name, err) return rc, err } // GetReader returns an io.ReadCloser for the Blob with the given name of // type t at offset and length. If length is 0, the reader reads until EOF. func (be *S3Backend) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) { + debug.Log("s3.GetReader", "%v %v, offset %v len %v", t, name, offset, length) path := s3path(t, name) - rc, _, err := be.s3api.GetPartialObject(be.bucketname, path, int64(offset), int64(length)) - return rc, err + rd, stat, err := be.client.GetObjectPartial(be.bucketname, path) + debug.Log("s3.GetReader", " stat %v, err %v", stat, err) + if err != nil { + return nil, err + } + + l, o := int64(length), int64(offset) + + if l == 0 { + l = stat.Size - o + } + + if l > stat.Size-o { + l = stat.Size - o + } + + debug.Log("s3.GetReader", "%v %v, o %v l %v", t, name, o, l) + + buf := make([]byte, l) + n, err := rd.ReadAt(buf, o) + debug.Log("s3.GetReader", " -> n %v err %v", n, err) + if err == io.EOF && int64(n) == l { + debug.Log("s3.GetReader", " ignoring EOF error") + err = nil + } + + return backend.ReadCloser(bytes.NewReader(buf[:n])), err } // Test returns true if a blob of the given type and name exists in the backend. func (be *S3Backend) Test(t backend.Type, name string) (bool, error) { found := false path := s3path(t, name) - _, err := be.s3api.StatObject(be.bucketname, path) + _, err := be.client.StatObject(be.bucketname, path) if err == nil { found = true } @@ -178,7 +196,9 @@ func (be *S3Backend) Test(t backend.Type, name string) (bool, error) { // Remove removes the blob with the given name and type. func (be *S3Backend) Remove(t backend.Type, name string) error { path := s3path(t, name) - return be.s3api.RemoveObject(be.bucketname, path) + err := be.client.RemoveObject(be.bucketname, path) + debug.Log("s3.Remove", "%v %v -> err %v", t, name, err) + return err } // List returns a channel that yields all names of blobs of type t. A @@ -189,12 +209,12 @@ func (be *S3Backend) List(t backend.Type, done <-chan struct{}) <-chan string { prefix := s3path(t, "") - listresp := be.s3api.ListObjects(be.bucketname, prefix, true) + listresp := be.client.ListObjects(be.bucketname, prefix, true, done) go func() { defer close(ch) for obj := range listresp { - m := strings.TrimPrefix(obj.Stat.Key, prefix) + m := strings.TrimPrefix(obj.Key, prefix) if m == "" { continue } diff --git a/backend/s3_test.go b/backend/s3_test.go index b177ad067..6d79f9cd8 100644 --- a/backend/s3_test.go +++ b/backend/s3_test.go @@ -1,6 +1,7 @@ package backend_test import ( + "net/url" "os" "testing" @@ -17,12 +18,21 @@ func TestS3Backend(t *testing.T) { t.Skip("s3 test server not available") } - be, err := s3.Open(s3.Config{ - URL: TestS3Server, - Bucket: "restictestbucket", - KeyID: os.Getenv("AWS_ACCESS_KEY_ID"), - Secret: os.Getenv("AWS_SECRET_ACCESS_KEY"), - }) + url, err := url.Parse(TestS3Server) + OK(t, err) + + cfg := s3.Config{ + Endpoint: url.Host, + Bucket: "restictestbucket", + KeyID: os.Getenv("AWS_ACCESS_KEY_ID"), + Secret: os.Getenv("AWS_SECRET_ACCESS_KEY"), + } + + if url.Scheme == "http" { + cfg.UseHTTP = true + } + + be, err := s3.Open(cfg) OK(t, err) testBackend(be, t) diff --git a/location/location_test.go b/location/location_test.go index 405ba0144..702b2651e 100644 --- a/location/location_test.go +++ b/location/location_test.go @@ -56,6 +56,12 @@ var parseTests = []struct { Bucket: "bucketname", }}, }, + {"s3:eu-central-1/repo", Location{Scheme: "s3", + Config: s3.Config{ + Region: "eu-central-1", + Bucket: "repo", + }}, + }, {"s3:https://hostname.foo/repo", Location{Scheme: "s3", Config: s3.Config{ URL: "https://hostname.foo",