Adds push/pull client functionality

These methods rely on an ObjectStore interface, which is meant to
approximate the storage behavior of the docker engine. This is very much
subject to change.
This commit is contained in:
Brian Bland 2014-11-13 18:42:49 -08:00
parent de4e976ef2
commit 0e1b1cc04e
6 changed files with 684 additions and 3 deletions

View file

@ -183,7 +183,7 @@ func (r *clientImpl) DeleteImage(name, tag string) error {
} }
func (r *clientImpl) ListImageTags(name string) ([]string, error) { func (r *clientImpl) ListImageTags(name string) ([]string, error) {
response, err := http.Get(fmt.Sprintf("%s/v2/%s/tags", r.Endpoint, name)) response, err := http.Get(fmt.Sprintf("%s/v2/%s/tags/list", r.Endpoint, name))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -264,7 +264,7 @@ func (r *clientImpl) GetImageLayer(name, tarsum string, byteOffset int) (io.Read
func (r *clientImpl) InitiateLayerUpload(name, tarsum string) (string, error) { func (r *clientImpl) InitiateLayerUpload(name, tarsum string) (string, error) {
postRequest, err := http.NewRequest("POST", postRequest, err := http.NewRequest("POST",
fmt.Sprintf("%s/v2/%s/layer/%s/upload", r.Endpoint, name, tarsum), nil) fmt.Sprintf("%s/v2/%s/layer/%s/upload/", r.Endpoint, name, tarsum), nil)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -329,7 +329,7 @@ func (r *clientImpl) UploadLayer(location string, layer io.ReadCloser, length in
return err return err
} }
queryValues := new(url.Values) queryValues := url.Values{}
queryValues.Set("length", fmt.Sprint(length)) queryValues.Set("length", fmt.Sprint(length))
queryValues.Set(checksum.HashAlgorithm, checksum.Sum) queryValues.Set(checksum.HashAlgorithm, checksum.Sum)
putRequest.URL.RawQuery = queryValues.Encode() putRequest.URL.RawQuery = queryValues.Encode()

238
client/client_test.go Normal file
View file

@ -0,0 +1,238 @@
package client
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"sync"
"testing"
"github.com/docker/docker-registry"
"github.com/docker/docker-registry/test"
)
type testLayer struct {
tarSum string
contents []byte
}
func TestPush(t *testing.T) {
name := "hello/world"
tag := "sometag"
testLayers := []testLayer{
{
tarSum: "12345",
contents: []byte("some contents"),
},
{
tarSum: "98765",
contents: []byte("some other contents"),
},
}
uploadLocations := make([]string, len(testLayers))
layers := make([]registry.FSLayer, len(testLayers))
history := make([]registry.ManifestHistory, len(testLayers))
for i, layer := range testLayers {
uploadLocations[i] = fmt.Sprintf("/v2/%s/layer/%s/upload-location-%d", name, layer.tarSum, i)
layers[i] = registry.FSLayer{BlobSum: layer.tarSum}
history[i] = registry.ManifestHistory{V1Compatibility: layer.tarSum}
}
manifest := &registry.ImageManifest{
Name: name,
Tag: tag,
Architecture: "x86",
FSLayers: layers,
History: history,
SchemaVersion: 1,
}
manifestBytes, err := json.Marshal(manifest)
layerRequestResponseMappings := make([]test.RequestResponseMapping, 2*len(testLayers))
for i, layer := range testLayers {
layerRequestResponseMappings[2*i] = test.RequestResponseMapping{
Request: test.Request{
Method: "POST",
Route: "/v2/" + name + "/layer/" + layer.tarSum + "/upload/",
},
Responses: []test.Response{
{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Location": {uploadLocations[i]},
}),
},
},
}
layerRequestResponseMappings[2*i+1] = test.RequestResponseMapping{
Request: test.Request{
Method: "PUT",
Route: uploadLocations[i],
Body: layer.contents,
},
Responses: []test.Response{
{
StatusCode: http.StatusCreated,
},
},
}
}
handler := test.NewHandler(append(layerRequestResponseMappings, test.RequestResponseMap{
test.RequestResponseMapping{
Request: test.Request{
Method: "PUT",
Route: "/v2/" + name + "/image/" + tag,
Body: manifestBytes,
},
Responses: []test.Response{
{
StatusCode: http.StatusOK,
},
},
},
}...))
server := httptest.NewServer(handler)
client := New(server.URL)
objectStore := &memoryObjectStore{
mutex: new(sync.Mutex),
manifestStorage: make(map[string]*registry.ImageManifest),
layerStorage: make(map[string]Layer),
}
for _, layer := range testLayers {
l, err := objectStore.Layer(layer.tarSum)
if err != nil {
t.Fatal(err)
}
writer, err := l.Writer()
if err != nil {
t.Fatal(err)
}
writer.Write(layer.contents)
writer.Close()
}
objectStore.WriteManifest(name, tag, manifest)
err = Push(client, objectStore, name, tag)
if err != nil {
t.Fatal(err)
}
}
func TestPull(t *testing.T) {
name := "hello/world"
tag := "sometag"
testLayers := []testLayer{
{
tarSum: "12345",
contents: []byte("some contents"),
},
{
tarSum: "98765",
contents: []byte("some other contents"),
},
}
layers := make([]registry.FSLayer, len(testLayers))
history := make([]registry.ManifestHistory, len(testLayers))
for i, layer := range testLayers {
layers[i] = registry.FSLayer{BlobSum: layer.tarSum}
history[i] = registry.ManifestHistory{V1Compatibility: layer.tarSum}
}
manifest := &registry.ImageManifest{
Name: name,
Tag: tag,
Architecture: "x86",
FSLayers: layers,
History: history,
SchemaVersion: 1,
}
manifestBytes, err := json.Marshal(manifest)
layerRequestResponseMappings := make([]test.RequestResponseMapping, len(testLayers))
for i, layer := range testLayers {
layerRequestResponseMappings[i] = test.RequestResponseMapping{
Request: test.Request{
Method: "GET",
Route: "/v2/" + name + "/layer/" + layer.tarSum,
},
Responses: []test.Response{
{
StatusCode: http.StatusOK,
Body: layer.contents,
},
},
}
}
handler := test.NewHandler(append(layerRequestResponseMappings, test.RequestResponseMap{
test.RequestResponseMapping{
Request: test.Request{
Method: "GET",
Route: "/v2/" + name + "/image/" + tag,
},
Responses: []test.Response{
{
StatusCode: http.StatusOK,
Body: manifestBytes,
},
},
},
}...))
server := httptest.NewServer(handler)
client := New(server.URL)
objectStore := &memoryObjectStore{
mutex: new(sync.Mutex),
manifestStorage: make(map[string]*registry.ImageManifest),
layerStorage: make(map[string]Layer),
}
err = Pull(client, objectStore, name, tag)
if err != nil {
t.Fatal(err)
}
m, err := objectStore.Manifest(name, tag)
if err != nil {
t.Fatal(err)
}
mBytes, err := json.Marshal(m)
if err != nil {
t.Fatal(err)
}
if string(mBytes) != string(manifestBytes) {
t.Fatal("Incorrect manifest")
}
for _, layer := range testLayers {
l, err := objectStore.Layer(layer.tarSum)
if err != nil {
t.Fatal(err)
}
reader, err := l.Reader()
if err != nil {
t.Fatal(err)
}
defer reader.Close()
layerBytes, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatal(err)
}
if string(layerBytes) != string(layer.contents) {
t.Fatal("Incorrect layer")
}
}
}

