From 0737bb71753b2691f025506fa8a3e62040b9c093 Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Tue, 3 Nov 2015 09:57:39 +0100 Subject: [PATCH 1/2] Update Swift bindings Signed-off-by: Sylvain Baubeau --- Godeps/Godeps.json | 2 +- .../src/github.com/ncw/swift/README.md | 2 + .../src/github.com/ncw/swift/auth.go | 1 + .../src/github.com/ncw/swift/auth_v3.go | 1 + .../src/github.com/ncw/swift/swift.go | 52 +++++++++++++--- .../src/github.com/ncw/swift/swift_test.go | 59 +++++++++++++++++++ .../github.com/ncw/swift/swifttest/server.go | 38 ++++++++---- 7 files changed, 137 insertions(+), 18 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index e24937a0..4ccaaefb 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -86,7 +86,7 @@ }, { "ImportPath": "github.com/ncw/swift", - "Rev": "ca8cbbde50d4e12dd8ad70b1bd66589ae98efc5c" + "Rev": "c54732e87b0b283d1baf0a18db689d0aea460ba3" }, { "ImportPath": "github.com/noahdesu/go-ceph/rados", diff --git a/Godeps/_workspace/src/github.com/ncw/swift/README.md b/Godeps/_workspace/src/github.com/ncw/swift/README.md index 7d9b1be1..f51266a2 100644 --- a/Godeps/_workspace/src/github.com/ncw/swift/README.md +++ b/Godeps/_workspace/src/github.com/ncw/swift/README.md @@ -132,3 +132,5 @@ Contributors - Dai HaoJun - Hua Wang - Fabian Ruff +- Arturo Reuschenbach Puncernau +- Petr Kotek diff --git a/Godeps/_workspace/src/github.com/ncw/swift/auth.go b/Godeps/_workspace/src/github.com/ncw/swift/auth.go index ca35d237..3948bc1a 100644 --- a/Godeps/_workspace/src/github.com/ncw/swift/auth.go +++ b/Godeps/_workspace/src/github.com/ncw/swift/auth.go @@ -156,6 +156,7 @@ func (auth *v2Auth) Request(c *Connection) (*http.Request, error) { return nil, err } req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", c.UserAgent) return req, nil } diff --git a/Godeps/_workspace/src/github.com/ncw/swift/auth_v3.go b/Godeps/_workspace/src/github.com/ncw/swift/auth_v3.go index 7c375d22..b657747a 100644 --- a/Godeps/_workspace/src/github.com/ncw/swift/auth_v3.go +++ b/Godeps/_workspace/src/github.com/ncw/swift/auth_v3.go @@ -177,6 +177,7 @@ func (auth *v3Auth) Request(c *Connection) (*http.Request, error) { return nil, err } req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", c.UserAgent) return req, nil } diff --git a/Godeps/_workspace/src/github.com/ncw/swift/swift.go b/Godeps/_workspace/src/github.com/ncw/swift/swift.go index 003e1053..290f73a7 100644 --- a/Godeps/_workspace/src/github.com/ncw/swift/swift.go +++ b/Godeps/_workspace/src/github.com/ncw/swift/swift.go @@ -392,6 +392,26 @@ func (c *Connection) authenticated() bool { return c.StorageUrl != "" && c.AuthToken != "" } +// SwiftInfo contains the JSON object returned by Swift when the /info +// route is queried. The object contains, among others, the Swift version, +// the enabled middlewares and their configuration +type SwiftInfo map[string]interface{} + +// Discover Swift configuration by doing a request against /info +func (c *Connection) QueryInfo() (infos SwiftInfo, err error) { + infoUrl, err := url.Parse(c.StorageUrl) + if err != nil { + return nil, err + } + infoUrl.Path = path.Join(infoUrl.Path, "..", "..", "info") + resp, err := http.Get(infoUrl.String()) + if err == nil { + err = readJson(resp, &infos) + return infos, err + } + return nil, err +} + // RequestOpts contains parameters for Connection.storage. type RequestOpts struct { Container string @@ -418,6 +438,10 @@ type RequestOpts struct { // resp.Body.Close() must be called on it, unless noResponse is set in // which case the body will be closed in this function // +// If "Content-Length" is set in p.Headers it will be used - this can +// be used to override the default chunked transfer encoding for +// uploads. +// // This will Authenticate if necessary, and re-authenticate if it // receives a 401 error which means the token has expired // @@ -433,8 +457,9 @@ func (c *Connection) Call(targetUrl string, p RequestOpts) (resp *http.Response, var req *http.Request for { var authToken string - targetUrl, authToken, err = c.getUrlAndAuthToken(targetUrl, p.OnReAuth) - + if targetUrl, authToken, err = c.getUrlAndAuthToken(targetUrl, p.OnReAuth); err != nil { + return //authentication failure + } var URL *url.URL URL, err = url.Parse(targetUrl) if err != nil { @@ -460,18 +485,27 @@ func (c *Connection) Call(targetUrl string, p RequestOpts) (resp *http.Response, } if p.Headers != nil { for k, v := range p.Headers { - req.Header.Add(k, v) + // Set ContentLength in req if the user passed it in in the headers + if k == "Content-Length" { + contentLength, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return nil, nil, fmt.Errorf("Invalid %q header %q: %v", k, v, err) + } + req.ContentLength = contentLength + } else { + req.Header.Add(k, v) + } } } - req.Header.Add("User-Agent", DefaultUserAgent) + req.Header.Add("User-Agent", c.UserAgent) req.Header.Add("X-Auth-Token", authToken) resp, err = c.doTimeoutRequest(timer, req) if err != nil { - if p.Operation == "HEAD" || p.Operation == "GET" { + if (p.Operation == "HEAD" || p.Operation == "GET") && retries > 0 { retries-- continue } - return + return nil, nil, err } // Check to see if token has expired if resp.StatusCode == 401 && retries > 0 { @@ -566,7 +600,8 @@ func readJson(resp *http.Response, result interface{}) (err error) { // ContainersOpts is options for Containers() and ContainerNames() type ContainersOpts struct { Limit int // For an integer value n, limits the number of results to at most n values. - Marker string // Given a string value x, return object names greater in value than the specified marker. + Prefix string // Given a string value x, return container names matching the specified prefix. + Marker string // Given a string value x, return container names greater in value than the specified marker. EndMarker string // Given a string value x, return container names less in value than the specified marker. Headers Headers // Any additional HTTP headers - can be nil } @@ -579,6 +614,9 @@ func (opts *ContainersOpts) parse() (url.Values, Headers) { if opts.Limit > 0 { v.Set("limit", strconv.Itoa(opts.Limit)) } + if opts.Prefix != "" { + v.Set("prefix", opts.Prefix) + } if opts.Marker != "" { v.Set("marker", opts.Marker) } diff --git a/Godeps/_workspace/src/github.com/ncw/swift/swift_test.go b/Godeps/_workspace/src/github.com/ncw/swift/swift_test.go index 87bc7a5d..196dcd9b 100644 --- a/Godeps/_workspace/src/github.com/ncw/swift/swift_test.go +++ b/Godeps/_workspace/src/github.com/ncw/swift/swift_test.go @@ -65,6 +65,7 @@ func makeConnection() (*swift.Connection, error) { UserName := os.Getenv("SWIFT_API_USER") ApiKey := os.Getenv("SWIFT_API_KEY") AuthUrl := os.Getenv("SWIFT_AUTH_URL") + Region := os.Getenv("SWIFT_REGION_NAME") Insecure := os.Getenv("SWIFT_AUTH_INSECURE") ConnectionChannelTimeout := os.Getenv("SWIFT_CONNECTION_CHANNEL_TIMEOUT") @@ -96,6 +97,7 @@ func makeConnection() (*swift.Connection, error) { UserName: UserName, ApiKey: ApiKey, AuthUrl: AuthUrl, + Region: Region, Transport: transport, ConnectTimeout: 60 * time.Second, Timeout: 60 * time.Second, @@ -601,6 +603,45 @@ func TestObjectPutString(t *testing.T) { } } +func TestObjectPut(t *testing.T) { + headers := swift.Headers{} + + // Set content size incorrectly - should produce an error + headers["Content-Length"] = strconv.FormatInt(CONTENT_SIZE-1, 10) + contents := bytes.NewBufferString(CONTENTS) + h, err := c.ObjectPut(CONTAINER, OBJECT, contents, true, CONTENT_MD5, "text/plain", headers) + if err == nil { + t.Fatal("Expecting error but didn't get one") + } + + // Now set content size correctly + contents = bytes.NewBufferString(CONTENTS) + headers["Content-Length"] = strconv.FormatInt(CONTENT_SIZE, 10) + h, err = c.ObjectPut(CONTAINER, OBJECT, contents, true, CONTENT_MD5, "text/plain", headers) + if err != nil { + t.Fatal(err) + } + + if h["Etag"] != CONTENT_MD5 { + t.Errorf("Bad Etag want %q got %q", CONTENT_MD5, h["Etag"]) + } + + // Fetch object info and compare + info, _, err := c.Object(CONTAINER, OBJECT) + if err != nil { + t.Error(err) + } + if info.ContentType != "text/plain" { + t.Error("Bad content type", info.ContentType) + } + if info.Bytes != CONTENT_SIZE { + t.Error("Bad length") + } + if info.Hash != CONTENT_MD5 { + t.Error("Bad length") + } +} + func TestObjectEmpty(t *testing.T) { err := c.ObjectPutString(CONTAINER, EMPTYOBJECT, "", "") if err != nil { @@ -1493,6 +1534,14 @@ func TestTempUrl(t *testing.T) { if content, err := ioutil.ReadAll(resp.Body); err != nil || string(content) != CONTENTS { t.Error("Bad content", err) } + + resp, err := http.Post(tempUrl, "image/jpeg", bytes.NewReader([]byte(CONTENTS))) + if err != nil { + t.Fatal("Failed to retrieve file from temporary url") + } + if resp.StatusCode != 401 { + t.Fatal("Expecting server to forbid access to object") + } } resp.Body.Close() @@ -1500,7 +1549,17 @@ func TestTempUrl(t *testing.T) { if err != nil { t.Fatal(err) } +} +func TestQueryInfo(t *testing.T) { + infos, err := c.QueryInfo() + if err != nil { + t.Log("Server doesn't support querying info") + return + } + if _, ok := infos["swift"]; !ok { + t.Fatal("No 'swift' section found in configuration") + } } func TestContainerDelete(t *testing.T) { diff --git a/Godeps/_workspace/src/github.com/ncw/swift/swifttest/server.go b/Godeps/_workspace/src/github.com/ncw/swift/swifttest/server.go index 587c9bad..a49abb9a 100644 --- a/Godeps/_workspace/src/github.com/ncw/swift/swifttest/server.go +++ b/Godeps/_workspace/src/github.com/ncw/swift/swifttest/server.go @@ -443,7 +443,7 @@ func (objr objectResource) get(a *action) interface{} { if obj, ok := item.(*object); ok { length := len(obj.data) size += length - sum.Write([]byte(components[0] + "/" + obj.name + "\n")) + sum.Write([]byte(hex.EncodeToString(obj.checksum))) if start >= cursor+length { continue } @@ -668,24 +668,42 @@ func (s *SwiftServer) serveHTTP(w http.ResponseWriter, req *http.Request) { panic(notAuthorized()) } + if req.URL.String() == "/info" { + jsonMarshal(w, &swift.SwiftInfo{ + "swift": map[string]interface{}{ + "version": "1.2", + }, + "tempurl": map[string]interface{}{ + "methods": []string{"GET", "HEAD", "PUT"}, + }, + }) + return + } + r = s.resourceForURL(req.URL) key := req.Header.Get("x-auth-token") - if key == "" { - secretKey := "" - signature := req.URL.Query().Get("temp_url_sig") - expires := req.URL.Query().Get("temp_url_expires") + signature := req.URL.Query().Get("temp_url_sig") + expires := req.URL.Query().Get("temp_url_expires") + if key == "" && signature != "" && expires != "" { accountName, _, _, _ := s.parseURL(req.URL) + secretKey := "" if account, ok := s.Accounts[accountName]; ok { secretKey = account.meta.Get("X-Account-Meta-Temp-Url-Key") } - mac := hmac.New(sha1.New, []byte(secretKey)) - body := fmt.Sprintf("%s\n%s\n%s", req.Method, expires, req.URL.Path) - mac.Write([]byte(body)) - expectedSignature := hex.EncodeToString(mac.Sum(nil)) + get_hmac := func(method string) string { + mac := hmac.New(sha1.New, []byte(secretKey)) + body := fmt.Sprintf("%s\n%s\n%s", method, expires, req.URL.Path) + mac.Write([]byte(body)) + return hex.EncodeToString(mac.Sum(nil)) + } - if signature != expectedSignature { + if req.Method == "HEAD" { + if signature != get_hmac("GET") && signature != get_hmac("POST") && signature != get_hmac("PUT") { + panic(notAuthorized()) + } + } else if signature != get_hmac(req.Method) { panic(notAuthorized()) } } else { From 3ff8af326b4f941987142496dcbeaacd901806fb Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Thu, 29 Oct 2015 12:24:56 +0100 Subject: [PATCH 2/2] Ensure read after write for segments Signed-off-by: Sylvain Baubeau --- registry/storage/driver/swift/swift.go | 64 +++++++++++++++++++++----- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/registry/storage/driver/swift/swift.go b/registry/storage/driver/swift/swift.go index c9d623d3..b0237281 100644 --- a/registry/storage/driver/swift/swift.go +++ b/registry/storage/driver/swift/swift.go @@ -20,6 +20,7 @@ package swift import ( "bytes" + "crypto/md5" "crypto/rand" "crypto/sha1" "crypto/tls" @@ -52,6 +53,12 @@ const defaultChunkSize = 20 * 1024 * 1024 // minChunkSize defines the minimum size of a segment const minChunkSize = 1 << 20 +// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded +var readAfterWriteTimeout = 15 * time.Second + +// readAfterWriteWait defines the time to sleep between two retries +var readAfterWriteWait = 200 * time.Millisecond + // Parameters A struct that encapsulates all of the driver parameters after all values have been set type Parameters struct { Username string @@ -252,6 +259,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea partNumber := 1 chunkSize := int64(d.ChunkSize) zeroBuf := make([]byte, d.ChunkSize) + hash := md5.New() getSegment := func() string { return fmt.Sprintf("%s/%016d", segmentPath, partNumber) @@ -292,18 +300,13 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea return 0, err } - if createManifest { - if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil { - return 0, err - } - } - // First, we skip the existing segments that are not modified by this call for i := range segments { if offset < cursor+segments[i].Bytes { break } cursor += segments[i].Bytes + hash.Write([]byte(segments[i].Hash)) partNumber++ } @@ -312,7 +315,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea if offset >= currentLength { for offset-currentLength >= chunkSize { // Insert a block a zero - _, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil) + headers, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil) if err != nil { if err == swift.ObjectNotFound { return 0, storagedriver.PathNotFoundError{Path: getSegment()} @@ -321,6 +324,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea } currentLength += chunkSize partNumber++ + hash.Write([]byte(headers["Etag"])) } cursor = currentLength @@ -355,13 +359,23 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea return false, bytesRead, err } - n, err := io.Copy(currentSegment, multi) + segmentHash := md5.New() + writer := io.MultiWriter(currentSegment, segmentHash) + + n, err := io.Copy(writer, multi) if err != nil { return false, bytesRead, err } if n > 0 { - defer currentSegment.Close() + defer func() { + closeError := currentSegment.Close() + if err != nil { + err = closeError + } + hexHash := hex.EncodeToString(segmentHash.Sum(nil)) + hash.Write([]byte(hexHash)) + }() bytesRead += n - max(0, offset-cursor) } @@ -379,7 +393,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea return false, bytesRead, err } - _, copyErr := io.Copy(currentSegment, file) + _, copyErr := io.Copy(writer, file) if err := file.Close(); err != nil { if err == swift.ObjectNotFound { @@ -414,7 +428,35 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea } } - return bytesRead, nil + for ; partNumber < len(segments); partNumber++ { + hash.Write([]byte(segments[partNumber].Hash)) + } + + if createManifest { + if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil { + return 0, err + } + } + + expectedHash := hex.EncodeToString(hash.Sum(nil)) + waitingTime := readAfterWriteWait + endTime := time.Now().Add(readAfterWriteTimeout) + for { + var infos swift.Object + if infos, _, err = d.Conn.Object(d.Container, d.swiftPath(path)); err == nil { + if strings.Trim(infos.Hash, "\"") == expectedHash { + return bytesRead, nil + } + err = fmt.Errorf("Timeout expired while waiting for segments of %s to show up", path) + } + if time.Now().Add(waitingTime).After(endTime) { + break + } + time.Sleep(waitingTime) + waitingTime *= 2 + } + + return bytesRead, err } // Stat retrieves the FileInfo for the given path, including the current size