From aaae7f33d33fd0b86c79e68543eda5a107ca141e Mon Sep 17 00:00:00 2001 From: Chris Howey Date: Sun, 14 Jun 2015 07:17:38 -0500 Subject: [PATCH] Have number of connections limited by channel Removes previous limit of 1 connection --- backend/s3/s3.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/backend/s3/s3.go b/backend/s3/s3.go index 991385525..4f12251cd 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -5,7 +5,6 @@ import ( "errors" "io" "strings" - "sync" "github.com/mitchellh/goamz/aws" "github.com/mitchellh/goamz/s3" @@ -14,6 +13,7 @@ import ( ) const maxKeysInList = 1000 +const connLimit = 10 func s3path(t backend.Type, name string) string { if t == backend.Config { @@ -23,18 +23,23 @@ func s3path(t backend.Type, name string) string { } type S3 struct { - bucket *s3.Bucket - mput sync.Mutex - path string + bucket *s3.Bucket + connChan chan struct{} + path string } // Open a backend using an S3 bucket object func OpenS3Bucket(bucket *s3.Bucket, bucketname string) *S3 { - return &S3{bucket: bucket, path: bucketname} + connChan := make(chan struct{}, connLimit) + for i := 0; i < connLimit; i++ { + connChan <- struct{}{} + } + + return &S3{bucket: bucket, path: bucketname, connChan: connChan} } // Open opens the s3 backend at bucket and region. -func Open(regionname, bucketname string) (*S3, error) { +func Open(regionname, bucketname string) (backend.Backend, error) { auth, err := aws.EnvAuth() if err != nil { return nil, err @@ -42,7 +47,7 @@ func Open(regionname, bucketname string) (*S3, error) { client := s3.New(auth, aws.Regions[regionname]) - return &S3{bucket: client.Bucket(bucketname), path: bucketname}, nil + return OpenS3Bucket(client.Bucket(bucketname), bucketname), nil } // Location returns this backend's location (the bucket name). @@ -94,9 +99,9 @@ func (bb *s3Blob) Finalize(t backend.Type, name string) error { return errors.New("key already exists!") } - bb.b.mput.Lock() + <-bb.b.connChan err = bb.b.bucket.Put(path, bb.buf.Bytes(), "binary/octet-stream", "private") - bb.b.mput.Unlock() + bb.b.connChan <- struct{}{} bb.buf.Reset() return err } @@ -118,7 +123,9 @@ func (b *S3) get(t backend.Type, name string) (*s3Blob, error) { } path := s3path(t, name) + <-b.connChan data, err := b.bucket.Get(path) + b.connChan <- struct{}{} blob.buf = bytes.NewBuffer(data) return blob, err }