Merge pull request #742 from BrianBland/ng-push-pull
Adds sliding-window parallelization to Push/Pull operations
This commit is contained in:
commit
1e8f0ce50a
2 changed files with 178 additions and 103 deletions
141
client/pull.go
141
client/pull.go
|
@ -4,9 +4,16 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/docker/docker-registry"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
// simultaneousLayerPullWindow is the size of the parallel layer pull window.
|
||||
// A layer may not be pulled until the layer preceeding it by the length of the
|
||||
// pull window has been successfully pulled.
|
||||
const simultaneousLayerPullWindow = 4
|
||||
|
||||
// Pull implements a client pull workflow for the image defined by the given
|
||||
// name and tag pair, using the given ObjectStore for local manifest and layer
|
||||
// storage
|
||||
|
@ -24,59 +31,28 @@ func Pull(c Client, objectStore ObjectStore, name, tag string) error {
|
|||
return fmt.Errorf("Image has no layers")
|
||||
}
|
||||
|
||||
for _, fsLayer := range manifest.FSLayers {
|
||||
layer, err := objectStore.Layer(fsLayer.BlobSum)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to write local layer")
|
||||
return err
|
||||
errChans := make([]chan error, len(manifest.FSLayers))
|
||||
for i := range manifest.FSLayers {
|
||||
errChans[i] = make(chan error)
|
||||
}
|
||||
|
||||
// Iterate over each layer in the manifest, simultaneously pulling no more
|
||||
// than simultaneousLayerPullWindow layers at a time. If an error is
|
||||
// received from a layer pull, we abort the push.
|
||||
for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPullWindow; i++ {
|
||||
dependentLayer := i - simultaneousLayerPullWindow
|
||||
if dependentLayer >= 0 {
|
||||
err := <-errChans[dependentLayer]
|
||||
if err != nil {
|
||||
log.WithField("error", err).Warn("Pull aborted")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
writer, err := layer.Writer()
|
||||
if err == ErrLayerAlreadyExists {
|
||||
log.WithField("layer", fsLayer).Info("Layer already exists")
|
||||
continue
|
||||
}
|
||||
if err == ErrLayerLocked {
|
||||
log.WithField("layer", fsLayer).Info("Layer download in progress, waiting")
|
||||
layer.Wait()
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to write local layer")
|
||||
return err
|
||||
}
|
||||
defer writer.Close()
|
||||
|
||||
layerReader, length, err := c.GetImageLayer(name, fsLayer.BlobSum, 0)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to download layer")
|
||||
return err
|
||||
}
|
||||
defer layerReader.Close()
|
||||
|
||||
copied, err := io.Copy(writer, layerReader)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to download layer")
|
||||
return err
|
||||
}
|
||||
if copied != int64(length) {
|
||||
log.WithFields(log.Fields{
|
||||
"expected": length,
|
||||
"written": copied,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Wrote incorrect number of bytes for layer")
|
||||
if i < len(manifest.FSLayers) {
|
||||
go func(i int) {
|
||||
errChans[i] <- pullLayer(c, objectStore, name, manifest.FSLayers[i])
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -91,3 +67,66 @@ func Pull(c Client, objectStore ObjectStore, name, tag string) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry.FSLayer) error {
|
||||
log.WithField("layer", fsLayer).Info("Pulling layer")
|
||||
|
||||
layer, err := objectStore.Layer(fsLayer.BlobSum)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to write local layer")
|
||||
return err
|
||||
}
|
||||
|
||||
writer, err := layer.Writer()
|
||||
if err == ErrLayerAlreadyExists {
|
||||
log.WithField("layer", fsLayer).Info("Layer already exists")
|
||||
return nil
|
||||
}
|
||||
if err == ErrLayerLocked {
|
||||
log.WithField("layer", fsLayer).Info("Layer download in progress, waiting")
|
||||
layer.Wait()
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to write local layer")
|
||||
return err
|
||||
}
|
||||
defer writer.Close()
|
||||
|
||||
layerReader, length, err := c.GetImageLayer(name, fsLayer.BlobSum, 0)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to download layer")
|
||||
return err
|
||||
}
|
||||
defer layerReader.Close()
|
||||
|
||||
copied, err := io.Copy(writer, layerReader)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to download layer")
|
||||
return err
|
||||
}
|
||||
if copied != int64(length) {
|
||||
log.WithFields(log.Fields{
|
||||
"expected": length,
|
||||
"written": copied,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Wrote incorrect number of bytes for layer")
|
||||
return fmt.Errorf(
|
||||
"Wrote incorrect number of bytes for layer %v. Expected %d, Wrote %d",
|
||||
fsLayer, length, copied,
|
||||
)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
140
client/push.go
140
client/push.go
|
@ -11,6 +11,13 @@ import (
|
|||
log "github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
// simultaneousLayerPushWindow is the size of the parallel layer push window.
|
||||
// A layer may not be pushed until the layer preceeding it by the length of the
|
||||
// push window has been successfully pushed.
|
||||
const simultaneousLayerPushWindow = 4
|
||||
|
||||
type pushFunction func(fsLayer registry.FSLayer) error
|
||||
|
||||
// Push implements a client push workflow for the image defined by the given
|
||||
// name and tag pair, using the given ObjectStore for local manifest and layer
|
||||
// storage
|
||||
|
@ -25,60 +32,28 @@ func Push(c Client, objectStore ObjectStore, name, tag string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
for _, fsLayer := range manifest.FSLayers {
|
||||
layer, err := objectStore.Layer(fsLayer.BlobSum)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to read local layer")
|
||||
return err
|
||||
errChans := make([]chan error, len(manifest.FSLayers))
|
||||
for i := range manifest.FSLayers {
|
||||
errChans[i] = make(chan error)
|
||||
}
|
||||
|
||||
// Iterate over each layer in the manifest, simultaneously pushing no more
|
||||
// than simultaneousLayerPushWindow layers at a time. If an error is
|
||||
// received from a layer push, we abort the push.
|
||||
for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPushWindow; i++ {
|
||||
dependentLayer := i - simultaneousLayerPushWindow
|
||||
if dependentLayer >= 0 {
|
||||
err := <-errChans[dependentLayer]
|
||||
if err != nil {
|
||||
log.WithField("error", err).Warn("Push aborted")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
layerReader, err := layer.Reader()
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to read local layer")
|
||||
return err
|
||||
}
|
||||
|
||||
location, err := c.InitiateLayerUpload(name, fsLayer.BlobSum)
|
||||
if _, ok := err.(*registry.LayerAlreadyExistsError); ok {
|
||||
log.WithField("layer", fsLayer).Info("Layer already exists")
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to upload layer")
|
||||
return err
|
||||
}
|
||||
|
||||
layerBuffer := new(bytes.Buffer)
|
||||
checksum := sha1.New()
|
||||
teeReader := io.TeeReader(layerReader, checksum)
|
||||
|
||||
_, err = io.Copy(layerBuffer, teeReader)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to read local layer")
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.UploadLayer(location, ioutil.NopCloser(layerBuffer), layerBuffer.Len(),
|
||||
®istry.Checksum{HashAlgorithm: "sha1", Sum: string(checksum.Sum(nil))},
|
||||
)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to upload layer")
|
||||
return err
|
||||
if i < len(manifest.FSLayers) {
|
||||
go func(i int) {
|
||||
errChans[i] <- pushLayer(c, objectStore, name, manifest.FSLayers[i])
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -93,3 +68,64 @@ func Push(c Client, objectStore ObjectStore, name, tag string) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer registry.FSLayer) error {
|
||||
log.WithField("layer", fsLayer).Info("Pushing layer")
|
||||
|
||||
layer, err := objectStore.Layer(fsLayer.BlobSum)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to read local layer")
|
||||
return err
|
||||
}
|
||||
|
||||
layerReader, err := layer.Reader()
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to read local layer")
|
||||
return err
|
||||
}
|
||||
|
||||
location, err := c.InitiateLayerUpload(name, fsLayer.BlobSum)
|
||||
if _, ok := err.(*registry.LayerAlreadyExistsError); ok {
|
||||
log.WithField("layer", fsLayer).Info("Layer already exists")
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to upload layer")
|
||||
return err
|
||||
}
|
||||
|
||||
layerBuffer := new(bytes.Buffer)
|
||||
checksum := sha1.New()
|
||||
teeReader := io.TeeReader(layerReader, checksum)
|
||||
|
||||
_, err = io.Copy(layerBuffer, teeReader)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to read local layer")
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.UploadLayer(location, ioutil.NopCloser(layerBuffer), layerBuffer.Len(),
|
||||
®istry.Checksum{HashAlgorithm: "sha1", Sum: string(checksum.Sum(nil))},
|
||||
)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to upload layer")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue