Merge pull request #740 from BrianBland/ng-push-pull
WIP: Adds push/pull client functionality
This commit is contained in:
commit
a2d232aaec
6 changed files with 684 additions and 3 deletions
|
@ -183,7 +183,7 @@ func (r *clientImpl) DeleteImage(name, tag string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *clientImpl) ListImageTags(name string) ([]string, error) {
|
func (r *clientImpl) ListImageTags(name string) ([]string, error) {
|
||||||
response, err := http.Get(fmt.Sprintf("%s/v2/%s/tags", r.Endpoint, name))
|
response, err := http.Get(fmt.Sprintf("%s/v2/%s/tags/list", r.Endpoint, name))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -264,7 +264,7 @@ func (r *clientImpl) GetImageLayer(name, tarsum string, byteOffset int) (io.Read
|
||||||
|
|
||||||
func (r *clientImpl) InitiateLayerUpload(name, tarsum string) (string, error) {
|
func (r *clientImpl) InitiateLayerUpload(name, tarsum string) (string, error) {
|
||||||
postRequest, err := http.NewRequest("POST",
|
postRequest, err := http.NewRequest("POST",
|
||||||
fmt.Sprintf("%s/v2/%s/layer/%s/upload", r.Endpoint, name, tarsum), nil)
|
fmt.Sprintf("%s/v2/%s/layer/%s/upload/", r.Endpoint, name, tarsum), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
@ -329,7 +329,7 @@ func (r *clientImpl) UploadLayer(location string, layer io.ReadCloser, length in
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
queryValues := new(url.Values)
|
queryValues := url.Values{}
|
||||||
queryValues.Set("length", fmt.Sprint(length))
|
queryValues.Set("length", fmt.Sprint(length))
|
||||||
queryValues.Set(checksum.HashAlgorithm, checksum.Sum)
|
queryValues.Set(checksum.HashAlgorithm, checksum.Sum)
|
||||||
putRequest.URL.RawQuery = queryValues.Encode()
|
putRequest.URL.RawQuery = queryValues.Encode()
|
||||||
|
|
238
client/client_test.go
Normal file
238
client/client_test.go
Normal file
|
@ -0,0 +1,238 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry"
|
||||||
|
"github.com/docker/docker-registry/test"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testLayer struct {
|
||||||
|
tarSum string
|
||||||
|
contents []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPush(t *testing.T) {
|
||||||
|
name := "hello/world"
|
||||||
|
tag := "sometag"
|
||||||
|
testLayers := []testLayer{
|
||||||
|
{
|
||||||
|
tarSum: "12345",
|
||||||
|
contents: []byte("some contents"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
tarSum: "98765",
|
||||||
|
contents: []byte("some other contents"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
uploadLocations := make([]string, len(testLayers))
|
||||||
|
layers := make([]registry.FSLayer, len(testLayers))
|
||||||
|
history := make([]registry.ManifestHistory, len(testLayers))
|
||||||
|
|
||||||
|
for i, layer := range testLayers {
|
||||||
|
uploadLocations[i] = fmt.Sprintf("/v2/%s/layer/%s/upload-location-%d", name, layer.tarSum, i)
|
||||||
|
layers[i] = registry.FSLayer{BlobSum: layer.tarSum}
|
||||||
|
history[i] = registry.ManifestHistory{V1Compatibility: layer.tarSum}
|
||||||
|
}
|
||||||
|
|
||||||
|
manifest := ®istry.ImageManifest{
|
||||||
|
Name: name,
|
||||||
|
Tag: tag,
|
||||||
|
Architecture: "x86",
|
||||||
|
FSLayers: layers,
|
||||||
|
History: history,
|
||||||
|
SchemaVersion: 1,
|
||||||
|
}
|
||||||
|
manifestBytes, err := json.Marshal(manifest)
|
||||||
|
|
||||||
|
layerRequestResponseMappings := make([]test.RequestResponseMapping, 2*len(testLayers))
|
||||||
|
for i, layer := range testLayers {
|
||||||
|
layerRequestResponseMappings[2*i] = test.RequestResponseMapping{
|
||||||
|
Request: test.Request{
|
||||||
|
Method: "POST",
|
||||||
|
Route: "/v2/" + name + "/layer/" + layer.tarSum + "/upload/",
|
||||||
|
},
|
||||||
|
Responses: []test.Response{
|
||||||
|
{
|
||||||
|
StatusCode: http.StatusAccepted,
|
||||||
|
Headers: http.Header(map[string][]string{
|
||||||
|
"Location": {uploadLocations[i]},
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
layerRequestResponseMappings[2*i+1] = test.RequestResponseMapping{
|
||||||
|
Request: test.Request{
|
||||||
|
Method: "PUT",
|
||||||
|
Route: uploadLocations[i],
|
||||||
|
Body: layer.contents,
|
||||||
|
},
|
||||||
|
Responses: []test.Response{
|
||||||
|
{
|
||||||
|
StatusCode: http.StatusCreated,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
handler := test.NewHandler(append(layerRequestResponseMappings, test.RequestResponseMap{
|
||||||
|
test.RequestResponseMapping{
|
||||||
|
Request: test.Request{
|
||||||
|
Method: "PUT",
|
||||||
|
Route: "/v2/" + name + "/image/" + tag,
|
||||||
|
Body: manifestBytes,
|
||||||
|
},
|
||||||
|
Responses: []test.Response{
|
||||||
|
{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}...))
|
||||||
|
server := httptest.NewServer(handler)
|
||||||
|
client := New(server.URL)
|
||||||
|
objectStore := &memoryObjectStore{
|
||||||
|
mutex: new(sync.Mutex),
|
||||||
|
manifestStorage: make(map[string]*registry.ImageManifest),
|
||||||
|
layerStorage: make(map[string]Layer),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, layer := range testLayers {
|
||||||
|
l, err := objectStore.Layer(layer.tarSum)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
writer, err := l.Writer()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.Write(layer.contents)
|
||||||
|
writer.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
objectStore.WriteManifest(name, tag, manifest)
|
||||||
|
|
||||||
|
err = Push(client, objectStore, name, tag)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPull(t *testing.T) {
|
||||||
|
name := "hello/world"
|
||||||
|
tag := "sometag"
|
||||||
|
testLayers := []testLayer{
|
||||||
|
{
|
||||||
|
tarSum: "12345",
|
||||||
|
contents: []byte("some contents"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
tarSum: "98765",
|
||||||
|
contents: []byte("some other contents"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
layers := make([]registry.FSLayer, len(testLayers))
|
||||||
|
history := make([]registry.ManifestHistory, len(testLayers))
|
||||||
|
|
||||||
|
for i, layer := range testLayers {
|
||||||
|
layers[i] = registry.FSLayer{BlobSum: layer.tarSum}
|
||||||
|
history[i] = registry.ManifestHistory{V1Compatibility: layer.tarSum}
|
||||||
|
}
|
||||||
|
|
||||||
|
manifest := ®istry.ImageManifest{
|
||||||
|
Name: name,
|
||||||
|
Tag: tag,
|
||||||
|
Architecture: "x86",
|
||||||
|
FSLayers: layers,
|
||||||
|
History: history,
|
||||||
|
SchemaVersion: 1,
|
||||||
|
}
|
||||||
|
manifestBytes, err := json.Marshal(manifest)
|
||||||
|
|
||||||
|
layerRequestResponseMappings := make([]test.RequestResponseMapping, len(testLayers))
|
||||||
|
for i, layer := range testLayers {
|
||||||
|
layerRequestResponseMappings[i] = test.RequestResponseMapping{
|
||||||
|
Request: test.Request{
|
||||||
|
Method: "GET",
|
||||||
|
Route: "/v2/" + name + "/layer/" + layer.tarSum,
|
||||||
|
},
|
||||||
|
Responses: []test.Response{
|
||||||
|
{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Body: layer.contents,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
handler := test.NewHandler(append(layerRequestResponseMappings, test.RequestResponseMap{
|
||||||
|
test.RequestResponseMapping{
|
||||||
|
Request: test.Request{
|
||||||
|
Method: "GET",
|
||||||
|
Route: "/v2/" + name + "/image/" + tag,
|
||||||
|
},
|
||||||
|
Responses: []test.Response{
|
||||||
|
{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Body: manifestBytes,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}...))
|
||||||
|
server := httptest.NewServer(handler)
|
||||||
|
client := New(server.URL)
|
||||||
|
objectStore := &memoryObjectStore{
|
||||||
|
mutex: new(sync.Mutex),
|
||||||
|
manifestStorage: make(map[string]*registry.ImageManifest),
|
||||||
|
layerStorage: make(map[string]Layer),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = Pull(client, objectStore, name, tag)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
m, err := objectStore.Manifest(name, tag)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mBytes, err := json.Marshal(m)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(mBytes) != string(manifestBytes) {
|
||||||
|
t.Fatal("Incorrect manifest")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, layer := range testLayers {
|
||||||
|
l, err := objectStore.Layer(layer.tarSum)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err := l.Reader()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
layerBytes, err := ioutil.ReadAll(reader)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(layerBytes) != string(layer.contents) {
|
||||||
|
t.Fatal("Incorrect layer")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
158
client/objectstore.go
Normal file
158
client/objectstore.go
Normal file
|
@ -0,0 +1,158 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrLayerAlreadyExists is returned when attempting to create a layer with
|
||||||
|
// a tarsum that is already in use.
|
||||||
|
ErrLayerAlreadyExists = errors.New("Layer already exists")
|
||||||
|
|
||||||
|
// ErrLayerLocked is returned when attempting to write to a layer which is
|
||||||
|
// currently being written to.
|
||||||
|
ErrLayerLocked = errors.New("Layer locked")
|
||||||
|
)
|
||||||
|
|
||||||
|
// ObjectStore is an interface which is designed to approximate the docker
|
||||||
|
// engine storage. This interface is subject to change to conform to the
|
||||||
|
// future requirements of the engine.
|
||||||
|
type ObjectStore interface {
|
||||||
|
// Manifest retrieves the image manifest stored at the given repository name
|
||||||
|
// and tag
|
||||||
|
Manifest(name, tag string) (*registry.ImageManifest, error)
|
||||||
|
|
||||||
|
// WriteManifest stores an image manifest at the given repository name and
|
||||||
|
// tag
|
||||||
|
WriteManifest(name, tag string, manifest *registry.ImageManifest) error
|
||||||
|
|
||||||
|
// Layer returns a handle to a layer for reading and writing
|
||||||
|
Layer(blobSum string) (Layer, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Layer is a generic image layer interface.
|
||||||
|
// A Layer may only be written to once
|
||||||
|
type Layer interface {
|
||||||
|
// Reader returns an io.ReadCloser which reads the contents of the layer
|
||||||
|
Reader() (io.ReadCloser, error)
|
||||||
|
|
||||||
|
// Writer returns an io.WriteCloser which may write the contents of the
|
||||||
|
// layer. This method may only be called once per Layer, and the contents
|
||||||
|
// are made available on Close
|
||||||
|
Writer() (io.WriteCloser, error)
|
||||||
|
|
||||||
|
// Wait blocks until the Layer can be read from
|
||||||
|
Wait() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// memoryObjectStore is an in-memory implementation of the ObjectStore interface
|
||||||
|
type memoryObjectStore struct {
|
||||||
|
mutex *sync.Mutex
|
||||||
|
manifestStorage map[string]*registry.ImageManifest
|
||||||
|
layerStorage map[string]Layer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (objStore *memoryObjectStore) Manifest(name, tag string) (*registry.ImageManifest, error) {
|
||||||
|
objStore.mutex.Lock()
|
||||||
|
defer objStore.mutex.Unlock()
|
||||||
|
|
||||||
|
manifest, ok := objStore.manifestStorage[name+":"+tag]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("No manifest found with Name: %q, Tag: %q", name, tag)
|
||||||
|
}
|
||||||
|
return manifest, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (objStore *memoryObjectStore) WriteManifest(name, tag string, manifest *registry.ImageManifest) error {
|
||||||
|
objStore.mutex.Lock()
|
||||||
|
defer objStore.mutex.Unlock()
|
||||||
|
|
||||||
|
objStore.manifestStorage[name+":"+tag] = manifest
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (objStore *memoryObjectStore) Layer(blobSum string) (Layer, error) {
|
||||||
|
objStore.mutex.Lock()
|
||||||
|
defer objStore.mutex.Unlock()
|
||||||
|
|
||||||
|
layer, ok := objStore.layerStorage[blobSum]
|
||||||
|
if !ok {
|
||||||
|
layer = &memoryLayer{cond: sync.NewCond(new(sync.Mutex))}
|
||||||
|
objStore.layerStorage[blobSum] = layer
|
||||||
|
}
|
||||||
|
|
||||||
|
return layer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type memoryLayer struct {
|
||||||
|
cond *sync.Cond
|
||||||
|
buffer *bytes.Buffer
|
||||||
|
written bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ml *memoryLayer) Writer() (io.WriteCloser, error) {
|
||||||
|
ml.cond.L.Lock()
|
||||||
|
defer ml.cond.L.Unlock()
|
||||||
|
|
||||||
|
if ml.buffer != nil {
|
||||||
|
if !ml.written {
|
||||||
|
return nil, ErrLayerLocked
|
||||||
|
}
|
||||||
|
return nil, ErrLayerAlreadyExists
|
||||||
|
}
|
||||||
|
|
||||||
|
ml.buffer = new(bytes.Buffer)
|
||||||
|
return &memoryLayerWriter{cond: ml.cond, buffer: ml.buffer, done: &ml.written}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ml *memoryLayer) Reader() (io.ReadCloser, error) {
|
||||||
|
ml.cond.L.Lock()
|
||||||
|
defer ml.cond.L.Unlock()
|
||||||
|
|
||||||
|
if ml.buffer == nil {
|
||||||
|
return nil, fmt.Errorf("Layer has not been written to yet")
|
||||||
|
}
|
||||||
|
if !ml.written {
|
||||||
|
return nil, ErrLayerLocked
|
||||||
|
}
|
||||||
|
|
||||||
|
return ioutil.NopCloser(bytes.NewReader(ml.buffer.Bytes())), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ml *memoryLayer) Wait() error {
|
||||||
|
ml.cond.L.Lock()
|
||||||
|
defer ml.cond.L.Unlock()
|
||||||
|
|
||||||
|
if ml.buffer == nil {
|
||||||
|
return fmt.Errorf("No writer to wait on")
|
||||||
|
}
|
||||||
|
|
||||||
|
for !ml.written {
|
||||||
|
ml.cond.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type memoryLayerWriter struct {
|
||||||
|
cond *sync.Cond
|
||||||
|
buffer *bytes.Buffer
|
||||||
|
done *bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mlw *memoryLayerWriter) Write(p []byte) (int, error) {
|
||||||
|
return mlw.buffer.Write(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mlw *memoryLayerWriter) Close() error {
|
||||||
|
*mlw.done = true
|
||||||
|
mlw.cond.Broadcast()
|
||||||
|
return nil
|
||||||
|
}
|
93
client/pull.go
Normal file
93
client/pull.go
Normal file
|
@ -0,0 +1,93 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
log "github.com/Sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Pull implements a client pull workflow for the image defined by the given
|
||||||
|
// name and tag pair, using the given ObjectStore for local manifest and layer
|
||||||
|
// storage
|
||||||
|
func Pull(c Client, objectStore ObjectStore, name, tag string) error {
|
||||||
|
manifest, err := c.GetImageManifest(name, tag)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.WithField("manifest", manifest).Info("Pulled manifest")
|
||||||
|
|
||||||
|
if len(manifest.FSLayers) != len(manifest.History) {
|
||||||
|
return fmt.Errorf("Length of history not equal to number of layers")
|
||||||
|
}
|
||||||
|
if len(manifest.FSLayers) == 0 {
|
||||||
|
return fmt.Errorf("Image has no layers")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, fsLayer := range manifest.FSLayers {
|
||||||
|
layer, err := objectStore.Layer(fsLayer.BlobSum)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"layer": fsLayer,
|
||||||
|
}).Warn("Unable to write local layer")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
writer, err := layer.Writer()
|
||||||
|
if err == ErrLayerAlreadyExists {
|
||||||
|
log.WithField("layer", fsLayer).Info("Layer already exists")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err == ErrLayerLocked {
|
||||||
|
log.WithField("layer", fsLayer).Info("Layer download in progress, waiting")
|
||||||
|
layer.Wait()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"layer": fsLayer,
|
||||||
|
}).Warn("Unable to write local layer")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer writer.Close()
|
||||||
|
|
||||||
|
layerReader, length, err := c.GetImageLayer(name, fsLayer.BlobSum, 0)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"layer": fsLayer,
|
||||||
|
}).Warn("Unable to download layer")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer layerReader.Close()
|
||||||
|
|
||||||
|
copied, err := io.Copy(writer, layerReader)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"layer": fsLayer,
|
||||||
|
}).Warn("Unable to download layer")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if copied != int64(length) {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"expected": length,
|
||||||
|
"written": copied,
|
||||||
|
"layer": fsLayer,
|
||||||
|
}).Warn("Wrote incorrect number of bytes for layer")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = objectStore.WriteManifest(name, tag, manifest)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"manifest": manifest,
|
||||||
|
}).Warn("Unable to write image manifest")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
95
client/push.go
Normal file
95
client/push.go
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/sha1"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry"
|
||||||
|
|
||||||
|
log "github.com/Sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Push implements a client push workflow for the image defined by the given
|
||||||
|
// name and tag pair, using the given ObjectStore for local manifest and layer
|
||||||
|
// storage
|
||||||
|
func Push(c Client, objectStore ObjectStore, name, tag string) error {
|
||||||
|
manifest, err := objectStore.Manifest(name, tag)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"name": name,
|
||||||
|
"tag": tag,
|
||||||
|
}).Info("No image found")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, fsLayer := range manifest.FSLayers {
|
||||||
|
layer, err := objectStore.Layer(fsLayer.BlobSum)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"layer": fsLayer,
|
||||||
|
}).Warn("Unable to read local layer")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
layerReader, err := layer.Reader()
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"layer": fsLayer,
|
||||||
|
}).Warn("Unable to read local layer")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
location, err := c.InitiateLayerUpload(name, fsLayer.BlobSum)
|
||||||
|
if _, ok := err.(*registry.LayerAlreadyExistsError); ok {
|
||||||
|
log.WithField("layer", fsLayer).Info("Layer already exists")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"layer": fsLayer,
|
||||||
|
}).Warn("Unable to upload layer")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
layerBuffer := new(bytes.Buffer)
|
||||||
|
checksum := sha1.New()
|
||||||
|
teeReader := io.TeeReader(layerReader, checksum)
|
||||||
|
|
||||||
|
_, err = io.Copy(layerBuffer, teeReader)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"layer": fsLayer,
|
||||||
|
}).Warn("Unable to read local layer")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.UploadLayer(location, ioutil.NopCloser(layerBuffer), layerBuffer.Len(),
|
||||||
|
®istry.Checksum{HashAlgorithm: "sha1", Sum: string(checksum.Sum(nil))},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"layer": fsLayer,
|
||||||
|
}).Warn("Unable to upload layer")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.PutImageManifest(name, tag, manifest)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"error": err,
|
||||||
|
"manifest": manifest,
|
||||||
|
}).Warn("Unable to upload manifest")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
97
test/test.go
Normal file
97
test/test.go
Normal file
|
@ -0,0 +1,97 @@
|
||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RequestResponseMap is a mapping from Requests to Responses
|
||||||
|
type RequestResponseMap []RequestResponseMapping
|
||||||
|
|
||||||
|
// RequestResponseMapping defines an ordered list of Responses to be sent in
|
||||||
|
// response to a given Request
|
||||||
|
type RequestResponseMapping struct {
|
||||||
|
Request Request
|
||||||
|
Responses []Response
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(bbland): add support for request headers
|
||||||
|
|
||||||
|
// Request is a simplified http.Request object
|
||||||
|
type Request struct {
|
||||||
|
// Method is the http method of the request, for example GET
|
||||||
|
Method string
|
||||||
|
|
||||||
|
// Route is the http route of this request
|
||||||
|
Route string
|
||||||
|
|
||||||
|
// Body is the byte contents of the http request
|
||||||
|
Body []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r Request) String() string {
|
||||||
|
return fmt.Sprintf("%s %s\n%s", r.Method, r.Route, r.Body)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Response is a simplified http.Response object
|
||||||
|
type Response struct {
|
||||||
|
// Statuscode is the http status code of the Response
|
||||||
|
StatusCode int
|
||||||
|
|
||||||
|
// Headers are the http headers of this Response
|
||||||
|
Headers http.Header
|
||||||
|
|
||||||
|
// Body is the response body
|
||||||
|
Body []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// testHandler is an http.Handler with a defined mapping from Request to an
|
||||||
|
// ordered list of Response objects
|
||||||
|
type testHandler struct {
|
||||||
|
responseMap map[string][]Response
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHandler returns a new test handler that responds to defined requests
|
||||||
|
// with specified responses
|
||||||
|
// Each time a Request is received, the next Response is returned in the
|
||||||
|
// mapping, until no Responses are defined, at which point a 404 is sent back
|
||||||
|
func NewHandler(requestResponseMap RequestResponseMap) http.Handler {
|
||||||
|
responseMap := make(map[string][]Response)
|
||||||
|
for _, mapping := range requestResponseMap {
|
||||||
|
responseMap[mapping.Request.String()] = mapping.Responses
|
||||||
|
}
|
||||||
|
return &testHandler{responseMap: responseMap}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
defer r.Body.Close()
|
||||||
|
|
||||||
|
requestBody, _ := ioutil.ReadAll(r.Body)
|
||||||
|
request := Request{
|
||||||
|
Method: r.Method,
|
||||||
|
Route: r.URL.Path,
|
||||||
|
Body: requestBody,
|
||||||
|
}
|
||||||
|
|
||||||
|
responses, ok := app.responseMap[request.String()]
|
||||||
|
|
||||||
|
if !ok || len(responses) == 0 {
|
||||||
|
http.NotFound(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
response := responses[0]
|
||||||
|
app.responseMap[request.String()] = responses[1:]
|
||||||
|
|
||||||
|
responseHeader := w.Header()
|
||||||
|
for k, v := range response.Headers {
|
||||||
|
responseHeader[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(response.StatusCode)
|
||||||
|
|
||||||
|
io.Copy(w, bytes.NewReader(response.Body))
|
||||||
|
}
|
Loading…
Reference in a new issue