From 28b7b82e2da45316dfe358e9e1779ac98e968dec Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 17 Nov 2014 17:33:03 -0800 Subject: [PATCH] Adds sliding-window parallelization to Push/Pull operations A layer can only be pushed/pulled if the layer preceding it by the length of the push/pull window has been successfully pushed. An error returned from pushing or pulling any layer will cause the full operation to be aborted. --- client/pull.go | 141 +++++++++++++++++++++++++++++++------------------ client/push.go | 140 ++++++++++++++++++++++++++++++------------------ 2 files changed, 178 insertions(+), 103 deletions(-) diff --git a/client/pull.go b/client/pull.go index 91c7283a..75cc9af1 100644 --- a/client/pull.go +++ b/client/pull.go @@ -4,9 +4,16 @@ import ( "fmt" "io" + "github.com/docker/docker-registry" + log "github.com/Sirupsen/logrus" ) +// simultaneousLayerPullWindow is the size of the parallel layer pull window. +// A layer may not be pulled until the layer preceding it by the length of the +// pull window has been successfully pulled. +const simultaneousLayerPullWindow = 4 + // Pull implements a client pull workflow for the image defined by the given // name and tag pair, using the given ObjectStore for local manifest and layer // storage @@ -24,59 +31,28 @@ func Pull(c Client, objectStore ObjectStore, name, tag string) error { return fmt.Errorf("Image has no layers") } - for _, fsLayer := range manifest.FSLayers { - layer, err := objectStore.Layer(fsLayer.BlobSum) - if err != nil { - log.WithFields(log.Fields{ - "error": err, - "layer": fsLayer, - }).Warn("Unable to write local layer") - return err + errChans := make([]chan error, len(manifest.FSLayers)) + for i := range manifest.FSLayers { + errChans[i] = make(chan error) + } + + // Iterate over each layer in the manifest, simultaneously pulling no more + // than simultaneousLayerPullWindow layers at a time. If an error is + // received from a layer pull, we abort the pull. 
+ for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPullWindow; i++ { + dependentLayer := i - simultaneousLayerPullWindow + if dependentLayer >= 0 { + err := <-errChans[dependentLayer] + if err != nil { + log.WithField("error", err).Warn("Pull aborted") + return err + } } - writer, err := layer.Writer() - if err == ErrLayerAlreadyExists { - log.WithField("layer", fsLayer).Info("Layer already exists") - continue - } - if err == ErrLayerLocked { - log.WithField("layer", fsLayer).Info("Layer download in progress, waiting") - layer.Wait() - continue - } - if err != nil { - log.WithFields(log.Fields{ - "error": err, - "layer": fsLayer, - }).Warn("Unable to write local layer") - return err - } - defer writer.Close() - - layerReader, length, err := c.GetImageLayer(name, fsLayer.BlobSum, 0) - if err != nil { - log.WithFields(log.Fields{ - "error": err, - "layer": fsLayer, - }).Warn("Unable to download layer") - return err - } - defer layerReader.Close() - - copied, err := io.Copy(writer, layerReader) - if err != nil { - log.WithFields(log.Fields{ - "error": err, - "layer": fsLayer, - }).Warn("Unable to download layer") - return err - } - if copied != int64(length) { - log.WithFields(log.Fields{ - "expected": length, - "written": copied, - "layer": fsLayer, - }).Warn("Wrote incorrect number of bytes for layer") + if i < len(manifest.FSLayers) { + go func(i int) { + errChans[i] <- pullLayer(c, objectStore, name, manifest.FSLayers[i]) + }(i) } } @@ -91,3 +67,66 @@ func Pull(c Client, objectStore ObjectStore, name, tag string) error { return nil } + +func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry.FSLayer) error { + log.WithField("layer", fsLayer).Info("Pulling layer") + + layer, err := objectStore.Layer(fsLayer.BlobSum) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to write local layer") + return err + } + + writer, err := layer.Writer() + if err == ErrLayerAlreadyExists { + 
log.WithField("layer", fsLayer).Info("Layer already exists") + return nil + } + if err == ErrLayerLocked { + log.WithField("layer", fsLayer).Info("Layer download in progress, waiting") + layer.Wait() + return nil + } + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to write local layer") + return err + } + defer writer.Close() + + layerReader, length, err := c.GetImageLayer(name, fsLayer.BlobSum, 0) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to download layer") + return err + } + defer layerReader.Close() + + copied, err := io.Copy(writer, layerReader) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to download layer") + return err + } + if copied != int64(length) { + log.WithFields(log.Fields{ + "expected": length, + "written": copied, + "layer": fsLayer, + }).Warn("Wrote incorrect number of bytes for layer") + return fmt.Errorf( + "Wrote incorrect number of bytes for layer %v. Expected %d, Wrote %d", + fsLayer, length, copied, + ) + } + return nil +} diff --git a/client/push.go b/client/push.go index 4b9634e0..a1fb0e23 100644 --- a/client/push.go +++ b/client/push.go @@ -11,6 +11,13 @@ import ( log "github.com/Sirupsen/logrus" ) +// simultaneousLayerPushWindow is the size of the parallel layer push window. +// A layer may not be pushed until the layer preceding it by the length of the +// push window has been successfully pushed. 
+const simultaneousLayerPushWindow = 4 + +type pushFunction func(fsLayer registry.FSLayer) error + // Push implements a client push workflow for the image defined by the given // name and tag pair, using the given ObjectStore for local manifest and layer // storage @@ -25,60 +32,28 @@ func Push(c Client, objectStore ObjectStore, name, tag string) error { return err } - for _, fsLayer := range manifest.FSLayers { - layer, err := objectStore.Layer(fsLayer.BlobSum) - if err != nil { - log.WithFields(log.Fields{ - "error": err, - "layer": fsLayer, - }).Warn("Unable to read local layer") - return err + errChans := make([]chan error, len(manifest.FSLayers)) + for i := range manifest.FSLayers { + errChans[i] = make(chan error) + } + + // Iterate over each layer in the manifest, simultaneously pushing no more + // than simultaneousLayerPushWindow layers at a time. If an error is + // received from a layer push, we abort the push. + for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPushWindow; i++ { + dependentLayer := i - simultaneousLayerPushWindow + if dependentLayer >= 0 { + err := <-errChans[dependentLayer] + if err != nil { + log.WithField("error", err).Warn("Push aborted") + return err + } } - layerReader, err := layer.Reader() - if err != nil { - log.WithFields(log.Fields{ - "error": err, - "layer": fsLayer, - }).Warn("Unable to read local layer") - return err - } - - location, err := c.InitiateLayerUpload(name, fsLayer.BlobSum) - if _, ok := err.(*registry.LayerAlreadyExistsError); ok { - log.WithField("layer", fsLayer).Info("Layer already exists") - continue - } - if err != nil { - log.WithFields(log.Fields{ - "error": err, - "layer": fsLayer, - }).Warn("Unable to upload layer") - return err - } - - layerBuffer := new(bytes.Buffer) - checksum := sha1.New() - teeReader := io.TeeReader(layerReader, checksum) - - _, err = io.Copy(layerBuffer, teeReader) - if err != nil { - log.WithFields(log.Fields{ - "error": err, - "layer": fsLayer, - }).Warn("Unable to read 
local layer") - return err - } - - err = c.UploadLayer(location, ioutil.NopCloser(layerBuffer), layerBuffer.Len(), - &registry.Checksum{HashAlgorithm: "sha1", Sum: string(checksum.Sum(nil))}, - ) - if err != nil { - log.WithFields(log.Fields{ - "error": err, - "layer": fsLayer, - }).Warn("Unable to upload layer") - return err + if i < len(manifest.FSLayers) { + go func(i int) { + errChans[i] <- pushLayer(c, objectStore, name, manifest.FSLayers[i]) + }(i) } } @@ -93,3 +68,64 @@ func Push(c Client, objectStore ObjectStore, name, tag string) error { return nil } + +func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer registry.FSLayer) error { + log.WithField("layer", fsLayer).Info("Pushing layer") + + layer, err := objectStore.Layer(fsLayer.BlobSum) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to read local layer") + return err + } + + layerReader, err := layer.Reader() + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to read local layer") + return err + } + + location, err := c.InitiateLayerUpload(name, fsLayer.BlobSum) + if _, ok := err.(*registry.LayerAlreadyExistsError); ok { + log.WithField("layer", fsLayer).Info("Layer already exists") + return nil + } + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to upload layer") + return err + } + + layerBuffer := new(bytes.Buffer) + checksum := sha1.New() + teeReader := io.TeeReader(layerReader, checksum) + + _, err = io.Copy(layerBuffer, teeReader) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to read local layer") + return err + } + + err = c.UploadLayer(location, ioutil.NopCloser(layerBuffer), layerBuffer.Len(), + &registry.Checksum{HashAlgorithm: "sha1", Sum: string(checksum.Sum(nil))}, + ) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to 
upload layer") + return err + } + + return nil +}