From 7fc9e2112af27bc160fbdeb124bb2276778fe209 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Mon, 3 Aug 2015 11:50:48 -0700 Subject: [PATCH] Update s3 bindings The S3 library has made a few fixes to the retry logic. Updating the bindings accordingly. Signed-off-by: Stephen J Day --- Godeps/Godeps.json | 6 +- .../src/github.com/AdRoll/goamz/aws/sign.go | 21 ++--- .../src/github.com/AdRoll/goamz/s3/multi.go | 42 +++++++++- .../github.com/AdRoll/goamz/s3/multi_test.go | 23 +++++- .../AdRoll/goamz/s3/responses_test.go | 9 ++ .../src/github.com/AdRoll/goamz/s3/s3.go | 25 +++++- .../github.com/AdRoll/goamz/s3/s3i_test.go | 16 +++- .../github.com/AdRoll/goamz/s3/s3t_test.go | 8 ++ .../AdRoll/goamz/s3/s3test/server.go | 82 ++++++++++++++++--- 9 files changed, 191 insertions(+), 41 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 174280023..3fe949e9a 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -7,15 +7,15 @@ "Deps": [ { "ImportPath": "github.com/AdRoll/goamz/aws", - "Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99" + "Rev": "f8c4952d5bc3056c0ca6711a1f56bc88b828d989" }, { "ImportPath": "github.com/AdRoll/goamz/cloudfront", - "Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99" + "Rev": "f8c4952d5bc3056c0ca6711a1f56bc88b828d989" }, { "ImportPath": "github.com/AdRoll/goamz/s3", - "Rev": "cc210f45dcb9889c2769a274522be2bf70edfb99" + "Rev": "f8c4952d5bc3056c0ca6711a1f56bc88b828d989" }, { "ImportPath": "github.com/Azure/azure-sdk-for-go/storage", diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/sign.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/sign.go index 20bcf0111..5875beeea 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/sign.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/aws/sign.go @@ -75,19 +75,6 @@ func NewRoute53Signer(auth Auth) *Route53Signer { return &Route53Signer{auth: auth} } -// getCurrentDate fetches the date stamp from the aws servers to -// ensure the auth headers are within 5 minutes of the server time -func (s *Route53Signer) getCurrentDate() string { - response, err := http.Get("https://route53.amazonaws.com/date") - if err != nil { - fmt.Print("Unable to get date from amazon: ", err) - return "" - } - - response.Body.Close() - return response.Header.Get("Date") -} - // Creates the authorize signature based on the date stamp and secret key func (s *Route53Signer) getHeaderAuthorize(message string) string { hmacSha256 := hmac.New(sha256.New, []byte(s.auth.SecretKey)) @@ -100,16 +87,18 @@ func (s *Route53Signer) getHeaderAuthorize(message string) string { // Adds all the required headers for AWS Route53 API to the request // including the authorization func (s *Route53Signer) Sign(req *http.Request) { - date := s.getCurrentDate() + date := time.Now().UTC().Format(time.RFC1123) + delete(req.Header, "Date") + req.Header.Set("Date", date) + authHeader := fmt.Sprintf("AWS3-HTTPS AWSAccessKeyId=%s,Algorithm=%s,Signature=%s", s.auth.AccessKey, "HmacSHA256", s.getHeaderAuthorize(date)) req.Header.Set("Host", req.Host) req.Header.Set("X-Amzn-Authorization", authHeader) - req.Header.Set("X-Amz-Date", date) req.Header.Set("Content-Type", "application/xml") if s.auth.Token() != "" { - req.Header.Set("X-Amzn-Security-Token", s.auth.Token()) + req.Header.Set("X-Amz-Security-Token", s.auth.Token()) } } diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/multi.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/multi.go index 80f17cbee..ff4542656 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/multi.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/multi.go @@ -394,6 +394,14 @@ func (p completeParts) Len() int { return len(p) } func (p completeParts) Less(i, j int) bool { return p[i].PartNumber < p[j].PartNumber } func (p completeParts) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +// We can't know in advance whether we'll have an Error or a +// CompleteMultipartUploadResult, so this structure is just a placeholder to +// know the name of the XML object. +type completeUploadResp struct { + XMLName xml.Name + InnerXML string `xml:",innerxml"` +} + // Complete assembles the given previously uploaded parts into the // final object. This operation may take several minutes. // @@ -419,11 +427,41 @@ func (m *Multi) Complete(parts []Part) error { params: params, payload: bytes.NewReader(data), } - err := m.Bucket.S3.query(req, nil) + var resp completeUploadResp + err := m.Bucket.S3.query(req, &resp) if shouldRetry(err) && attempt.HasNext() { continue } - return err + + if err != nil { + return err + } + + // A 200 error code does not guarantee that there were no errors (see + // http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html ), + // so first figure out what kind of XML "object" we are dealing with. + + if resp.XMLName.Local == "Error" { + // S3.query does the unmarshalling for us, so we can't unmarshal + // again in a different struct... So we need to duct-tape back the + // original XML back together. + fullErrorXml := "" + resp.InnerXML + "" + s3err := &Error{} + + if err := xml.Unmarshal([]byte(fullErrorXml), s3err); err != nil { + return err + } + + return s3err + } + + if resp.XMLName.Local == "CompleteMultipartUploadResult" { + // FIXME: One could probably add a CompleteFull method returning the + // actual contents of the CompleteMultipartUploadResult object. + return nil + } + + return errors.New("Invalid XML struct returned: " + resp.XMLName.Local) } panic("unreachable") } diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/multi_test.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/multi_test.go index 875c0adda..8429d336a 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/multi_test.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/multi_test.go @@ -343,10 +343,7 @@ func (s *S) TestPutAllResume(c *check.C) { func (s *S) TestMultiComplete(c *check.C) { testServer.Response(200, nil, InitMultiResultDump) - // Note the 200 response. Completing will hold the connection on some - // kind of long poll, and may return a late error even after a 200. - testServer.Response(200, nil, InternalErrorDump) - testServer.Response(200, nil, "") + testServer.Response(200, nil, MultiCompleteDump) b := s.s3.Bucket("sample") @@ -382,6 +379,24 @@ func (s *S) TestMultiComplete(c *check.C) { c.Assert(payload.Part[1].ETag, check.Equals, `"ETag2"`) } +func (s *S) TestMultiCompleteError(c *check.C) { + testServer.Response(200, nil, InitMultiResultDump) + // Note the 200 response. Completing will hold the connection on some + // kind of long poll, and may return a late error even after a 200. + testServer.Response(200, nil, InternalErrorDump) + + b := s.s3.Bucket("sample") + + multi, err := b.InitMulti("multi", "text/plain", s3.Private, s3.Options{}) + c.Assert(err, check.IsNil) + + err = multi.Complete([]s3.Part{{2, `"ETag2"`, 32}, {1, `"ETag1"`, 64}}) + c.Assert(err, check.NotNil) + + testServer.WaitRequest() + testServer.WaitRequest() +} + func (s *S) TestMultiAbort(c *check.C) { testServer.Response(200, nil, InitMultiResultDump) testServer.Response(200, nil, "") diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/responses_test.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/responses_test.go index 22478b9c2..66fe271b0 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/responses_test.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/responses_test.go @@ -194,6 +194,15 @@ var NoSuchUploadErrorDump = ` ` +var MultiCompleteDump = ` + + http://Example-Bucket.s3.amazonaws.com/Example-Object + Example-Bucket + Example-Object + "3858f62230ac3c915f300c664312c11f-9" + +` + var InternalErrorDump = ` diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3.go index 69b2e071d..dd3130258 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3.go @@ -850,7 +850,15 @@ func (b *Bucket) UploadSignedURL(name, method, content_type string, expires time signature := base64.StdEncoding.EncodeToString([]byte(macsum)) signature = strings.TrimSpace(signature) - signedurl, err := url.Parse("https://" + b.Name + ".s3.amazonaws.com/") + var signedurl *url.URL + var err error + if b.Region.S3Endpoint != "" { + signedurl, err = url.Parse(b.Region.S3Endpoint) + name = b.Name + "/" + name + } else { + signedurl, err = url.Parse("https://" + b.Name + ".s3.amazonaws.com/") + } + if err != nil { log.Println("ERROR sining url for S3 upload", err) return "" @@ -1243,7 +1251,7 @@ func shouldRetry(err error) bool { return true case *net.OpError: switch e.Op { - case "read", "write": + case "dial", "read", "write": return true } case *url.Error: @@ -1252,7 +1260,14 @@ func shouldRetry(err error) bool { // are received or parsed correctly. In that later case, e.Op is set to // the HTTP method name with the first letter uppercased. We don't want // to retry on POST operations, since those are not idempotent, all the - // other ones should be safe to retry. + // other ones should be safe to retry. The only case where all + // operations are safe to retry are "dial" errors, since in that case + // the POST request didn't make it to the server. + + if netErr, ok := e.Err.(*net.OpError); ok && netErr.Op == "dial" { + return true + } + switch e.Op { case "Get", "Put", "Delete", "Head": return shouldRetry(e.Err) @@ -1264,6 +1279,10 @@ func shouldRetry(err error) bool { case "InternalError", "NoSuchUpload", "NoSuchBucket": return true } + switch e.StatusCode { + case 500, 503, 504: + return true + } } return false } diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3i_test.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3i_test.go index e3b707b45..b0da0130e 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3i_test.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3i_test.go @@ -456,7 +456,11 @@ func (s *ClientTests) TestMultiComplete(c *check.C) { err := b.PutBucket(s3.Private) c.Assert(err, check.IsNil) - multi, err := b.InitMulti("multi", "text/plain", s3.Private, s3.Options{}) + contentType := "text/plain" + meta := make(map[string][]string) + meta["X-Amz-Meta-TestField"] = []string{"testValue"} + options := s3.Options{ContentEncoding: "identity", ContentDisposition: "inline", Meta: meta} + multi, err := b.InitMulti("multi", contentType, s3.Private, options) c.Assert(err, check.IsNil) c.Assert(multi.UploadId, check.Matches, ".+") defer multi.Abort() @@ -484,6 +488,16 @@ func (s *ClientTests) TestMultiComplete(c *check.C) { } } c.Assert(string(data[len(data1):]), check.Equals, string(data2)) + + resp, err := b.GetResponse("multi") + c.Assert(resp.Header.Get("Content-Type"), check.Equals, contentType) + c.Assert(resp.Header.Get("x-amz-acl"), check.Equals, s3.Private) + c.Assert(resp.Header.Get("Content-MD5"), check.Equals, options.ContentMD5) + c.Assert(resp.Header.Get("Content-Encoding"), check.Equals, options.ContentEncoding) + c.Assert(resp.Header.Get("Content-Disposition"), check.Equals, options.ContentDisposition) + for k, values := range meta { + c.Assert(resp.Header.Get(k), check.Equals, strings.Join(values, ",")) + } } type multiList []*s3.Multi diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3t_test.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3t_test.go index 29e2e753d..72279ff31 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3t_test.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3t_test.go @@ -4,6 +4,7 @@ import ( "github.com/AdRoll/goamz/aws" "github.com/AdRoll/goamz/s3" "github.com/AdRoll/goamz/s3/s3test" + "github.com/AdRoll/goamz/testutil" "gopkg.in/check.v1" ) @@ -77,3 +78,10 @@ func (s *LocalServerSuite) TestBucketList(c *check.C) { func (s *LocalServerSuite) TestDoublePutBucket(c *check.C) { s.clientTests.TestDoublePutBucket(c) } + +func (s *LocalServerSuite) TestMultiComplete(c *check.C) { + if !testutil.Amazon { + c.Skip("live tests against AWS disabled (no -amazon)") + } + s.clientTests.TestMultiComplete(c) +} diff --git a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3test/server.go b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3test/server.go index d54a638c5..0dd63af39 100644 --- a/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3test/server.go +++ b/Godeps/_workspace/src/github.com/AdRoll/goamz/s3/s3test/server.go @@ -25,6 +25,8 @@ import ( const debug = false +var rangePattern = regexp.MustCompile(`^bytes=([\d]*)-([\d]*)$`) + type s3Error struct { statusCode int XMLName struct{} `xml:"Error"` @@ -82,6 +84,7 @@ type bucket struct { ctime time.Time objects map[string]*object multipartUploads map[string][]*multipartUploadPart + multipartMeta map[string]http.Header } type object struct { @@ -93,11 +96,26 @@ type object struct { } type multipartUploadPart struct { + index uint data []byte etag string lastModified time.Time } +type multipartUploadPartByIndex []*multipartUploadPart + +func (x multipartUploadPartByIndex) Len() int { + return len(x) +} + +func (x multipartUploadPartByIndex) Swap(i, j int) { + x[i], x[j] = x[j], x[i] +} + +func (x multipartUploadPartByIndex) Less(i, j int) bool { + return x[i].index < x[j].index +} + // A resource encapsulates the subject of an HTTP request. // The resource referred to may or may not exist // when the request is made. @@ -438,6 +456,7 @@ func (r bucketResource) put(a *action) interface{} { // TODO default acl objects: make(map[string]*object), multipartUploads: make(map[string][]*multipartUploadPart), + multipartMeta: make(map[string]http.Header), } a.srv.buckets[r.name] = r.bucket created = true @@ -592,8 +611,33 @@ func (objr objectResource) get(a *action) interface{} { h.Set(name, vals[0]) } } + + data := obj.data + status := http.StatusOK if r := a.req.Header.Get("Range"); r != "" { - fatalf(400, "NotImplemented", "range unimplemented") + // s3 ignores invalid ranges + if matches := rangePattern.FindStringSubmatch(r); len(matches) == 3 { + var err error + start := 0 + end := len(obj.data) - 1 + if matches[1] != "" { + start, err = strconv.Atoi(matches[1]) + } + if err == nil && matches[2] != "" { + end, err = strconv.Atoi(matches[2]) + } + if err == nil && start >= 0 && end >= start { + if start >= len(obj.data) { + fatalf(416, "InvalidRequest", "The requested range is not satisfiable") + } + if end > len(obj.data)-1 { + end = len(obj.data) - 1 + } + data = obj.data[start : end+1] + status = http.StatusPartialContent + h.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, len(obj.data))) + } + } } // TODO Last-Modified-Since // TODO If-Modified-Since @@ -602,14 +646,19 @@ func (objr objectResource) get(a *action) interface{} { // TODO If-None-Match // TODO Connection: close ?? // TODO x-amz-request-id - h.Set("Content-Length", fmt.Sprint(len(obj.data))) + h.Set("Content-Length", fmt.Sprint(len(data))) h.Set("ETag", hex.EncodeToString(obj.checksum)) h.Set("Last-Modified", obj.mtime.Format(time.RFC1123)) + + if status != http.StatusOK { + a.w.WriteHeader(status) + } + if a.req.Method == "HEAD" { return nil } // TODO avoid holding the lock when writing data. - _, err := a.w.Write(obj.data) + _, err := a.w.Write(data) if err != nil { // we can't do much except just log the fact. log.Printf("error writing data: %v", err) @@ -633,6 +682,7 @@ func (objr objectResource) put(a *action) interface{} { // TODO x-amz-storage-class uploadId := a.req.URL.Query().Get("uploadId") + var partNumber uint // Check that the upload ID is valid if this is a multipart upload if uploadId != "" { @@ -646,16 +696,13 @@ func (objr objectResource) put(a *action) interface{} { fatalf(400, "InvalidRequest", "Missing partNumber parameter") } - partNumber, err := strconv.ParseUint(partNumberStr, 10, 32) + number, err := strconv.ParseUint(partNumberStr, 10, 32) if err != nil { fatalf(400, "InvalidRequest", "partNumber is not a number") } - // Parts are 1-indexed for multipart uploads - if uint(partNumber)-1 != uint(len(objr.bucket.multipartUploads[uploadId])) { - fatalf(400, "InvalidRequest", "Invalid part number") - } + partNumber = uint(number) } var expectHash []byte @@ -712,9 +759,10 @@ func (objr objectResource) put(a *action) interface{} { parts := objr.bucket.multipartUploads[uploadId] part := &multipartUploadPart{ - data, - etag, - time.Now(), + index: partNumber, + data: data, + etag: etag, + lastModified: time.Now(), } objr.bucket.multipartUploads[uploadId] = append(parts, part) @@ -755,6 +803,13 @@ func (objr objectResource) post(a *action) interface{} { uploadId := strconv.FormatInt(rand.Int63(), 16) objr.bucket.multipartUploads[uploadId] = []*multipartUploadPart{} + objr.bucket.multipartMeta[uploadId] = make(http.Header) + for key, values := range a.req.Header { + key = http.CanonicalHeaderKey(key) + if metaHeaders[key] || strings.HasPrefix(key, "X-Amz-Meta-") { + objr.bucket.multipartMeta[uploadId][key] = values + } + } return &multipartInitResponse{ Bucket: objr.bucket.name, @@ -804,10 +859,12 @@ func (objr objectResource) post(a *action) interface{} { data := &bytes.Buffer{} w := io.MultiWriter(sum, data) + sort.Sort(multipartUploadPartByIndex(parts)) + for i, p := range parts { reqPart := req.Part[i] - if reqPart.PartNumber != uint(1+i) { + if reqPart.PartNumber != p.index { fatalf(400, "InvalidRequest", "Bad part number") } @@ -833,6 +890,7 @@ func (objr objectResource) post(a *action) interface{} { obj.checksum = sum.Sum(nil) obj.mtime = time.Now() objr.bucket.objects[objr.name] = obj + obj.meta = objr.bucket.multipartMeta[uploadId] objectLocation := fmt.Sprintf("http://%s/%s/%s", a.srv.listener.Addr().String(), objr.bucket.name, objr.name)