158
client/objectstore.go Normal file
View file

@ -0,0 +1,158 @@
package client
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"sync"
"github.com/docker/docker-registry"
)
var (
// ErrLayerAlreadyExists is returned when attempting to create a layer with
// a tarsum that is already in use.
ErrLayerAlreadyExists = errors.New("Layer already exists")
// ErrLayerLocked is returned when attempting to write to a layer which is
// currently being written to.
ErrLayerLocked = errors.New("Layer locked")
)
// ObjectStore is an interface which is designed to approximate the docker
// engine storage. This interface is subject to change to conform to the
// future requirements of the engine.
type ObjectStore interface {
// Manifest retrieves the image manifest stored at the given repository name
// and tag
Manifest(name, tag string) (*registry.ImageManifest, error)
// WriteManifest stores an image manifest at the given repository name and
// tag
WriteManifest(name, tag string, manifest *registry.ImageManifest) error
// Layer returns a handle to a layer for reading and writing
Layer(blobSum string) (Layer, error)
}
// Layer is a generic image layer interface.
// A Layer may only be written to once
type Layer interface {
// Reader returns an io.ReadCloser which reads the contents of the layer
Reader() (io.ReadCloser, error)
// Writer returns an io.WriteCloser which may write the contents of the
// layer. This method may only be called once per Layer, and the contents
// are made available on Close
Writer() (io.WriteCloser, error)
// Wait blocks until the Layer can be read from
Wait() error
}
// memoryObjectStore is an in-memory implementation of the ObjectStore interface
type memoryObjectStore struct {
mutex *sync.Mutex
manifestStorage map[string]*registry.ImageManifest
layerStorage map[string]Layer
}
func (objStore *memoryObjectStore) Manifest(name, tag string) (*registry.ImageManifest, error) {
objStore.mutex.Lock()
defer objStore.mutex.Unlock()
manifest, ok := objStore.manifestStorage[name+":"+tag]
if !ok {
return nil, fmt.Errorf("No manifest found with Name: %q, Tag: %q", name, tag)
}
return manifest, nil
}
func (objStore *memoryObjectStore) WriteManifest(name, tag string, manifest *registry.ImageManifest) error {
objStore.mutex.Lock()
defer objStore.mutex.Unlock()
objStore.manifestStorage[name+":"+tag] = manifest
return nil
}
func (objStore *memoryObjectStore) Layer(blobSum string) (Layer, error) {
objStore.mutex.Lock()
defer objStore.mutex.Unlock()
layer, ok := objStore.layerStorage[blobSum]
if !ok {
layer = &memoryLayer{cond: sync.NewCond(new(sync.Mutex))}
objStore.layerStorage[blobSum] = layer
}
return layer, nil
}
type memoryLayer struct {
cond *sync.Cond
buffer *bytes.Buffer
written bool
}
func (ml *memoryLayer) Writer() (io.WriteCloser, error) {
ml.cond.L.Lock()
defer ml.cond.L.Unlock()
if ml.buffer != nil {
if !ml.written {
return nil, ErrLayerLocked
}
return nil, ErrLayerAlreadyExists
}
ml.buffer = new(bytes.Buffer)
return &memoryLayerWriter{cond: ml.cond, buffer: ml.buffer, done: &ml.written}, nil
}
func (ml *memoryLayer) Reader() (io.ReadCloser, error) {
ml.cond.L.Lock()
defer ml.cond.L.Unlock()
if ml.buffer == nil {
return nil, fmt.Errorf("Layer has not been written to yet")
}
if !ml.written {
return nil, ErrLayerLocked
}
return ioutil.NopCloser(bytes.NewReader(ml.buffer.Bytes())), nil
}
func (ml *memoryLayer) Wait() error {
ml.cond.L.Lock()
defer ml.cond.L.Unlock()
if ml.buffer == nil {
return fmt.Errorf("No writer to wait on")
}
for !ml.written {
ml.cond.Wait()
}
return nil
}
type memoryLayerWriter struct {
cond *sync.Cond
buffer *bytes.Buffer
done *bool
}
func (mlw *memoryLayerWriter) Write(p []byte) (int, error) {
return mlw.buffer.Write(p)
}
func (mlw *memoryLayerWriter) Close() error {
*mlw.done = true
mlw.cond.Broadcast()
return nil
}

