Update s3 bindings

The S3 library has made a few fixes to the retry logic. Updating the bindings
accordingly.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2015-08-03 11:50:48 -07:00
parent 0b3b55e723
commit 7fc9e2112a
9 changed files with 191 additions and 41 deletions

6
Godeps/Godeps.json generated
View file

@@ -7,15 +7,15 @@
"Deps": [ "Deps": [
{ {
"ImportPath": "github.com/AdRoll/goamz/aws", "ImportPath": "github.com/AdRoll/goamz/aws",
"Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99" "Rev": "f8c4952d5bc3056c0ca6711a1f56bc88b828d989"
}, },
{ {
"ImportPath": "github.com/AdRoll/goamz/cloudfront", "ImportPath": "github.com/AdRoll/goamz/cloudfront",
"Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99" "Rev": "f8c4952d5bc3056c0ca6711a1f56bc88b828d989"
}, },
{ {
"ImportPath": "github.com/AdRoll/goamz/s3", "ImportPath": "github.com/AdRoll/goamz/s3",
"Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99" "Rev": "f8c4952d5bc3056c0ca6711a1f56bc88b828d989"
}, },
{ {
"ImportPath": "github.com/Azure/azure-sdk-for-go/storage", "ImportPath": "github.com/Azure/azure-sdk-for-go/storage",

View file

@@ -75,19 +75,6 @@ func NewRoute53Signer(auth Auth) *Route53Signer {
return &Route53Signer{auth: auth} return &Route53Signer{auth: auth}
} }
// getCurrentDate fetches the date stamp from the aws servers to
// ensure the auth headers are within 5 minutes of the server time
func (s *Route53Signer) getCurrentDate() string {
response, err := http.Get("https://route53.amazonaws.com/date")
if err != nil {
fmt.Print("Unable to get date from amazon: ", err)
return ""
}
response.Body.Close()
return response.Header.Get("Date")
}
// Creates the authorize signature based on the date stamp and secret key // Creates the authorize signature based on the date stamp and secret key
func (s *Route53Signer) getHeaderAuthorize(message string) string { func (s *Route53Signer) getHeaderAuthorize(message string) string {
hmacSha256 := hmac.New(sha256.New, []byte(s.auth.SecretKey)) hmacSha256 := hmac.New(sha256.New, []byte(s.auth.SecretKey))
@@ -100,16 +87,18 @@ func (s *Route53Signer) getHeaderAuthorize(message string) string {
// Adds all the required headers for AWS Route53 API to the request // Adds all the required headers for AWS Route53 API to the request
// including the authorization // including the authorization
func (s *Route53Signer) Sign(req *http.Request) { func (s *Route53Signer) Sign(req *http.Request) {
date := s.getCurrentDate() date := time.Now().UTC().Format(time.RFC1123)
delete(req.Header, "Date")
req.Header.Set("Date", date)
authHeader := fmt.Sprintf("AWS3-HTTPS AWSAccessKeyId=%s,Algorithm=%s,Signature=%s", authHeader := fmt.Sprintf("AWS3-HTTPS AWSAccessKeyId=%s,Algorithm=%s,Signature=%s",
s.auth.AccessKey, "HmacSHA256", s.getHeaderAuthorize(date)) s.auth.AccessKey, "HmacSHA256", s.getHeaderAuthorize(date))
req.Header.Set("Host", req.Host) req.Header.Set("Host", req.Host)
req.Header.Set("X-Amzn-Authorization", authHeader) req.Header.Set("X-Amzn-Authorization", authHeader)
req.Header.Set("X-Amz-Date", date)
req.Header.Set("Content-Type", "application/xml") req.Header.Set("Content-Type", "application/xml")
if s.auth.Token() != "" { if s.auth.Token() != "" {
req.Header.Set("X-Amzn-Security-Token", s.auth.Token()) req.Header.Set("X-Amz-Security-Token", s.auth.Token())
} }
} }

View file

@@ -394,6 +394,14 @@ func (p completeParts) Len() int { return len(p) }
func (p completeParts) Less(i, j int) bool { return p[i].PartNumber < p[j].PartNumber } func (p completeParts) Less(i, j int) bool { return p[i].PartNumber < p[j].PartNumber }
func (p completeParts) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p completeParts) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// We can't know in advance whether we'll have an Error or a
// CompleteMultipartUploadResult, so this structure is just a placeholder to
// know the name of the XML object.
type completeUploadResp struct {
XMLName xml.Name
InnerXML string `xml:",innerxml"`
}
// Complete assembles the given previously uploaded parts into the // Complete assembles the given previously uploaded parts into the
// final object. This operation may take several minutes. // final object. This operation may take several minutes.
// //
@@ -419,12 +427,42 @@ func (m *Multi) Complete(parts []Part) error {
params: params, params: params,
payload: bytes.NewReader(data), payload: bytes.NewReader(data),
} }
err := m.Bucket.S3.query(req, nil) var resp completeUploadResp
err := m.Bucket.S3.query(req, &resp)
if shouldRetry(err) && attempt.HasNext() { if shouldRetry(err) && attempt.HasNext() {
continue continue
} }
if err != nil {
return err return err
} }
// A 200 error code does not guarantee that there were no errors (see
// http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html ),
// so first figure out what kind of XML "object" we are dealing with.
if resp.XMLName.Local == "Error" {
// S3.query does the unmarshalling for us, so we can't unmarshal
// again in a different struct... So we need to duct-tape back the
// original XML back together.
fullErrorXml := "<Error>" + resp.InnerXML + "</Error>"
s3err := &Error{}
if err := xml.Unmarshal([]byte(fullErrorXml), s3err); err != nil {
return err
}
return s3err
}
if resp.XMLName.Local == "CompleteMultipartUploadResult" {
// FIXME: One could probably add a CompleteFull method returning the
// actual contents of the CompleteMultipartUploadResult object.
return nil
}
return errors.New("Invalid XML struct returned: " + resp.XMLName.Local)
}
panic("unreachable") panic("unreachable")
} }

