From 50d64ac63a41f6d9f29fa5d3e3b82c32ed7732a1 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 18 Nov 2014 15:44:39 -0800 Subject: [PATCH] Allows layers to be partially pulled and resumed Adds a sort of contrived test for resumable pulls --- client/client_test.go | 131 ++++++++++++++++++++++++++++++++++++ client/objectstore.go | 150 +++++++++++++++++++++++++++++++----------- client/pull.go | 30 ++++++--- client/push.go | 19 +++--- 4 files changed, 271 insertions(+), 59 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index a77e7665e..267f5a5b6 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -117,6 +117,7 @@ func TestPush(t *testing.T) { t.Fatal(err) } + writer.SetSize(len(blob.contents)) writer.Write(blob.contents) writer.Close() } @@ -235,3 +236,133 @@ func TestPull(t *testing.T) { } } } + +func TestPullResume(t *testing.T) { + name := "hello/world" + tag := "sometag" + testBlobs := []testBlob{ + { + digest: "12345", + contents: []byte("some contents"), + }, + { + digest: "98765", + contents: []byte("some other contents"), + }, + } + layers := make([]registry.FSLayer, len(testBlobs)) + history := make([]registry.ManifestHistory, len(testBlobs)) + + for i, layer := range testBlobs { + layers[i] = registry.FSLayer{BlobSum: layer.digest} + history[i] = registry.ManifestHistory{V1Compatibility: layer.digest.String()} + } + + manifest := ®istry.ImageManifest{ + Name: name, + Tag: tag, + Architecture: "x86", + FSLayers: layers, + History: history, + SchemaVersion: 1, + } + manifestBytes, err := json.Marshal(manifest) + + layerRequestResponseMappings := make([]testutil.RequestResponseMapping, 2*len(testBlobs)) + for i, blob := range testBlobs { + layerRequestResponseMappings[2*i] = testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "GET", + Route: "/v2/" + name + "/blob/" + blob.digest.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Body: blob.contents[:len(blob.contents)/2], + Headers: http.Header(map[string][]string{ + "Content-Length": {fmt.Sprint(len(blob.contents))}, + }), + }, + } + layerRequestResponseMappings[2*i+1] = testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "GET", + Route: "/v2/" + name + "/blob/" + blob.digest.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Body: blob.contents[len(blob.contents)/2:], + }, + } + } + + for i := 0; i < 3; i++ { + layerRequestResponseMappings = append(layerRequestResponseMappings, testutil.RequestResponseMap{ + testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "GET", + Route: "/v2/" + name + "/manifest/" + tag, + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Body: manifestBytes, + }, + }, + }...) + } + + handler := testutil.NewHandler(layerRequestResponseMappings) + server := httptest.NewServer(handler) + client := New(server.URL) + objectStore := &memoryObjectStore{ + mutex: new(sync.Mutex), + manifestStorage: make(map[string]*registry.ImageManifest), + layerStorage: make(map[digest.Digest]Layer), + } + + for attempts := 0; attempts < 3; attempts++ { + err = Pull(client, objectStore, name, tag) + if err == nil { + break + } + } + + if err != nil { + t.Fatal(err) + } + + m, err := objectStore.Manifest(name, tag) + if err != nil { + t.Fatal(err) + } + + mBytes, err := json.Marshal(m) + if err != nil { + t.Fatal(err) + } + + if string(mBytes) != string(manifestBytes) { + t.Fatal("Incorrect manifest") + } + + for _, blob := range testBlobs { + l, err := objectStore.Layer(blob.digest) + if err != nil { + t.Fatal(err) + } + + reader, err := l.Reader() + if err != nil { + t.Fatal(err) + } + defer reader.Close() + + layerBytes, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatal(err) + } + + if string(layerBytes) != string(blob.contents) { + t.Fatal("Incorrect blob") + } + } +} diff --git a/client/objectstore.go b/client/objectstore.go index bee73ff07..2e6f0b45d 100644 --- a/client/objectstore.go +++ b/client/objectstore.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "sync" "github.com/docker/docker-registry" @@ -39,20 +38,49 @@ type ObjectStore interface { } // Layer is a generic image layer interface. -// A Layer may only be written to once +// A Layer may not be written to if it is already complete. type Layer interface { - // Reader returns an io.ReadCloser which reads the contents of the layer - Reader() (io.ReadCloser, error) + // Reader returns a LayerReader or an error if the layer has not been + // written to or is currently being written to. + Reader() (LayerReader, error) - // Writer returns an io.WriteCloser which may write the contents of the - // layer. This method may only be called once per Layer, and the contents - // are made available on Close - Writer() (io.WriteCloser, error) + // Writer returns a LayerWriter or an error if the layer has been fully + // written to or is currently being written to. + Writer() (LayerWriter, error) - // Wait blocks until the Layer can be read from + // Wait blocks until the Layer can be read from. Wait() error } +// LayerReader is a read-only handle to a Layer, which exposes the CurrentSize +// and full Size in addition to implementing the io.ReadCloser interface. +type LayerReader interface { + io.ReadCloser + + // CurrentSize returns the number of bytes written to the underlying Layer + CurrentSize() int + + // Size returns the full size of the underlying Layer + Size() int +} + +// LayerWriter is a write-only handle to a Layer, which exposes the CurrentSize +// and full Size in addition to implementing the io.WriteCloser interface. +// SetSize must be called on this LayerWriter before it can be written to. +type LayerWriter interface { + io.WriteCloser + + // CurrentSize returns the number of bytes written to the underlying Layer + CurrentSize() int + + // Size returns the full size of the underlying Layer + Size() int + + // SetSize sets the full size of the underlying Layer. + // This must be called before any calls to Write + SetSize(int) error +} + // memoryObjectStore is an in-memory implementation of the ObjectStore interface type memoryObjectStore struct { mutex *sync.Mutex @@ -93,67 +121,113 @@ func (objStore *memoryObjectStore) Layer(dgst digest.Digest) (Layer, error) { } type memoryLayer struct { - cond *sync.Cond - buffer *bytes.Buffer - written bool + cond *sync.Cond + contents []byte + expectedSize int + writing bool } -func (ml *memoryLayer) Writer() (io.WriteCloser, error) { +func (ml *memoryLayer) Reader() (LayerReader, error) { ml.cond.L.Lock() defer ml.cond.L.Unlock() - if ml.buffer != nil { - if !ml.written { - return nil, ErrLayerLocked - } - return nil, ErrLayerAlreadyExists - } - - ml.buffer = new(bytes.Buffer) - return &memoryLayerWriter{cond: ml.cond, buffer: ml.buffer, done: &ml.written}, nil -} - -func (ml *memoryLayer) Reader() (io.ReadCloser, error) { - ml.cond.L.Lock() - defer ml.cond.L.Unlock() - - if ml.buffer == nil { + if ml.contents == nil { return nil, fmt.Errorf("Layer has not been written to yet") } - if !ml.written { + if ml.writing { return nil, ErrLayerLocked } - return ioutil.NopCloser(bytes.NewReader(ml.buffer.Bytes())), nil + return &memoryLayerReader{ml: ml, reader: bytes.NewReader(ml.contents)}, nil +} + +func (ml *memoryLayer) Writer() (LayerWriter, error) { + ml.cond.L.Lock() + defer ml.cond.L.Unlock() + + if ml.contents != nil { + if ml.writing { + return nil, ErrLayerLocked + } + if ml.expectedSize == len(ml.contents) { + return nil, ErrLayerAlreadyExists + } + } else { + ml.contents = make([]byte, 0) + } + + ml.writing = true + return &memoryLayerWriter{ml: ml, buffer: bytes.NewBuffer(ml.contents)}, nil } func (ml *memoryLayer) Wait() error { ml.cond.L.Lock() defer ml.cond.L.Unlock() - if ml.buffer == nil { + if ml.contents == nil { return fmt.Errorf("No writer to wait on") } - for !ml.written { + for ml.writing { ml.cond.Wait() } return nil } +type memoryLayerReader struct { + ml *memoryLayer + reader *bytes.Reader +} + +func (mlr *memoryLayerReader) Read(p []byte) (int, error) { + return mlr.reader.Read(p) +} + +func (mlr *memoryLayerReader) Close() error { + return nil +} + +func (mlr *memoryLayerReader) CurrentSize() int { + return len(mlr.ml.contents) +} + +func (mlr *memoryLayerReader) Size() int { + return mlr.ml.expectedSize +} + type memoryLayerWriter struct { - cond *sync.Cond + ml *memoryLayer buffer *bytes.Buffer - done *bool } func (mlw *memoryLayerWriter) Write(p []byte) (int, error) { - return mlw.buffer.Write(p) + if mlw.ml.expectedSize == 0 { + return 0, fmt.Errorf("Must set size before writing to layer") + } + wrote, err := mlw.buffer.Write(p) + mlw.ml.contents = mlw.buffer.Bytes() + return wrote, err } func (mlw *memoryLayerWriter) Close() error { - *mlw.done = true - mlw.cond.Broadcast() + mlw.ml.writing = false + mlw.ml.cond.Broadcast() + return nil +} + +func (mlw *memoryLayerWriter) CurrentSize() int { + return len(mlw.ml.contents) +} + +func (mlw *memoryLayerWriter) Size() int { + return mlw.ml.expectedSize +} + +func (mlw *memoryLayerWriter) SetSize(size int) error { + if !mlw.ml.writing { + return fmt.Errorf("Layer is closed for writing") + } + mlw.ml.expectedSize = size return nil } diff --git a/client/pull.go b/client/pull.go index bce067568..5d7ee56f3 100644 --- a/client/pull.go +++ b/client/pull.go @@ -89,7 +89,7 @@ func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. return err } - writer, err := layer.Writer() + layerWriter, err := layer.Writer() if err == ErrLayerAlreadyExists { log.WithField("layer", fsLayer).Info("Layer already exists") return nil @@ -106,9 +106,17 @@ func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. }).Warn("Unable to write local layer") return err } - defer writer.Close() + defer layerWriter.Close() - layerReader, length, err := c.GetBlob(name, fsLayer.BlobSum, 0) + if layerWriter.CurrentSize() > 0 { + log.WithFields(log.Fields{ + "layer": fsLayer, + "currentSize": layerWriter.CurrentSize(), + "size": layerWriter.Size(), + }).Info("Layer partially downloaded, resuming") + } + + layerReader, length, err := c.GetBlob(name, fsLayer.BlobSum, layerWriter.CurrentSize()) if err != nil { log.WithFields(log.Fields{ "error": err, @@ -118,7 +126,9 @@ func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. } defer layerReader.Close() - copied, err := io.Copy(writer, layerReader) + layerWriter.SetSize(layerWriter.CurrentSize() + length) + + _, err = io.Copy(layerWriter, layerReader) if err != nil { log.WithFields(log.Fields{ "error": err, @@ -126,15 +136,15 @@ func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. }).Warn("Unable to download layer") return err } - if copied != int64(length) { + if layerWriter.CurrentSize() != layerWriter.Size() { log.WithFields(log.Fields{ - "expected": length, - "written": copied, - "layer": fsLayer, - }).Warn("Wrote incorrect number of bytes for layer") + "size": layerWriter.Size(), + "currentSize": layerWriter.CurrentSize(), + "layer": fsLayer, + }).Warn("Layer invalid size") return fmt.Errorf( "Wrote incorrect number of bytes for layer %v. Expected %d, Wrote %d", - fsLayer, length, copied, + fsLayer, layerWriter.Size(), layerWriter.CurrentSize(), ) } return nil diff --git a/client/push.go b/client/push.go index 087260586..fae5cc10b 100644 --- a/client/push.go +++ b/client/push.go @@ -1,9 +1,7 @@ package client import ( - "bytes" - "io" - "io/ioutil" + "errors" "github.com/docker/docker-registry" @@ -96,14 +94,13 @@ func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. } defer layerReader.Close() - layerBuffer := new(bytes.Buffer) - layerSize, err := io.Copy(layerBuffer, layerReader) - if err != nil { + if layerReader.CurrentSize() != layerReader.Size() { log.WithFields(log.Fields{ - "error": err, - "layer": fsLayer, - }).Warn("Unable to read local layer") - return err + "layer": fsLayer, + "currentSize": layerReader.CurrentSize(), + "size": layerReader.Size(), + }).Warn("Local layer incomplete") + return errors.New("Local layer incomplete") } length, err := c.BlobLength(name, fsLayer.BlobSum) @@ -128,7 +125,7 @@ func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. return err } - err = c.UploadBlob(location, ioutil.NopCloser(layerBuffer), int(layerSize), fsLayer.BlobSum) + err = c.UploadBlob(location, layerReader, int(layerReader.CurrentSize()), fsLayer.BlobSum) if err != nil { log.WithFields(log.Fields{ "error": err,