From 1336ced030492ddea4a5cb015e5cba731dd96d6a Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Wed, 19 Nov 2014 18:06:54 -0800 Subject: [PATCH 1/2] Updates client to newer routes and changes "layer" to "blob" --- client/client.go | 153 +++++++++++++++++++++++++----------------- client/client_test.go | 136 ++++++++++++++++++------------------- client/pull.go | 2 +- client/push.go | 46 +++++++------ errors.go | 51 ++++++-------- test/test.go | 44 +++++++++--- 6 files changed, 238 insertions(+), 194 deletions(-) diff --git a/client/client.go b/client/client.go index 2ea0e091..8eb156c5 100644 --- a/client/client.go +++ b/client/client.go @@ -16,57 +16,59 @@ import ( // Client implements the client interface to the registry http api type Client interface { // GetImageManifest returns an image manifest for the image at the given - // name, tag pair + // name, tag pair. GetImageManifest(name, tag string) (*registry.ImageManifest, error) // PutImageManifest uploads an image manifest for the image at the given - // name, tag pair + // name, tag pair. PutImageManifest(name, tag string, imageManifest *registry.ImageManifest) error - // DeleteImage removes the image at the given name, tag pair + // DeleteImage removes the image at the given name, tag pair. DeleteImage(name, tag string) error // ListImageTags returns a list of all image tags with the given repository - // name + // name. ListImageTags(name string) ([]string, error) - // GetImageLayer returns the image layer at the given name, tarsum pair in - // the form of an io.ReadCloser with the length of this layer - // A nonzero byteOffset can be provided to receive a partial layer beginning - // at the given offset - GetImageLayer(name, tarsum string, byteOffset int) (io.ReadCloser, int, error) + // BlobLength returns the length of the blob stored at the given name, + // digest pair. + // Returns a length value of -1 on error or if the blob does not exist. + BlobLength(name, digest string) (int, error) - // InitiateLayerUpload starts an image upload for the given name, tarsum - // pair and returns a unique location url to use for other layer upload - // methods - // Returns a *registry.LayerAlreadyExistsError if the layer already exists - // on the registry - InitiateLayerUpload(name, tarsum string) (string, error) + // GetBlob returns the blob stored at the given name, digest pair in the + // form of an io.ReadCloser with the length of this blob. + // A nonzero byteOffset can be provided to receive a partial blob beginning + // at the given offset. + GetBlob(name, digest string, byteOffset int) (io.ReadCloser, int, error) - // GetLayerUploadStatus returns the byte offset and length of the layer at - // the given upload location - GetLayerUploadStatus(location string) (int, int, error) + // InitiateBlobUpload starts a blob upload in the given repository namespace + // and returns a unique location url to use for other blob upload methods. + InitiateBlobUpload(name string) (string, error) - // UploadLayer uploads a full image layer to the registry - UploadLayer(location string, layer io.ReadCloser, length int, checksum *registry.Checksum) error + // GetBlobUploadStatus returns the byte offset and length of the blob at the + // given upload location. + GetBlobUploadStatus(location string) (int, int, error) - // UploadLayerChunk uploads a layer chunk with a given length and startByte - // to the registry - // FinishChunkedLayerUpload must be called to finalize this upload - UploadLayerChunk(location string, layerChunk io.ReadCloser, length, startByte int) error + // UploadBlob uploads a full blob to the registry. + UploadBlob(location string, blob io.ReadCloser, length int, digest string) error - // FinishChunkedLayerUpload completes a chunked layer upload at a given - // location - FinishChunkedLayerUpload(location string, length int, checksum *registry.Checksum) error + // UploadBlobChunk uploads a blob chunk with a given length and startByte to + // the registry. + // FinishChunkedBlobUpload must be called to finalize this upload. + UploadBlobChunk(location string, blobChunk io.ReadCloser, length, startByte int) error - // CancelLayerUpload deletes all content at the unfinished layer upload - // location and invalidates any future calls to this layer upload - CancelLayerUpload(location string) error + // FinishChunkedBlobUpload completes a chunked blob upload at a given + // location. + FinishChunkedBlobUpload(location string, length int, digest string) error + + // CancelBlobUpload deletes all content at the unfinished blob upload + // location and invalidates any future calls to this blob upload. + CancelBlobUpload(location string) error } // New returns a new Client which operates against a registry with the // given base endpoint -// This endpoint should not include /v2/ or any part of the url after this +// This endpoint should not include /v2/ or any part of the url after this. func New(endpoint string) Client { return &clientImpl{endpoint} } @@ -220,9 +222,41 @@ func (r *clientImpl) ListImageTags(name string) ([]string, error) { return tags.Tags, nil } -func (r *clientImpl) GetImageLayer(name, tarsum string, byteOffset int) (io.ReadCloser, int, error) { +func (r *clientImpl) BlobLength(name, digest string) (int, error) { + response, err := http.Head(fmt.Sprintf("%s/v2/%s/blob/%s", r.Endpoint, name, digest)) + if err != nil { + return -1, err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusOK: + lengthHeader := response.Header.Get("Content-Length") + length, err := strconv.ParseInt(lengthHeader, 10, 0) + if err != nil { + return -1, err + } + return int(length), nil + case response.StatusCode == http.StatusNotFound: + return -1, nil + case response.StatusCode >= 400 && response.StatusCode < 500: + errors := new(registry.Errors) + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errors) + if err != nil { + return -1, err + } + return -1, errors + default: + response.Body.Close() + return -1, ®istry.UnexpectedHTTPStatusError{Status: response.Status} + } +} + +func (r *clientImpl) GetBlob(name, digest string, byteOffset int) (io.ReadCloser, int, error) { getRequest, err := http.NewRequest("GET", - fmt.Sprintf("%s/v2/%s/layer/%s", r.Endpoint, name, tarsum), nil) + fmt.Sprintf("%s/v2/%s/blob/%s", r.Endpoint, name, digest), nil) if err != nil { return nil, 0, err } @@ -233,9 +267,6 @@ func (r *clientImpl) GetImageLayer(name, tarsum string, byteOffset int) (io.Read return nil, 0, err } - if response.StatusCode == http.StatusNotFound { - return nil, 0, ®istry.LayerNotFoundError{Name: name, TarSum: tarsum} - } // TODO(bbland): handle other status codes, like 5xx errors switch { case response.StatusCode == http.StatusOK: @@ -247,7 +278,7 @@ func (r *clientImpl) GetImageLayer(name, tarsum string, byteOffset int) (io.Read return response.Body, int(length), nil case response.StatusCode == http.StatusNotFound: response.Body.Close() - return nil, 0, ®istry.LayerNotFoundError{Name: name, TarSum: tarsum} + return nil, 0, ®istry.BlobNotFoundError{Name: name, Digest: digest} case response.StatusCode >= 400 && response.StatusCode < 500: errors := new(registry.Errors) decoder := json.NewDecoder(response.Body) @@ -262,9 +293,9 @@ func (r *clientImpl) GetImageLayer(name, tarsum string, byteOffset int) (io.Read } } -func (r *clientImpl) InitiateLayerUpload(name, tarsum string) (string, error) { +func (r *clientImpl) InitiateBlobUpload(name string) (string, error) { postRequest, err := http.NewRequest("POST", - fmt.Sprintf("%s/v2/%s/layer/%s/upload/", r.Endpoint, name, tarsum), nil) + fmt.Sprintf("%s/v2/%s/blob/upload/", r.Endpoint, name), nil) if err != nil { return "", err } @@ -279,8 +310,8 @@ func (r *clientImpl) InitiateLayerUpload(name, tarsum string) (string, error) { switch { case response.StatusCode == http.StatusAccepted: return response.Header.Get("Location"), nil - case response.StatusCode == http.StatusNotModified: - return "", ®istry.LayerAlreadyExistsError{Name: name, TarSum: tarsum} + // case response.StatusCode == http.StatusNotFound: + // return case response.StatusCode >= 400 && response.StatusCode < 500: errors := new(registry.Errors) decoder := json.NewDecoder(response.Body) @@ -294,7 +325,7 @@ func (r *clientImpl) InitiateLayerUpload(name, tarsum string) (string, error) { } } -func (r *clientImpl) GetLayerUploadStatus(location string) (int, int, error) { +func (r *clientImpl) GetBlobUploadStatus(location string) (int, int, error) { response, err := http.Get(fmt.Sprintf("%s%s", r.Endpoint, location)) if err != nil { return 0, 0, err @@ -306,7 +337,7 @@ func (r *clientImpl) GetLayerUploadStatus(location string) (int, int, error) { case response.StatusCode == http.StatusNoContent: return parseRangeHeader(response.Header.Get("Range")) case response.StatusCode == http.StatusNotFound: - return 0, 0, ®istry.LayerUploadNotFoundError{Location: location} + return 0, 0, ®istry.BlobUploadNotFoundError{Location: location} case response.StatusCode >= 400 && response.StatusCode < 500: errors := new(registry.Errors) decoder := json.NewDecoder(response.Body) @@ -320,18 +351,18 @@ func (r *clientImpl) GetLayerUploadStatus(location string) (int, int, error) { } } -func (r *clientImpl) UploadLayer(location string, layer io.ReadCloser, length int, checksum *registry.Checksum) error { - defer layer.Close() +func (r *clientImpl) UploadBlob(location string, blob io.ReadCloser, length int, digest string) error { + defer blob.Close() putRequest, err := http.NewRequest("PUT", - fmt.Sprintf("%s%s", r.Endpoint, location), layer) + fmt.Sprintf("%s%s", r.Endpoint, location), blob) if err != nil { return err } queryValues := url.Values{} queryValues.Set("length", fmt.Sprint(length)) - queryValues.Set(checksum.HashAlgorithm, checksum.Sum) + queryValues.Set("digest", digest) putRequest.URL.RawQuery = queryValues.Encode() putRequest.Header.Set("Content-Type", "application/octet-stream") @@ -348,7 +379,7 @@ func (r *clientImpl) UploadLayer(location string, layer io.ReadCloser, length in case response.StatusCode == http.StatusCreated: return nil case response.StatusCode == http.StatusNotFound: - return ®istry.LayerUploadNotFoundError{Location: location} + return ®istry.BlobUploadNotFoundError{Location: location} case response.StatusCode >= 400 && response.StatusCode < 500: errors := new(registry.Errors) decoder := json.NewDecoder(response.Body) @@ -362,11 +393,11 @@ func (r *clientImpl) UploadLayer(location string, layer io.ReadCloser, length in } } -func (r *clientImpl) UploadLayerChunk(location string, layerChunk io.ReadCloser, length, startByte int) error { - defer layerChunk.Close() +func (r *clientImpl) UploadBlobChunk(location string, blobChunk io.ReadCloser, length, startByte int) error { + defer blobChunk.Close() putRequest, err := http.NewRequest("PUT", - fmt.Sprintf("%s%s", r.Endpoint, location), layerChunk) + fmt.Sprintf("%s%s", r.Endpoint, location), blobChunk) if err != nil { return err } @@ -389,17 +420,17 @@ func (r *clientImpl) UploadLayerChunk(location string, layerChunk io.ReadCloser, case response.StatusCode == http.StatusAccepted: return nil case response.StatusCode == http.StatusRequestedRangeNotSatisfiable: - lastValidRange, layerSize, err := parseRangeHeader(response.Header.Get("Range")) + lastValidRange, blobSize, err := parseRangeHeader(response.Header.Get("Range")) if err != nil { return err } - return ®istry.LayerUploadInvalidRangeError{ + return ®istry.BlobUploadInvalidRangeError{ Location: location, LastValidRange: lastValidRange, - LayerSize: layerSize, + BlobSize: blobSize, } case response.StatusCode == http.StatusNotFound: - return ®istry.LayerUploadNotFoundError{Location: location} + return ®istry.BlobUploadNotFoundError{Location: location} case response.StatusCode >= 400 && response.StatusCode < 500: errors := new(registry.Errors) decoder := json.NewDecoder(response.Body) @@ -413,7 +444,7 @@ func (r *clientImpl) UploadLayerChunk(location string, layerChunk io.ReadCloser, } } -func (r *clientImpl) FinishChunkedLayerUpload(location string, length int, checksum *registry.Checksum) error { +func (r *clientImpl) FinishChunkedBlobUpload(location string, length int, digest string) error { putRequest, err := http.NewRequest("PUT", fmt.Sprintf("%s%s", r.Endpoint, location), nil) if err != nil { @@ -422,7 +453,7 @@ func (r *clientImpl) FinishChunkedLayerUpload(location string, length int, check queryValues := new(url.Values) queryValues.Set("length", fmt.Sprint(length)) - queryValues.Set(checksum.HashAlgorithm, checksum.Sum) + queryValues.Set("digest", digest) putRequest.URL.RawQuery = queryValues.Encode() putRequest.Header.Set("Content-Type", "application/octet-stream") @@ -441,7 +472,7 @@ func (r *clientImpl) FinishChunkedLayerUpload(location string, length int, check case response.StatusCode == http.StatusCreated: return nil case response.StatusCode == http.StatusNotFound: - return ®istry.LayerUploadNotFoundError{Location: location} + return ®istry.BlobUploadNotFoundError{Location: location} case response.StatusCode >= 400 && response.StatusCode < 500: errors := new(registry.Errors) decoder := json.NewDecoder(response.Body) @@ -455,7 +486,7 @@ func (r *clientImpl) FinishChunkedLayerUpload(location string, length int, check } } -func (r *clientImpl) CancelLayerUpload(location string) error { +func (r *clientImpl) CancelBlobUpload(location string) error { deleteRequest, err := http.NewRequest("DELETE", fmt.Sprintf("%s%s", r.Endpoint, location), nil) if err != nil { @@ -473,7 +504,7 @@ func (r *clientImpl) CancelLayerUpload(location string) error { case response.StatusCode == http.StatusNoContent: return nil case response.StatusCode == http.StatusNotFound: - return ®istry.LayerUploadNotFoundError{Location: location} + return ®istry.BlobUploadNotFoundError{Location: location} case response.StatusCode >= 400 && response.StatusCode < 500: errors := new(registry.Errors) decoder := json.NewDecoder(response.Body) @@ -490,7 +521,7 @@ func (r *clientImpl) CancelLayerUpload(location string) error { // imageManifestURL is a helper method for returning the full url to an image // manifest func (r *clientImpl) imageManifestURL(name, tag string) string { - return fmt.Sprintf("%s/v2/%s/image/%s", r.Endpoint, name, tag) + return fmt.Sprintf("%s/v2/%s/manifest/%s", r.Endpoint, name, tag) } // parseRangeHeader parses out the offset and length from a returned Range diff --git a/client/client_test.go b/client/client_test.go index e900463a..9840ae44 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -13,85 +13,87 @@ import ( "github.com/docker/docker-registry/test" ) -type testLayer struct { - tarSum string +type testBlob struct { + digest string contents []byte } func TestPush(t *testing.T) { name := "hello/world" tag := "sometag" - testLayers := []testLayer{ + testBlobs := []testBlob{ { - tarSum: "12345", + digest: "12345", contents: []byte("some contents"), }, { - tarSum: "98765", + digest: "98765", contents: []byte("some other contents"), }, } - uploadLocations := make([]string, len(testLayers)) - layers := make([]registry.FSLayer, len(testLayers)) - history := make([]registry.ManifestHistory, len(testLayers)) + uploadLocations := make([]string, len(testBlobs)) + blobs := make([]registry.FSLayer, len(testBlobs)) + history := make([]registry.ManifestHistory, len(testBlobs)) - for i, layer := range testLayers { - uploadLocations[i] = fmt.Sprintf("/v2/%s/layer/%s/upload-location-%d", name, layer.tarSum, i) - layers[i] = registry.FSLayer{BlobSum: layer.tarSum} - history[i] = registry.ManifestHistory{V1Compatibility: layer.tarSum} + for i, blob := range testBlobs { + // TODO(bbland): this is returning the same location for all uploads, + // because we can't know which blob will get which location. + // It's sort of okay because we're using unique digests, but this needs + // to change at some point. + uploadLocations[i] = fmt.Sprintf("/v2/%s/blob/test-uuid", name) + blobs[i] = registry.FSLayer{BlobSum: blob.digest} + history[i] = registry.ManifestHistory{V1Compatibility: blob.digest} } manifest := ®istry.ImageManifest{ Name: name, Tag: tag, Architecture: "x86", - FSLayers: layers, + FSLayers: blobs, History: history, SchemaVersion: 1, } manifestBytes, err := json.Marshal(manifest) - layerRequestResponseMappings := make([]test.RequestResponseMapping, 2*len(testLayers)) - for i, layer := range testLayers { - layerRequestResponseMappings[2*i] = test.RequestResponseMapping{ + blobRequestResponseMappings := make([]test.RequestResponseMapping, 2*len(testBlobs)) + for i, blob := range testBlobs { + blobRequestResponseMappings[2*i] = test.RequestResponseMapping{ Request: test.Request{ Method: "POST", - Route: "/v2/" + name + "/layer/" + layer.tarSum + "/upload/", + Route: "/v2/" + name + "/blob/upload/", }, - Responses: []test.Response{ - { - StatusCode: http.StatusAccepted, - Headers: http.Header(map[string][]string{ - "Location": {uploadLocations[i]}, - }), - }, + Response: test.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Location": {uploadLocations[i]}, + }), }, } - layerRequestResponseMappings[2*i+1] = test.RequestResponseMapping{ + blobRequestResponseMappings[2*i+1] = test.RequestResponseMapping{ Request: test.Request{ Method: "PUT", Route: uploadLocations[i], - Body: layer.contents, - }, - Responses: []test.Response{ - { - StatusCode: http.StatusCreated, + QueryParams: map[string][]string{ + "length": {fmt.Sprint(len(blob.contents))}, + "digest": {blob.digest}, }, + Body: blob.contents, + }, + Response: test.Response{ + StatusCode: http.StatusCreated, }, } } - handler := test.NewHandler(append(layerRequestResponseMappings, test.RequestResponseMap{ + handler := test.NewHandler(append(blobRequestResponseMappings, test.RequestResponseMap{ test.RequestResponseMapping{ Request: test.Request{ Method: "PUT", - Route: "/v2/" + name + "/image/" + tag, + Route: "/v2/" + name + "/manifest/" + tag, Body: manifestBytes, }, - Responses: []test.Response{ - { - StatusCode: http.StatusOK, - }, + Response: test.Response{ + StatusCode: http.StatusOK, }, }, }...)) @@ -103,8 +105,8 @@ func TestPush(t *testing.T) { layerStorage: make(map[string]Layer), } - for _, layer := range testLayers { - l, err := objectStore.Layer(layer.tarSum) + for _, blob := range testBlobs { + l, err := objectStore.Layer(blob.digest) if err != nil { t.Fatal(err) } @@ -114,7 +116,7 @@ func TestPush(t *testing.T) { t.Fatal(err) } - writer.Write(layer.contents) + writer.Write(blob.contents) writer.Close() } @@ -129,61 +131,57 @@ func TestPush(t *testing.T) { func TestPull(t *testing.T) { name := "hello/world" tag := "sometag" - testLayers := []testLayer{ + testBlobs := []testBlob{ { - tarSum: "12345", + digest: "12345", contents: []byte("some contents"), }, { - tarSum: "98765", + digest: "98765", contents: []byte("some other contents"), }, } - layers := make([]registry.FSLayer, len(testLayers)) - history := make([]registry.ManifestHistory, len(testLayers)) + blobs := make([]registry.FSLayer, len(testBlobs)) + history := make([]registry.ManifestHistory, len(testBlobs)) - for i, layer := range testLayers { - layers[i] = registry.FSLayer{BlobSum: layer.tarSum} - history[i] = registry.ManifestHistory{V1Compatibility: layer.tarSum} + for i, blob := range testBlobs { + blobs[i] = registry.FSLayer{BlobSum: blob.digest} + history[i] = registry.ManifestHistory{V1Compatibility: blob.digest} } manifest := ®istry.ImageManifest{ Name: name, Tag: tag, Architecture: "x86", - FSLayers: layers, + FSLayers: blobs, History: history, SchemaVersion: 1, } manifestBytes, err := json.Marshal(manifest) - layerRequestResponseMappings := make([]test.RequestResponseMapping, len(testLayers)) - for i, layer := range testLayers { - layerRequestResponseMappings[i] = test.RequestResponseMapping{ + blobRequestResponseMappings := make([]test.RequestResponseMapping, len(testBlobs)) + for i, blob := range testBlobs { + blobRequestResponseMappings[i] = test.RequestResponseMapping{ Request: test.Request{ Method: "GET", - Route: "/v2/" + name + "/layer/" + layer.tarSum, + Route: "/v2/" + name + "/blob/" + blob.digest, }, - Responses: []test.Response{ - { - StatusCode: http.StatusOK, - Body: layer.contents, - }, + Response: test.Response{ + StatusCode: http.StatusOK, + Body: blob.contents, }, } } - handler := test.NewHandler(append(layerRequestResponseMappings, test.RequestResponseMap{ + handler := test.NewHandler(append(blobRequestResponseMappings, test.RequestResponseMap{ test.RequestResponseMapping{ Request: test.Request{ Method: "GET", - Route: "/v2/" + name + "/image/" + tag, + Route: "/v2/" + name + "/manifest/" + tag, }, - Responses: []test.Response{ - { - StatusCode: http.StatusOK, - Body: manifestBytes, - }, + Response: test.Response{ + StatusCode: http.StatusOK, + Body: manifestBytes, }, }, }...)) @@ -214,8 +212,8 @@ func TestPull(t *testing.T) { t.Fatal("Incorrect manifest") } - for _, layer := range testLayers { - l, err := objectStore.Layer(layer.tarSum) + for _, blob := range testBlobs { + l, err := objectStore.Layer(blob.digest) if err != nil { t.Fatal(err) } @@ -226,13 +224,13 @@ func TestPull(t *testing.T) { } defer reader.Close() - layerBytes, err := ioutil.ReadAll(reader) + blobBytes, err := ioutil.ReadAll(reader) if err != nil { t.Fatal(err) } - if string(layerBytes) != string(layer.contents) { - t.Fatal("Incorrect layer") + if string(blobBytes) != string(blob.contents) { + t.Fatal("Incorrect blob") } } } diff --git a/client/pull.go b/client/pull.go index 75cc9af1..825b0c06 100644 --- a/client/pull.go +++ b/client/pull.go @@ -99,7 +99,7 @@ func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. } defer writer.Close() - layerReader, length, err := c.GetImageLayer(name, fsLayer.BlobSum, 0) + layerReader, length, err := c.GetBlob(name, fsLayer.BlobSum, 0) if err != nil { log.WithFields(log.Fields{ "error": err, diff --git a/client/push.go b/client/push.go index a1fb0e23..91bd9af6 100644 --- a/client/push.go +++ b/client/push.go @@ -2,7 +2,6 @@ package client import ( "bytes" - "crypto/sha1" "io" "io/ioutil" @@ -89,25 +88,10 @@ func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. }).Warn("Unable to read local layer") return err } - - location, err := c.InitiateLayerUpload(name, fsLayer.BlobSum) - if _, ok := err.(*registry.LayerAlreadyExistsError); ok { - log.WithField("layer", fsLayer).Info("Layer already exists") - return nil - } - if err != nil { - log.WithFields(log.Fields{ - "error": err, - "layer": fsLayer, - }).Warn("Unable to upload layer") - return err - } + defer layerReader.Close() layerBuffer := new(bytes.Buffer) - checksum := sha1.New() - teeReader := io.TeeReader(layerReader, checksum) - - _, err = io.Copy(layerBuffer, teeReader) + layerSize, err := io.Copy(layerBuffer, layerReader) if err != nil { log.WithFields(log.Fields{ "error": err, @@ -116,9 +100,29 @@ func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. return err } - err = c.UploadLayer(location, ioutil.NopCloser(layerBuffer), layerBuffer.Len(), - ®istry.Checksum{HashAlgorithm: "sha1", Sum: string(checksum.Sum(nil))}, - ) + length, err := c.BlobLength(name, fsLayer.BlobSum) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to check existence of remote layer") + return err + } + if length >= 0 { + log.WithField("layer", fsLayer).Info("Layer already exists") + return nil + } + + location, err := c.InitiateBlobUpload(name) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to upload layer") + return err + } + + err = c.UploadBlob(location, ioutil.NopCloser(layerBuffer), int(layerSize), fsLayer.BlobSum) if err != nil { log.WithFields(log.Fields{ "error": err, diff --git a/errors.go b/errors.go index abec4965..bb59a5af 100644 --- a/errors.go +++ b/errors.go @@ -224,57 +224,44 @@ func (e *ImageManifestNotFoundError) Error() string { e.Name, e.Tag) } -// LayerAlreadyExistsError is returned when attempting to create a new layer -// that already exists in the registry. -type LayerAlreadyExistsError struct { - Name string - TarSum string -} - -func (e *LayerAlreadyExistsError) Error() string { - return fmt.Sprintf("Layer already found with Name: %s, TarSum: %s", - e.Name, e.TarSum) -} - -// LayerNotFoundError is returned when making an operation against a given image +// BlobNotFoundError is returned when making an operation against a given image // layer that does not exist in the registry. -type LayerNotFoundError struct { +type BlobNotFoundError struct { Name string - TarSum string + Digest string } -func (e *LayerNotFoundError) Error() string { - return fmt.Sprintf("No layer found with Name: %s, TarSum: %s", - e.Name, e.TarSum) +func (e *BlobNotFoundError) Error() string { + return fmt.Sprintf("No blob found with Name: %s, Digest: %s", + e.Name, e.Digest) } -// LayerUploadNotFoundError is returned when making a layer upload operation -// against an invalid layer upload location url +// BlobUploadNotFoundError is returned when making a blob upload operation against an +// invalid blob upload location url. // This may be the result of using a cancelled, completed, or stale upload // location. -type LayerUploadNotFoundError struct { +type BlobUploadNotFoundError struct { Location string } -func (e *LayerUploadNotFoundError) Error() string { - return fmt.Sprintf("No layer found upload found at Location: %s", - e.Location) +func (e *BlobUploadNotFoundError) Error() string { + return fmt.Sprintf("No blob upload found at Location: %s", e.Location) } -// LayerUploadInvalidRangeError is returned when attempting to upload an image -// layer chunk that is out of order. -// This provides the known LayerSize and LastValidRange which can be used to +// BlobUploadInvalidRangeError is returned when attempting to upload an image +// blob chunk that is out of order. +// This provides the known BlobSize and LastValidRange which can be used to // resume the upload. -type LayerUploadInvalidRangeError struct { +type BlobUploadInvalidRangeError struct { Location string LastValidRange int - LayerSize int + BlobSize int } -func (e *LayerUploadInvalidRangeError) Error() string { +func (e *BlobUploadInvalidRangeError) Error() string { return fmt.Sprintf( - "Invalid range provided for upload at Location: %s. Last Valid Range: %d, Layer Size: %d", - e.Location, e.LastValidRange, e.LayerSize) + "Invalid range provided for upload at Location: %s. Last Valid Range: %d, Blob Size: %d", + e.Location, e.LastValidRange, e.BlobSize) } // UnexpectedHTTPStatusError is returned when an unexpected HTTP status is diff --git a/test/test.go b/test/test.go index 71588819..24a08f75 100644 --- a/test/test.go +++ b/test/test.go @@ -6,16 +6,18 @@ import ( "io" "io/ioutil" "net/http" + "sort" + "strings" ) -// RequestResponseMap is a mapping from Requests to Responses +// RequestResponseMap is an ordered mapping from Requests to Responses type RequestResponseMap []RequestResponseMapping -// RequestResponseMapping defines an ordered list of Responses to be sent in -// response to a given Request +// RequestResponseMapping defines a Response to be sent in response to a given +// Request type RequestResponseMapping struct { - Request Request - Responses []Response + Request Request + Response Response } // TODO(bbland): add support for request headers @@ -28,12 +30,28 @@ type Request struct { // Route is the http route of this request Route string + // QueryParams are the query parameters of this request + QueryParams map[string][]string + // Body is the byte contents of the http request Body []byte } func (r Request) String() string { - return fmt.Sprintf("%s %s\n%s", r.Method, r.Route, r.Body) + queryString := "" + if len(r.QueryParams) > 0 { + queryString = "?" + keys := make([]string, 0, len(r.QueryParams)) + for k := range r.QueryParams { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + queryString += strings.Join(r.QueryParams[k], "&") + "&" + } + queryString = queryString[:len(queryString)-1] + } + return fmt.Sprintf("%s %s%s\n%s", r.Method, r.Route, queryString, r.Body) } // Response is a simplified http.Response object @@ -61,7 +79,12 @@ type testHandler struct { func NewHandler(requestResponseMap RequestResponseMap) http.Handler { responseMap := make(map[string][]Response) for _, mapping := range requestResponseMap { - responseMap[mapping.Request.String()] = mapping.Responses + responses, ok := responseMap[mapping.Request.String()] + if ok { + responseMap[mapping.Request.String()] = append(responses, mapping.Response) + } else { + responseMap[mapping.Request.String()] = []Response{mapping.Response} + } } return &testHandler{responseMap: responseMap} } @@ -71,9 +94,10 @@ func (app *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { requestBody, _ := ioutil.ReadAll(r.Body) request := Request{ - Method: r.Method, - Route: r.URL.Path, - Body: requestBody, + Method: r.Method, + Route: r.URL.Path, + QueryParams: r.URL.Query(), + Body: requestBody, } responses, ok := app.responseMap[request.String()] From 64c8bd29ccecbcdf5f9c6f56eec05915703b5293 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Wed, 19 Nov 2014 18:52:09 -0800 Subject: [PATCH 2/2] Uses new digest package instead of string digests Also drops extraneous test package and uses testutil instead --- client/client.go | 27 ++++++----- client/client_test.go | 55 +++++++++++----------- client/objectstore.go | 11 +++-- test/test.go => common/testutil/handler.go | 2 +- errors.go | 4 +- images.go | 3 +- 6 files changed, 54 insertions(+), 48 deletions(-) rename test/test.go => common/testutil/handler.go (99%) diff --git a/client/client.go b/client/client.go index 8eb156c5..944050e0 100644 --- a/client/client.go +++ b/client/client.go @@ -11,6 +11,7 @@ import ( "strconv" "github.com/docker/docker-registry" + "github.com/docker/docker-registry/digest" ) // Client implements the client interface to the registry http api @@ -33,13 +34,13 @@ type Client interface { // BlobLength returns the length of the blob stored at the given name, // digest pair. // Returns a length value of -1 on error or if the blob does not exist. - BlobLength(name, digest string) (int, error) + BlobLength(name string, dgst digest.Digest) (int, error) // GetBlob returns the blob stored at the given name, digest pair in the // form of an io.ReadCloser with the length of this blob. // A nonzero byteOffset can be provided to receive a partial blob beginning // at the given offset. - GetBlob(name, digest string, byteOffset int) (io.ReadCloser, int, error) + GetBlob(name string, dgst digest.Digest, byteOffset int) (io.ReadCloser, int, error) // InitiateBlobUpload starts a blob upload in the given repository namespace // and returns a unique location url to use for other blob upload methods. @@ -50,7 +51,7 @@ type Client interface { GetBlobUploadStatus(location string) (int, int, error) // UploadBlob uploads a full blob to the registry. - UploadBlob(location string, blob io.ReadCloser, length int, digest string) error + UploadBlob(location string, blob io.ReadCloser, length int, dgst digest.Digest) error // UploadBlobChunk uploads a blob chunk with a given length and startByte to // the registry. @@ -59,7 +60,7 @@ type Client interface { // FinishChunkedBlobUpload completes a chunked blob upload at a given // location. - FinishChunkedBlobUpload(location string, length int, digest string) error + FinishChunkedBlobUpload(location string, length int, dgst digest.Digest) error // CancelBlobUpload deletes all content at the unfinished blob upload // location and invalidates any future calls to this blob upload. @@ -222,8 +223,8 @@ func (r *clientImpl) ListImageTags(name string) ([]string, error) { return tags.Tags, nil } -func (r *clientImpl) BlobLength(name, digest string) (int, error) { - response, err := http.Head(fmt.Sprintf("%s/v2/%s/blob/%s", r.Endpoint, name, digest)) +func (r *clientImpl) BlobLength(name string, dgst digest.Digest) (int, error) { + response, err := http.Head(fmt.Sprintf("%s/v2/%s/blob/%s", r.Endpoint, name, dgst)) if err != nil { return -1, err } @@ -254,9 +255,9 @@ func (r *clientImpl) BlobLength(name, digest string) (int, error) { } } -func (r *clientImpl) GetBlob(name, digest string, byteOffset int) (io.ReadCloser, int, error) { +func (r *clientImpl) GetBlob(name string, dgst digest.Digest, byteOffset int) (io.ReadCloser, int, error) { getRequest, err := http.NewRequest("GET", - fmt.Sprintf("%s/v2/%s/blob/%s", r.Endpoint, name, digest), nil) + fmt.Sprintf("%s/v2/%s/blob/%s", r.Endpoint, name, dgst), nil) if err != nil { return nil, 0, err } @@ -278,7 +279,7 @@ func (r *clientImpl) GetBlob(name, digest string, byteOffset int) (io.ReadCloser return response.Body, int(length), nil case response.StatusCode == http.StatusNotFound: response.Body.Close() - return nil, 0, ®istry.BlobNotFoundError{Name: name, Digest: digest} + return nil, 0, ®istry.BlobNotFoundError{Name: name, Digest: dgst} case response.StatusCode >= 400 && response.StatusCode < 500: errors := new(registry.Errors) decoder := json.NewDecoder(response.Body) @@ -351,7 +352,7 @@ func (r *clientImpl) GetBlobUploadStatus(location string) (int, int, error) { } } -func (r *clientImpl) UploadBlob(location string, blob io.ReadCloser, length int, digest string) error { +func (r *clientImpl) UploadBlob(location string, blob io.ReadCloser, length int, dgst digest.Digest) error { defer blob.Close() putRequest, err := http.NewRequest("PUT", @@ -362,7 +363,7 @@ func (r *clientImpl) UploadBlob(location string, blob io.ReadCloser, length int, queryValues := url.Values{} queryValues.Set("length", fmt.Sprint(length)) - queryValues.Set("digest", digest) + queryValues.Set("digest", dgst.String()) putRequest.URL.RawQuery = queryValues.Encode() putRequest.Header.Set("Content-Type", "application/octet-stream") @@ -444,7 +445,7 @@ func (r *clientImpl) UploadBlobChunk(location string, blobChunk io.ReadCloser, l } } -func (r *clientImpl) FinishChunkedBlobUpload(location string, length int, digest string) error { +func (r *clientImpl) FinishChunkedBlobUpload(location string, length int, dgst digest.Digest) error { putRequest, err := http.NewRequest("PUT", fmt.Sprintf("%s%s", r.Endpoint, location), nil) if err != nil { @@ -453,7 +454,7 @@ func (r *clientImpl) FinishChunkedBlobUpload(location string, length int, digest queryValues := new(url.Values) queryValues.Set("length", fmt.Sprint(length)) - queryValues.Set("digest", digest) + queryValues.Set("digest", dgst.String()) putRequest.URL.RawQuery = queryValues.Encode() putRequest.Header.Set("Content-Type", "application/octet-stream") diff --git a/client/client_test.go b/client/client_test.go index 9840ae44..a77e7665 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -10,11 +10,12 @@ import ( "testing" "github.com/docker/docker-registry" - "github.com/docker/docker-registry/test" + "github.com/docker/docker-registry/common/testutil" + "github.com/docker/docker-registry/digest" ) type testBlob struct { - digest string + digest digest.Digest contents []byte } @@ -42,7 +43,7 @@ func TestPush(t *testing.T) { // to change at some point. uploadLocations[i] = fmt.Sprintf("/v2/%s/blob/test-uuid", name) blobs[i] = registry.FSLayer{BlobSum: blob.digest} - history[i] = registry.ManifestHistory{V1Compatibility: blob.digest} + history[i] = registry.ManifestHistory{V1Compatibility: blob.digest.String()} } manifest := ®istry.ImageManifest{ @@ -55,44 +56,44 @@ func TestPush(t *testing.T) { } manifestBytes, err := json.Marshal(manifest) - blobRequestResponseMappings := make([]test.RequestResponseMapping, 2*len(testBlobs)) + blobRequestResponseMappings := make([]testutil.RequestResponseMapping, 2*len(testBlobs)) for i, blob := range testBlobs { - blobRequestResponseMappings[2*i] = test.RequestResponseMapping{ - Request: test.Request{ + blobRequestResponseMappings[2*i] = testutil.RequestResponseMapping{ + Request: testutil.Request{ Method: "POST", Route: "/v2/" + name + "/blob/upload/", }, - Response: test.Response{ + Response: testutil.Response{ StatusCode: http.StatusAccepted, Headers: http.Header(map[string][]string{ "Location": {uploadLocations[i]}, }), }, } - blobRequestResponseMappings[2*i+1] = test.RequestResponseMapping{ - Request: test.Request{ + blobRequestResponseMappings[2*i+1] = testutil.RequestResponseMapping{ + Request: testutil.Request{ Method: "PUT", Route: uploadLocations[i], QueryParams: map[string][]string{ "length": {fmt.Sprint(len(blob.contents))}, - "digest": {blob.digest}, + "digest": {blob.digest.String()}, }, Body: blob.contents, }, - Response: test.Response{ + Response: testutil.Response{ StatusCode: http.StatusCreated, }, } } - handler := test.NewHandler(append(blobRequestResponseMappings, test.RequestResponseMap{ - test.RequestResponseMapping{ - Request: test.Request{ + handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{ + testutil.RequestResponseMapping{ + Request: testutil.Request{ Method: "PUT", Route: "/v2/" + name + "/manifest/" + tag, Body: manifestBytes, }, - Response: test.Response{ + Response: testutil.Response{ StatusCode: http.StatusOK, }, }, @@ -102,7 +103,7 @@ func TestPush(t *testing.T) { objectStore := &memoryObjectStore{ mutex: new(sync.Mutex), manifestStorage: make(map[string]*registry.ImageManifest), - layerStorage: make(map[string]Layer), + layerStorage: make(map[digest.Digest]Layer), } for _, blob := range testBlobs { @@ -146,7 +147,7 @@ func TestPull(t *testing.T) { for i, blob := range testBlobs { blobs[i] = registry.FSLayer{BlobSum: blob.digest} - history[i] = registry.ManifestHistory{V1Compatibility: blob.digest} + history[i] = registry.ManifestHistory{V1Compatibility: blob.digest.String()} } manifest := ®istry.ImageManifest{ @@ -159,27 +160,27 @@ func TestPull(t *testing.T) { } manifestBytes, err := json.Marshal(manifest) - blobRequestResponseMappings := make([]test.RequestResponseMapping, len(testBlobs)) + blobRequestResponseMappings := make([]testutil.RequestResponseMapping, len(testBlobs)) for i, blob := range testBlobs { - blobRequestResponseMappings[i] = test.RequestResponseMapping{ - Request: test.Request{ + blobRequestResponseMappings[i] = testutil.RequestResponseMapping{ + Request: testutil.Request{ Method: "GET", - Route: "/v2/" + name + "/blob/" + blob.digest, + Route: "/v2/" + name + "/blob/" + blob.digest.String(), }, - Response: test.Response{ + Response: testutil.Response{ StatusCode: http.StatusOK, Body: blob.contents, }, } } - handler := test.NewHandler(append(blobRequestResponseMappings, test.RequestResponseMap{ - test.RequestResponseMapping{ - Request: test.Request{ + handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{ + testutil.RequestResponseMapping{ + Request: testutil.Request{ Method: "GET", Route: "/v2/" + name + "/manifest/" + tag, }, - Response: test.Response{ + Response: testutil.Response{ StatusCode: http.StatusOK, Body: manifestBytes, }, @@ -190,7 +191,7 @@ func TestPull(t *testing.T) { objectStore := &memoryObjectStore{ mutex: new(sync.Mutex), manifestStorage: make(map[string]*registry.ImageManifest), - layerStorage: make(map[string]Layer), + layerStorage: make(map[digest.Digest]Layer), } err = Pull(client, objectStore, name, tag) diff --git a/client/objectstore.go b/client/objectstore.go index d8e2ac76..bee73ff0 100644 --- a/client/objectstore.go +++ b/client/objectstore.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/docker/docker-registry" + "github.com/docker/docker-registry/digest" ) var ( @@ -34,7 +35,7 @@ type ObjectStore interface { WriteManifest(name, tag string, manifest *registry.ImageManifest) error // Layer returns a handle to a layer for reading and writing - Layer(blobSum string) (Layer, error) + Layer(dgst digest.Digest) (Layer, error) } // Layer is a generic image layer interface. @@ -56,7 +57,7 @@ type Layer interface { type memoryObjectStore struct { mutex *sync.Mutex manifestStorage map[string]*registry.ImageManifest - layerStorage map[string]Layer + layerStorage map[digest.Digest]Layer } func (objStore *memoryObjectStore) Manifest(name, tag string) (*registry.ImageManifest, error) { @@ -78,14 +79,14 @@ func (objStore *memoryObjectStore) WriteManifest(name, tag string, manifest *reg return nil } -func (objStore *memoryObjectStore) Layer(blobSum string) (Layer, error) { +func (objStore *memoryObjectStore) Layer(dgst digest.Digest) (Layer, error) { objStore.mutex.Lock() defer objStore.mutex.Unlock() - layer, ok := objStore.layerStorage[blobSum] + layer, ok := objStore.layerStorage[dgst] if !ok { layer = &memoryLayer{cond: sync.NewCond(new(sync.Mutex))} - objStore.layerStorage[blobSum] = layer + objStore.layerStorage[dgst] = layer } return layer, nil diff --git a/test/test.go b/common/testutil/handler.go similarity index 99% rename from test/test.go rename to common/testutil/handler.go index 24a08f75..fa118cd1 100644 --- a/test/test.go +++ b/common/testutil/handler.go @@ -1,4 +1,4 @@ -package test +package testutil import ( "bytes" diff --git a/errors.go b/errors.go index bb59a5af..9a28e5b6 100644 --- a/errors.go +++ b/errors.go @@ -3,6 +3,8 @@ package registry import ( "fmt" "strings" + + "github.com/docker/docker-registry/digest" ) // ErrorCode represents the error type. The errors are serialized via strings @@ -228,7 +230,7 @@ func (e *ImageManifestNotFoundError) Error() string { // layer that does not exist in the registry. type BlobNotFoundError struct { Name string - Digest string + Digest digest.Digest } func (e *BlobNotFoundError) Error() string { diff --git a/images.go b/images.go index e30c6a5f..534069b2 100644 --- a/images.go +++ b/images.go @@ -4,6 +4,7 @@ import ( "encoding/json" "net/http" + "github.com/docker/docker-registry/digest" "github.com/gorilla/handlers" ) @@ -52,7 +53,7 @@ func (m *ImageManifest) UnmarshalJSON(b []byte) error { // FSLayer is a container struct for BlobSums defined in an image manifest type FSLayer struct { // BlobSum is the tarsum of the referenced filesystem image layer - BlobSum string `json:"blobSum"` + BlobSum digest.Digest `json:"blobSum"` } // ManifestHistory stores unstructured v1 compatibility information