View file

@@ -343,10 +343,7 @@ func (s *S) TestPutAllResume(c *check.C) {
func (s *S) TestMultiComplete(c *check.C) { func (s *S) TestMultiComplete(c *check.C) {
testServer.Response(200, nil, InitMultiResultDump) testServer.Response(200, nil, InitMultiResultDump)
// Note the 200 response. Completing will hold the connection on some testServer.Response(200, nil, MultiCompleteDump)
// kind of long poll, and may return a late error even after a 200.
testServer.Response(200, nil, InternalErrorDump)
testServer.Response(200, nil, "")
b := s.s3.Bucket("sample") b := s.s3.Bucket("sample")
@@ -382,6 +379,24 @@
c.Assert(payload.Part[1].ETag, check.Equals, `"ETag2"`) c.Assert(payload.Part[1].ETag, check.Equals, `"ETag2"`)
} }
func (s *S) TestMultiCompleteError(c *check.C) {
testServer.Response(200, nil, InitMultiResultDump)
// Note the 200 response. Completing will hold the connection on some
// kind of long poll, and may return a late error even after a 200.
testServer.Response(200, nil, InternalErrorDump)
b := s.s3.Bucket("sample")
multi, err := b.InitMulti("multi", "text/plain", s3.Private, s3.Options{})
c.Assert(err, check.IsNil)
err = multi.Complete([]s3.Part{{2, `"ETag2"`, 32}, {1, `"ETag1"`, 64}})
c.Assert(err, check.NotNil)
testServer.WaitRequest()
testServer.WaitRequest()
}
func (s *S) TestMultiAbort(c *check.C) { func (s *S) TestMultiAbort(c *check.C) {
testServer.Response(200, nil, InitMultiResultDump) testServer.Response(200, nil, InitMultiResultDump)
testServer.Response(200, nil, "") testServer.Response(200, nil, "")

View file

@@ -194,6 +194,15 @@ var NoSuchUploadErrorDump = `
</Error> </Error>
` `
var MultiCompleteDump = `
<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Location>http://Example-Bucket.s3.amazonaws.com/Example-Object</Location>
<Bucket>Example-Bucket</Bucket>
<Key>Example-Object</Key>
<ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>
</CompleteMultipartUploadResult>
`
var InternalErrorDump = ` var InternalErrorDump = `
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<Error> <Error>

View file

@@ -850,7 +850,15 @@ func (b *Bucket) UploadSignedURL(name, method, content_type string, expires time
signature := base64.StdEncoding.EncodeToString([]byte(macsum)) signature := base64.StdEncoding.EncodeToString([]byte(macsum))
signature = strings.TrimSpace(signature) signature = strings.TrimSpace(signature)
signedurl, err := url.Parse("https://" + b.Name + ".s3.amazonaws.com/") var signedurl *url.URL
var err error
if b.Region.S3Endpoint != "" {
signedurl, err = url.Parse(b.Region.S3Endpoint)
name = b.Name + "/" + name
} else {
signedurl, err = url.Parse("https://" + b.Name + ".s3.amazonaws.com/")
}
if err != nil { if err != nil {
log.Println("ERROR sining url for S3 upload", err) log.Println("ERROR sining url for S3 upload", err)
return "" return ""
@@ -1243,7 +1251,7 @@ func shouldRetry(err error) bool {
return true return true
case *net.OpError: case *net.OpError:
switch e.Op { switch e.Op {
case "read", "write": case "dial", "read", "write":
return true return true
} }
case *url.Error: case *url.Error:
@@ -1252,7 +1260,14 @@
// are received or parsed correctly. In that later case, e.Op is set to // are received or parsed correctly. In that later case, e.Op is set to
// the HTTP method name with the first letter uppercased. We don't want // the HTTP method name with the first letter uppercased. We don't want
// to retry on POST operations, since those are not idempotent, all the // to retry on POST operations, since those are not idempotent, all the
// other ones should be safe to retry. // other ones should be safe to retry. The only case where all
// operations are safe to retry are "dial" errors, since in that case
// the POST request didn't make it to the server.
if netErr, ok := e.Err.(*net.OpError); ok && netErr.Op == "dial" {
return true
}
switch e.Op { switch e.Op {
case "Get", "Put", "Delete", "Head": case "Get", "Put", "Delete", "Head":
return shouldRetry(e.Err) return shouldRetry(e.Err)
@@ -1264,6 +1279,10 @@ func shouldRetry(err error) bool {
case "InternalError", "NoSuchUpload", "NoSuchBucket": case "InternalError", "NoSuchUpload", "NoSuchBucket":
return true return true
} }
switch e.StatusCode {
case 500, 503, 504:
return true
}
} }
return false return false
} }

View file

@@ -456,7 +456,11 @@ func (s *ClientTests) TestMultiComplete(c *check.C) {
err := b.PutBucket(s3.Private) err := b.PutBucket(s3.Private)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
multi, err := b.InitMulti("multi", "text/plain", s3.Private, s3.Options{}) contentType := "text/plain"
meta := make(map[string][]string)
meta["X-Amz-Meta-TestField"] = []string{"testValue"}
options := s3.Options{ContentEncoding: "identity", ContentDisposition: "inline", Meta: meta}
multi, err := b.InitMulti("multi", contentType, s3.Private, options)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(multi.UploadId, check.Matches, ".+") c.Assert(multi.UploadId, check.Matches, ".+")
defer multi.Abort() defer multi.Abort()
@@ -484,6 +488,16 @@
} }
} }
c.Assert(string(data[len(data1):]), check.Equals, string(data2)) c.Assert(string(data[len(data1):]), check.Equals, string(data2))
resp, err := b.GetResponse("multi")
c.Assert(resp.Header.Get("Content-Type"), check.Equals, contentType)
c.Assert(resp.Header.Get("x-amz-acl"), check.Equals, s3.Private)
c.Assert(resp.Header.Get("Content-MD5"), check.Equals, options.ContentMD5)
c.Assert(resp.Header.Get("Content-Encoding"), check.Equals, options.ContentEncoding)
c.Assert(resp.Header.Get("Content-Disposition"), check.Equals, options.ContentDisposition)
for k, values := range meta {
c.Assert(resp.Header.Get(k), check.Equals, strings.Join(values, ","))
}
} }
type multiList []*s3.Multi type multiList []*s3.Multi

View file

@@ -4,6 +4,7 @@ import (
"github.com/AdRoll/goamz/aws" "github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3" "github.com/AdRoll/goamz/s3"
"github.com/AdRoll/goamz/s3/s3test" "github.com/AdRoll/goamz/s3/s3test"
"github.com/AdRoll/goamz/testutil"
"gopkg.in/check.v1" "gopkg.in/check.v1"
) )
@@ -77,3 +78,10 @@ func (s *LocalServerSuite) TestBucketList(c *check.C) {
func (s *LocalServerSuite) TestDoublePutBucket(c *check.C) { func (s *LocalServerSuite) TestDoublePutBucket(c *check.C) {
s.clientTests.TestDoublePutBucket(c) s.clientTests.TestDoublePutBucket(c)
} }
func (s *LocalServerSuite) TestMultiComplete(c *check.C) {
if !testutil.Amazon {
c.Skip("live tests against AWS disabled (no -amazon)")
}
s.clientTests.TestMultiComplete(c)
}

View file

@@ -25,6 +25,8 @@ import (
const debug = false const debug = false
var rangePattern = regexp.MustCompile(`^bytes=([\d]*)-([\d]*)$`)
type s3Error struct { type s3Error struct {
statusCode int statusCode int
XMLName struct{} `xml:"Error"` XMLName struct{} `xml:"Error"`
@@ -82,6 +84,7 @@ type bucket struct {
ctime time.Time ctime time.Time
objects map[string]*object objects map[string]*object
multipartUploads map[string][]*multipartUploadPart multipartUploads map[string][]*multipartUploadPart
multipartMeta map[string]http.Header
} }
type object struct { type object struct {
@@ -93,11 +96,26 @@ type object struct {
} }
type multipartUploadPart struct { type multipartUploadPart struct {
index uint
data []byte data []byte
etag string etag string
lastModified time.Time lastModified time.Time
} }
type multipartUploadPartByIndex []*multipartUploadPart
func (x multipartUploadPartByIndex) Len() int {
return len(x)
}
func (x multipartUploadPartByIndex) Swap(i, j int) {
x[i], x[j] = x[j], x[i]
}
func (x multipartUploadPartByIndex) Less(i, j int) bool {
return x[i].index < x[j].index
}
// A resource encapsulates the subject of an HTTP request. // A resource encapsulates the subject of an HTTP request.
// The resource referred to may or may not exist // The resource referred to may or may not exist
// when the request is made. // when the request is made.
@@ -438,6 +456,7 @@ func (r bucketResource) put(a *action) interface{} {
// TODO default acl // TODO default acl
objects: make(map[string]*object), objects: make(map[string]*object),
multipartUploads: make(map[string][]*multipartUploadPart), multipartUploads: make(map[string][]*multipartUploadPart),
multipartMeta: make(map[string]http.Header),
} }
a.srv.buckets[r.name] = r.bucket a.srv.buckets[r.name] = r.bucket
created = true created = true
@@ -592,8 +611,33 @@ func (objr objectResource) get(a *action) interface{} {
h.Set(name, vals[0]) h.Set(name, vals[0])
} }
} }
data := obj.data
status := http.StatusOK
if r := a.req.Header.Get("Range"); r != "" { if r := a.req.Header.Get("Range"); r != "" {
fatalf(400, "NotImplemented", "range unimplemented") // s3 ignores invalid ranges
if matches := rangePattern.FindStringSubmatch(r); len(matches) == 3 {
var err error
start := 0
end := len(obj.data) - 1
if matches[1] != "" {
start, err = strconv.Atoi(matches[1])
}
if err == nil && matches[2] != "" {
end, err = strconv.Atoi(matches[2])
}
if err == nil && start >= 0 && end >= start {
if start >= len(obj.data) {
fatalf(416, "InvalidRequest", "The requested range is not satisfiable")
}
if end > len(obj.data)-1 {
end = len(obj.data) - 1
}
data = obj.data[start : end+1]
status = http.StatusPartialContent
h.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, len(obj.data)))
}
}
} }
// TODO Last-Modified-Since // TODO Last-Modified-Since
// TODO If-Modified-Since // TODO If-Modified-Since
@@ -602,14 +646,19 @@ func (objr objectResource) get(a *action) interface{} {
// TODO If-None-Match // TODO If-None-Match
// TODO Connection: close ?? // TODO Connection: close ??
// TODO x-amz-request-id // TODO x-amz-request-id
h.Set("Content-Length", fmt.Sprint(len(obj.data))) h.Set("Content-Length", fmt.Sprint(len(data)))
h.Set("ETag", hex.EncodeToString(obj.checksum)) h.Set("ETag", hex.EncodeToString(obj.checksum))
h.Set("Last-Modified", obj.mtime.Format(time.RFC1123)) h.Set("Last-Modified", obj.mtime.Format(time.RFC1123))
if status != http.StatusOK {
a.w.WriteHeader(status)
}
if a.req.Method == "HEAD" { if a.req.Method == "HEAD" {
return nil return nil
} }
// TODO avoid holding the lock when writing data. // TODO avoid holding the lock when writing data.
_, err := a.w.Write(obj.data) _, err := a.w.Write(data)
if err != nil { if err != nil {
// we can't do much except just log the fact. // we can't do much except just log the fact.
log.Printf("error writing data: %v", err) log.Printf("error writing data: %v", err)
@@ -633,6 +682,7 @@ func (objr objectResource) put(a *action) interface{} {
// TODO x-amz-storage-class // TODO x-amz-storage-class
uploadId := a.req.URL.Query().Get("uploadId") uploadId := a.req.URL.Query().Get("uploadId")
var partNumber uint
// Check that the upload ID is valid if this is a multipart upload // Check that the upload ID is valid if this is a multipart upload
if uploadId != "" { if uploadId != "" {
@@ -646,16 +696,13 @@ func (objr objectResource) put(a *action) interface{} {
fatalf(400, "InvalidRequest", "Missing partNumber parameter") fatalf(400, "InvalidRequest", "Missing partNumber parameter")
} }
partNumber, err := strconv.ParseUint(partNumberStr, 10, 32) number, err := strconv.ParseUint(partNumberStr, 10, 32)
if err != nil { if err != nil {
fatalf(400, "InvalidRequest", "partNumber is not a number") fatalf(400, "InvalidRequest", "partNumber is not a number")
} }
// Parts are 1-indexed for multipart uploads partNumber = uint(number)
if uint(partNumber)-1 != uint(len(objr.bucket.multipartUploads[uploadId])) {
fatalf(400, "InvalidRequest", "Invalid part number")
}
} }
var expectHash []byte var expectHash []byte
@@ -712,9 +759,10 @@ func (objr objectResource) put(a *action) interface{} {
parts := objr.bucket.multipartUploads[uploadId] parts := objr.bucket.multipartUploads[uploadId]
part := &multipartUploadPart{ part := &multipartUploadPart{
data, index: partNumber,
etag, data: data,
time.Now(), etag: etag,
lastModified: time.Now(),
} }
objr.bucket.multipartUploads[uploadId] = append(parts, part) objr.bucket.multipartUploads[uploadId] = append(parts, part)
@@ -755,6 +803,13 @@ func (objr objectResource) post(a *action) interface{} {
uploadId := strconv.FormatInt(rand.Int63(), 16) uploadId := strconv.FormatInt(rand.Int63(), 16)
objr.bucket.multipartUploads[uploadId] = []*multipartUploadPart{} objr.bucket.multipartUploads[uploadId] = []*multipartUploadPart{}
objr.bucket.multipartMeta[uploadId] = make(http.Header)
for key, values := range a.req.Header {
key = http.CanonicalHeaderKey(key)
if metaHeaders[key] || strings.HasPrefix(key, "X-Amz-Meta-") {
objr.bucket.multipartMeta[uploadId][key] = values
}
}
return &multipartInitResponse{ return &multipartInitResponse{
Bucket: objr.bucket.name, Bucket: objr.bucket.name,
@@ -804,10 +859,12 @@ func (objr objectResource) post(a *action) interface{} {
data := &bytes.Buffer{} data := &bytes.Buffer{}
w := io.MultiWriter(sum, data) w := io.MultiWriter(sum, data)
sort.Sort(multipartUploadPartByIndex(parts))
for i, p := range parts { for i, p := range parts {
reqPart := req.Part[i] reqPart := req.Part[i]
if reqPart.PartNumber != uint(1+i) { if reqPart.PartNumber != p.index {
fatalf(400, "InvalidRequest", "Bad part number") fatalf(400, "InvalidRequest", "Bad part number")
} }
@@ -833,6 +890,7 @@ func (objr objectResource) post(a *action) interface{} {
obj.checksum = sum.Sum(nil) obj.checksum = sum.Sum(nil)
obj.mtime = time.Now() obj.mtime = time.Now()
objr.bucket.objects[objr.name] = obj objr.bucket.objects[objr.name] = obj
obj.meta = objr.bucket.multipartMeta[uploadId]
objectLocation := fmt.Sprintf("http://%s/%s/%s", a.srv.listener.Addr().String(), objr.bucket.name, objr.name) objectLocation := fmt.Sprintf("http://%s/%s/%s", a.srv.listener.Addr().String(), objr.bucket.name, objr.name)