From 0e1b1cc04e9f97de88995a601688fb9f2f8e21a2 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Thu, 13 Nov 2014 18:42:49 -0800 Subject: [PATCH] Adds push/pull client functionality These methods rely on an ObjectStore interface, which is meant to approximate the storage behavior of the docker engine. This is very much subject to change. --- client/client.go | 6 +- client/client_test.go | 238 ++++++++++++++++++++++++++++++++++++++++++ client/objectstore.go | 158 ++++++++++++++++++++++++++++ client/pull.go | 93 +++++++++++++++++ client/push.go | 95 +++++++++++++++++ test/test.go | 97 +++++++++++++++++ 6 files changed, 684 insertions(+), 3 deletions(-) create mode 100644 client/client_test.go create mode 100644 client/objectstore.go create mode 100644 client/pull.go create mode 100644 client/push.go create mode 100644 test/test.go diff --git a/client/client.go b/client/client.go index c41586951..2ea0e091c 100644 --- a/client/client.go +++ b/client/client.go @@ -183,7 +183,7 @@ func (r *clientImpl) DeleteImage(name, tag string) error { } func (r *clientImpl) ListImageTags(name string) ([]string, error) { - response, err := http.Get(fmt.Sprintf("%s/v2/%s/tags", r.Endpoint, name)) + response, err := http.Get(fmt.Sprintf("%s/v2/%s/tags/list", r.Endpoint, name)) if err != nil { return nil, err } @@ -264,7 +264,7 @@ func (r *clientImpl) GetImageLayer(name, tarsum string, byteOffset int) (io.Read func (r *clientImpl) InitiateLayerUpload(name, tarsum string) (string, error) { postRequest, err := http.NewRequest("POST", - fmt.Sprintf("%s/v2/%s/layer/%s/upload", r.Endpoint, name, tarsum), nil) + fmt.Sprintf("%s/v2/%s/layer/%s/upload/", r.Endpoint, name, tarsum), nil) if err != nil { return "", err } @@ -329,7 +329,7 @@ func (r *clientImpl) UploadLayer(location string, layer io.ReadCloser, length in return err } - queryValues := new(url.Values) + queryValues := url.Values{} queryValues.Set("length", fmt.Sprint(length)) queryValues.Set(checksum.HashAlgorithm, checksum.Sum) putRequest.URL.RawQuery = queryValues.Encode() diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 000000000..e900463ae --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,238 @@ +package client + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "sync" + "testing" + + "github.com/docker/docker-registry" + "github.com/docker/docker-registry/test" +) + +type testLayer struct { + tarSum string + contents []byte +} + +func TestPush(t *testing.T) { + name := "hello/world" + tag := "sometag" + testLayers := []testLayer{ + { + tarSum: "12345", + contents: []byte("some contents"), + }, + { + tarSum: "98765", + contents: []byte("some other contents"), + }, + } + uploadLocations := make([]string, len(testLayers)) + layers := make([]registry.FSLayer, len(testLayers)) + history := make([]registry.ManifestHistory, len(testLayers)) + + for i, layer := range testLayers { + uploadLocations[i] = fmt.Sprintf("/v2/%s/layer/%s/upload-location-%d", name, layer.tarSum, i) + layers[i] = registry.FSLayer{BlobSum: layer.tarSum} + history[i] = registry.ManifestHistory{V1Compatibility: layer.tarSum} + } + + manifest := ®istry.ImageManifest{ + Name: name, + Tag: tag, + Architecture: "x86", + FSLayers: layers, + History: history, + SchemaVersion: 1, + } + manifestBytes, err := json.Marshal(manifest) + + layerRequestResponseMappings := make([]test.RequestResponseMapping, 2*len(testLayers)) + for i, layer := range testLayers { + layerRequestResponseMappings[2*i] = test.RequestResponseMapping{ + Request: test.Request{ + Method: "POST", + Route: "/v2/" + name + "/layer/" + layer.tarSum + "/upload/", + }, + Responses: []test.Response{ + { + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Location": {uploadLocations[i]}, + }), + }, + }, + } + layerRequestResponseMappings[2*i+1] = test.RequestResponseMapping{ + Request: test.Request{ + Method: "PUT", + Route: uploadLocations[i], + Body: layer.contents, + }, + Responses: []test.Response{ + { + StatusCode: http.StatusCreated, + }, + }, + } + } + + handler := test.NewHandler(append(layerRequestResponseMappings, test.RequestResponseMap{ + test.RequestResponseMapping{ + Request: test.Request{ + Method: "PUT", + Route: "/v2/" + name + "/image/" + tag, + Body: manifestBytes, + }, + Responses: []test.Response{ + { + StatusCode: http.StatusOK, + }, + }, + }, + }...)) + server := httptest.NewServer(handler) + client := New(server.URL) + objectStore := &memoryObjectStore{ + mutex: new(sync.Mutex), + manifestStorage: make(map[string]*registry.ImageManifest), + layerStorage: make(map[string]Layer), + } + + for _, layer := range testLayers { + l, err := objectStore.Layer(layer.tarSum) + if err != nil { + t.Fatal(err) + } + + writer, err := l.Writer() + if err != nil { + t.Fatal(err) + } + + writer.Write(layer.contents) + writer.Close() + } + + objectStore.WriteManifest(name, tag, manifest) + + err = Push(client, objectStore, name, tag) + if err != nil { + t.Fatal(err) + } +} + +func TestPull(t *testing.T) { + name := "hello/world" + tag := "sometag" + testLayers := []testLayer{ + { + tarSum: "12345", + contents: []byte("some contents"), + }, + { + tarSum: "98765", + contents: []byte("some other contents"), + }, + } + layers := make([]registry.FSLayer, len(testLayers)) + history := make([]registry.ManifestHistory, len(testLayers)) + + for i, layer := range testLayers { + layers[i] = registry.FSLayer{BlobSum: layer.tarSum} + history[i] = registry.ManifestHistory{V1Compatibility: layer.tarSum} + } + + manifest := ®istry.ImageManifest{ + Name: name, + Tag: tag, + Architecture: "x86", + FSLayers: layers, + History: history, + SchemaVersion: 1, + } + manifestBytes, err := json.Marshal(manifest) + + layerRequestResponseMappings := make([]test.RequestResponseMapping, len(testLayers)) + for i, layer := range testLayers { + layerRequestResponseMappings[i] = test.RequestResponseMapping{ + Request: test.Request{ + Method: "GET", + Route: "/v2/" + name + "/layer/" + layer.tarSum, + }, + Responses: []test.Response{ + { + StatusCode: http.StatusOK, + Body: layer.contents, + }, + }, + } + } + + handler := test.NewHandler(append(layerRequestResponseMappings, test.RequestResponseMap{ + test.RequestResponseMapping{ + Request: test.Request{ + Method: "GET", + Route: "/v2/" + name + "/image/" + tag, + }, + Responses: []test.Response{ + { + StatusCode: http.StatusOK, + Body: manifestBytes, + }, + }, + }, + }...)) + server := httptest.NewServer(handler) + client := New(server.URL) + objectStore := &memoryObjectStore{ + mutex: new(sync.Mutex), + manifestStorage: make(map[string]*registry.ImageManifest), + layerStorage: make(map[string]Layer), + } + + err = Pull(client, objectStore, name, tag) + if err != nil { + t.Fatal(err) + } + + m, err := objectStore.Manifest(name, tag) + if err != nil { + t.Fatal(err) + } + + mBytes, err := json.Marshal(m) + if err != nil { + t.Fatal(err) + } + + if string(mBytes) != string(manifestBytes) { + t.Fatal("Incorrect manifest") + } + + for _, layer := range testLayers { + l, err := objectStore.Layer(layer.tarSum) + if err != nil { + t.Fatal(err) + } + + reader, err := l.Reader() + if err != nil { + t.Fatal(err) + } + defer reader.Close() + + layerBytes, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatal(err) + } + + if string(layerBytes) != string(layer.contents) { + t.Fatal("Incorrect layer") + } + } +} diff --git a/client/objectstore.go b/client/objectstore.go new file mode 100644 index 000000000..d8e2ac763 --- /dev/null +++ b/client/objectstore.go @@ -0,0 +1,158 @@ +package client + +import ( + "bytes" + "errors" + "fmt" + "io" + "io/ioutil" + "sync" + + "github.com/docker/docker-registry" +) + +var ( + // ErrLayerAlreadyExists is returned when attempting to create a layer with + // a tarsum that is already in use. + ErrLayerAlreadyExists = errors.New("Layer already exists") + + // ErrLayerLocked is returned when attempting to write to a layer which is + // currently being written to. + ErrLayerLocked = errors.New("Layer locked") +) + +// ObjectStore is an interface which is designed to approximate the docker +// engine storage. This interface is subject to change to conform to the +// future requirements of the engine. +type ObjectStore interface { + // Manifest retrieves the image manifest stored at the given repository name + // and tag + Manifest(name, tag string) (*registry.ImageManifest, error) + + // WriteManifest stores an image manifest at the given repository name and + // tag + WriteManifest(name, tag string, manifest *registry.ImageManifest) error + + // Layer returns a handle to a layer for reading and writing + Layer(blobSum string) (Layer, error) +} + +// Layer is a generic image layer interface. +// A Layer may only be written to once +type Layer interface { + // Reader returns an io.ReadCloser which reads the contents of the layer + Reader() (io.ReadCloser, error) + + // Writer returns an io.WriteCloser which may write the contents of the + // layer. This method may only be called once per Layer, and the contents + // are made available on Close + Writer() (io.WriteCloser, error) + + // Wait blocks until the Layer can be read from + Wait() error +} + +// memoryObjectStore is an in-memory implementation of the ObjectStore interface +type memoryObjectStore struct { + mutex *sync.Mutex + manifestStorage map[string]*registry.ImageManifest + layerStorage map[string]Layer +} + +func (objStore *memoryObjectStore) Manifest(name, tag string) (*registry.ImageManifest, error) { + objStore.mutex.Lock() + defer objStore.mutex.Unlock() + + manifest, ok := objStore.manifestStorage[name+":"+tag] + if !ok { + return nil, fmt.Errorf("No manifest found with Name: %q, Tag: %q", name, tag) + } + return manifest, nil +} + +func (objStore *memoryObjectStore) WriteManifest(name, tag string, manifest *registry.ImageManifest) error { + objStore.mutex.Lock() + defer objStore.mutex.Unlock() + + objStore.manifestStorage[name+":"+tag] = manifest + return nil +} + +func (objStore *memoryObjectStore) Layer(blobSum string) (Layer, error) { + objStore.mutex.Lock() + defer objStore.mutex.Unlock() + + layer, ok := objStore.layerStorage[blobSum] + if !ok { + layer = &memoryLayer{cond: sync.NewCond(new(sync.Mutex))} + objStore.layerStorage[blobSum] = layer + } + + return layer, nil +} + +type memoryLayer struct { + cond *sync.Cond + buffer *bytes.Buffer + written bool +} + +func (ml *memoryLayer) Writer() (io.WriteCloser, error) { + ml.cond.L.Lock() + defer ml.cond.L.Unlock() + + if ml.buffer != nil { + if !ml.written { + return nil, ErrLayerLocked + } + return nil, ErrLayerAlreadyExists + } + + ml.buffer = new(bytes.Buffer) + return &memoryLayerWriter{cond: ml.cond, buffer: ml.buffer, done: &ml.written}, nil +} + +func (ml *memoryLayer) Reader() (io.ReadCloser, error) { + ml.cond.L.Lock() + defer ml.cond.L.Unlock() + + if ml.buffer == nil { + return nil, fmt.Errorf("Layer has not been written to yet") + } + if !ml.written { + return nil, ErrLayerLocked + } + + return ioutil.NopCloser(bytes.NewReader(ml.buffer.Bytes())), nil +} + +func (ml *memoryLayer) Wait() error { + ml.cond.L.Lock() + defer ml.cond.L.Unlock() + + if ml.buffer == nil { + return fmt.Errorf("No writer to wait on") + } + + for !ml.written { + ml.cond.Wait() + } + + return nil +} + +type memoryLayerWriter struct { + cond *sync.Cond + buffer *bytes.Buffer + done *bool +} + +func (mlw *memoryLayerWriter) Write(p []byte) (int, error) { + return mlw.buffer.Write(p) +} + +func (mlw *memoryLayerWriter) Close() error { + *mlw.done = true + mlw.cond.Broadcast() + return nil +} diff --git a/client/pull.go b/client/pull.go new file mode 100644 index 000000000..91c7283a8 --- /dev/null +++ b/client/pull.go @@ -0,0 +1,93 @@ +package client + +import ( + "fmt" + "io" + + log "github.com/Sirupsen/logrus" +) + +// Pull implements a client pull workflow for the image defined by the given +// name and tag pair, using the given ObjectStore for local manifest and layer +// storage +func Pull(c Client, objectStore ObjectStore, name, tag string) error { + manifest, err := c.GetImageManifest(name, tag) + if err != nil { + return err + } + log.WithField("manifest", manifest).Info("Pulled manifest") + + if len(manifest.FSLayers) != len(manifest.History) { + return fmt.Errorf("Length of history not equal to number of layers") + } + if len(manifest.FSLayers) == 0 { + return fmt.Errorf("Image has no layers") + } + + for _, fsLayer := range manifest.FSLayers { + layer, err := objectStore.Layer(fsLayer.BlobSum) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to write local layer") + return err + } + + writer, err := layer.Writer() + if err == ErrLayerAlreadyExists { + log.WithField("layer", fsLayer).Info("Layer already exists") + continue + } + if err == ErrLayerLocked { + log.WithField("layer", fsLayer).Info("Layer download in progress, waiting") + layer.Wait() + continue + } + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to write local layer") + return err + } + defer writer.Close() + + layerReader, length, err := c.GetImageLayer(name, fsLayer.BlobSum, 0) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to download layer") + return err + } + defer layerReader.Close() + + copied, err := io.Copy(writer, layerReader) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to download layer") + return err + } + if copied != int64(length) { + log.WithFields(log.Fields{ + "expected": length, + "written": copied, + "layer": fsLayer, + }).Warn("Wrote incorrect number of bytes for layer") + } + } + + err = objectStore.WriteManifest(name, tag, manifest) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "manifest": manifest, + }).Warn("Unable to write image manifest") + return err + } + + return nil +} diff --git a/client/push.go b/client/push.go new file mode 100644 index 000000000..4b9634e09 --- /dev/null +++ b/client/push.go @@ -0,0 +1,95 @@ +package client + +import ( + "bytes" + "crypto/sha1" + "io" + "io/ioutil" + + "github.com/docker/docker-registry" + + log "github.com/Sirupsen/logrus" +) + +// Push implements a client push workflow for the image defined by the given +// name and tag pair, using the given ObjectStore for local manifest and layer +// storage +func Push(c Client, objectStore ObjectStore, name, tag string) error { + manifest, err := objectStore.Manifest(name, tag) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "name": name, + "tag": tag, + }).Info("No image found") + return err + } + + for _, fsLayer := range manifest.FSLayers { + layer, err := objectStore.Layer(fsLayer.BlobSum) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to read local layer") + return err + } + + layerReader, err := layer.Reader() + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to read local layer") + return err + } + + location, err := c.InitiateLayerUpload(name, fsLayer.BlobSum) + if _, ok := err.(*registry.LayerAlreadyExistsError); ok { + log.WithField("layer", fsLayer).Info("Layer already exists") + continue + } + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to upload layer") + return err + } + + layerBuffer := new(bytes.Buffer) + checksum := sha1.New() + teeReader := io.TeeReader(layerReader, checksum) + + _, err = io.Copy(layerBuffer, teeReader) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to read local layer") + return err + } + + err = c.UploadLayer(location, ioutil.NopCloser(layerBuffer), layerBuffer.Len(), + ®istry.Checksum{HashAlgorithm: "sha1", Sum: string(checksum.Sum(nil))}, + ) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to upload layer") + return err + } + } + + err = c.PutImageManifest(name, tag, manifest) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "manifest": manifest, + }).Warn("Unable to upload manifest") + return err + } + + return nil +} diff --git a/test/test.go b/test/test.go new file mode 100644 index 000000000..715888192 --- /dev/null +++ b/test/test.go @@ -0,0 +1,97 @@ +package test + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" +) + +// RequestResponseMap is a mapping from Requests to Responses +type RequestResponseMap []RequestResponseMapping + +// RequestResponseMapping defines an ordered list of Responses to be sent in +// response to a given Request +type RequestResponseMapping struct { + Request Request + Responses []Response +} + +// TODO(bbland): add support for request headers + +// Request is a simplified http.Request object +type Request struct { + // Method is the http method of the request, for example GET + Method string + + // Route is the http route of this request + Route string + + // Body is the byte contents of the http request + Body []byte +} + +func (r Request) String() string { + return fmt.Sprintf("%s %s\n%s", r.Method, r.Route, r.Body) +} + +// Response is a simplified http.Response object +type Response struct { + // Statuscode is the http status code of the Response + StatusCode int + + // Headers are the http headers of this Response + Headers http.Header + + // Body is the response body + Body []byte +} + +// testHandler is an http.Handler with a defined mapping from Request to an +// ordered list of Response objects +type testHandler struct { + responseMap map[string][]Response +} + +// NewHandler returns a new test handler that responds to defined requests +// with specified responses +// Each time a Request is received, the next Response is returned in the +// mapping, until no Responses are defined, at which point a 404 is sent back +func NewHandler(requestResponseMap RequestResponseMap) http.Handler { + responseMap := make(map[string][]Response) + for _, mapping := range requestResponseMap { + responseMap[mapping.Request.String()] = mapping.Responses + } + return &testHandler{responseMap: responseMap} +} + +func (app *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + requestBody, _ := ioutil.ReadAll(r.Body) + request := Request{ + Method: r.Method, + Route: r.URL.Path, + Body: requestBody, + } + + responses, ok := app.responseMap[request.String()] + + if !ok || len(responses) == 0 { + http.NotFound(w, r) + return + } + + response := responses[0] + app.responseMap[request.String()] = responses[1:] + + responseHeader := w.Header() + for k, v := range response.Headers { + responseHeader[k] = v + } + + w.WriteHeader(response.StatusCode) + + io.Copy(w, bytes.NewReader(response.Body)) +}