93
client/pull.go Normal file
View file

@ -0,0 +1,93 @@
package client
import (
"fmt"
"io"
log "github.com/Sirupsen/logrus"
)
// Pull implements a client pull workflow for the image defined by the given
// name and tag pair, using the given ObjectStore for local manifest and layer
// storage
func Pull(c Client, objectStore ObjectStore, name, tag string) error {
manifest, err := c.GetImageManifest(name, tag)
if err != nil {
return err
}
log.WithField("manifest", manifest).Info("Pulled manifest")
if len(manifest.FSLayers) != len(manifest.History) {
return fmt.Errorf("Length of history not equal to number of layers")
}
if len(manifest.FSLayers) == 0 {
return fmt.Errorf("Image has no layers")
}
for _, fsLayer := range manifest.FSLayers {
layer, err := objectStore.Layer(fsLayer.BlobSum)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to write local layer")
return err
}
writer, err := layer.Writer()
if err == ErrLayerAlreadyExists {
log.WithField("layer", fsLayer).Info("Layer already exists")
continue
}
if err == ErrLayerLocked {
log.WithField("layer", fsLayer).Info("Layer download in progress, waiting")
layer.Wait()
continue
}
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to write local layer")
return err
}
defer writer.Close()
layerReader, length, err := c.GetImageLayer(name, fsLayer.BlobSum, 0)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to download layer")
return err
}
defer layerReader.Close()
copied, err := io.Copy(writer, layerReader)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to download layer")
return err
}
if copied != int64(length) {
log.WithFields(log.Fields{
"expected": length,
"written": copied,
"layer": fsLayer,
}).Warn("Wrote incorrect number of bytes for layer")
}
}
err = objectStore.WriteManifest(name, tag, manifest)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"manifest": manifest,
}).Warn("Unable to write image manifest")
return err
}
return nil
}

95
client/push.go Normal file
View file

@ -0,0 +1,95 @@
package client
import (
"bytes"
"crypto/sha1"
"io"
"io/ioutil"
"github.com/docker/docker-registry"
log "github.com/Sirupsen/logrus"
)
// Push implements a client push workflow for the image defined by the given
// name and tag pair, using the given ObjectStore for local manifest and layer
// storage
func Push(c Client, objectStore ObjectStore, name, tag string) error {
manifest, err := objectStore.Manifest(name, tag)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"name": name,
"tag": tag,
}).Info("No image found")
return err
}
for _, fsLayer := range manifest.FSLayers {
layer, err := objectStore.Layer(fsLayer.BlobSum)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
layerReader, err := layer.Reader()
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
location, err := c.InitiateLayerUpload(name, fsLayer.BlobSum)
if _, ok := err.(*registry.LayerAlreadyExistsError); ok {
log.WithField("layer", fsLayer).Info("Layer already exists")
continue
}
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to upload layer")
return err
}
layerBuffer := new(bytes.Buffer)
checksum := sha1.New()
teeReader := io.TeeReader(layerReader, checksum)
_, err = io.Copy(layerBuffer, teeReader)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
err = c.UploadLayer(location, ioutil.NopCloser(layerBuffer), layerBuffer.Len(),
&registry.Checksum{HashAlgorithm: "sha1", Sum: string(checksum.Sum(nil))},
)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to upload layer")
return err
}
}
err = c.PutImageManifest(name, tag, manifest)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"manifest": manifest,
}).Warn("Unable to upload manifest")
return err
}
return nil
}

97
test/test.go Normal file
View file

@ -0,0 +1,97 @@
package test
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
)
// RequestResponseMap is a mapping from Requests to Responses
type RequestResponseMap []RequestResponseMapping
// RequestResponseMapping defines an ordered list of Responses to be sent in
// response to a given Request
type RequestResponseMapping struct {
Request Request
Responses []Response
}
// TODO(bbland): add support for request headers
// Request is a simplified http.Request object
type Request struct {
// Method is the http method of the request, for example GET
Method string
// Route is the http route of this request
Route string
// Body is the byte contents of the http request
Body []byte
}
func (r Request) String() string {
return fmt.Sprintf("%s %s\n%s", r.Method, r.Route, r.Body)
}
// Response is a simplified http.Response object
type Response struct {
// Statuscode is the http status code of the Response
StatusCode int
// Headers are the http headers of this Response
Headers http.Header
// Body is the response body
Body []byte
}
// testHandler is an http.Handler with a defined mapping from Request to an
// ordered list of Response objects
type testHandler struct {
responseMap map[string][]Response
}
// NewHandler returns a new test handler that responds to defined requests
// with specified responses
// Each time a Request is received, the next Response is returned in the
// mapping, until no Responses are defined, at which point a 404 is sent back
func NewHandler(requestResponseMap RequestResponseMap) http.Handler {
responseMap := make(map[string][]Response)
for _, mapping := range requestResponseMap {
responseMap[mapping.Request.String()] = mapping.Responses
}
return &testHandler{responseMap: responseMap}
}
func (app *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
requestBody, _ := ioutil.ReadAll(r.Body)
request := Request{
Method: r.Method,
Route: r.URL.Path,
Body: requestBody,
}
responses, ok := app.responseMap[request.String()]
if !ok || len(responses) == 0 {
http.NotFound(w, r)
return
}
response := responses[0]
app.responseMap[request.String()] = responses[1:]
responseHeader := w.Header()
for k, v := range response.Headers {
responseHeader[k] = v
}
w.WriteHeader(response.StatusCode)
io.Copy(w, bytes.NewReader(response.Body))
}