forked from TrueCloudLab/distribution
d08f0edcf1
Routes and errors are now all referenced from a single v2 package. This package's exports are suitable for use on the server side as well as for integration into docker core.
package client

import (
	"fmt"

	log "github.com/Sirupsen/logrus"
	"github.com/docker/docker-registry/storage"
)

// simultaneousLayerPushWindow is the size of the parallel layer push window.
// A layer may not be pushed until the layer preceding it by the length of the
// push window has been successfully pushed.
const simultaneousLayerPushWindow = 4

// pushFunction describes a function that pushes a single filesystem layer.
type pushFunction func(fsLayer storage.FSLayer) error

// Push implements a client push workflow for the image defined by the given
// name and tag pair, using the given ObjectStore for local manifest and layer
// storage.
func Push(c Client, objectStore ObjectStore, name, tag string) error {
	manifest, err := objectStore.Manifest(name, tag)
	if err != nil {
		log.WithFields(log.Fields{
			"error": err,
			"name":  name,
			"tag":   tag,
		}).Info("No image found")
		return err
	}

	// One error channel per layer; each push goroutine reports its result on
	// the channel for its layer.
	errChans := make([]chan error, len(manifest.FSLayers))
	for i := range manifest.FSLayers {
		errChans[i] = make(chan error)
	}

	// cancelCh is closed to broadcast cancellation to all in-flight push
	// goroutines.
	cancelCh := make(chan struct{})

	// Iterate over each layer in the manifest, simultaneously pushing no more
	// than simultaneousLayerPushWindow layers at a time. If an error is
	// received from a layer push, we abort the push.
	for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPushWindow; i++ {
		dependentLayer := i - simultaneousLayerPushWindow
		if dependentLayer >= 0 {
			err := <-errChans[dependentLayer]
			if err != nil {
				log.WithField("error", err).Warn("Push aborted")
				close(cancelCh)
				return err
			}
		}

		if i < len(manifest.FSLayers) {
			go func(i int) {
				select {
				case errChans[i] <- pushLayer(c, objectStore, name, manifest.FSLayers[i]):
				case <-cancelCh: // receive broadcast notification of cancellation
				}
			}(i)
		}
	}

	err = c.PutImageManifest(name, tag, manifest)
	if err != nil {
		log.WithFields(log.Fields{
			"error":    err,
			"manifest": manifest,
		}).Warn("Unable to upload manifest")
		return err
	}

	return nil
}

func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer storage.FSLayer) error {
	log.WithField("layer", fsLayer).Info("Pushing layer")

	layer, err := objectStore.Layer(fsLayer.BlobSum)
	if err != nil {
		log.WithFields(log.Fields{
			"error": err,
			"layer": fsLayer,
		}).Warn("Unable to read local layer")
		return err
	}

	layerReader, err := layer.Reader()
	if err != nil {
		log.WithFields(log.Fields{
			"error": err,
			"layer": fsLayer,
		}).Warn("Unable to read local layer")
		return err
	}
	defer layerReader.Close()

	// Refuse to push a layer whose local copy has not been fully written yet.
	if layerReader.CurrentSize() != layerReader.Size() {
		log.WithFields(log.Fields{
			"layer":       fsLayer,
			"currentSize": layerReader.CurrentSize(),
			"size":        layerReader.Size(),
		}).Warn("Local layer incomplete")
		return fmt.Errorf("Local layer incomplete")
	}

	// A non-negative length means the registry already has this blob, so the
	// upload can be skipped.
	length, err := c.BlobLength(name, fsLayer.BlobSum)
	if err != nil {
		log.WithFields(log.Fields{
			"error": err,
			"layer": fsLayer,
		}).Warn("Unable to check existence of remote layer")
		return err
	}
	if length >= 0 {
		log.WithField("layer", fsLayer).Info("Layer already exists")
		return nil
	}

	location, err := c.InitiateBlobUpload(name)
	if err != nil {
		log.WithFields(log.Fields{
			"error": err,
			"layer": fsLayer,
		}).Warn("Unable to upload layer")
		return err
	}

	err = c.UploadBlob(location, layerReader, int(layerReader.CurrentSize()), fsLayer.BlobSum)
	if err != nil {
		log.WithFields(log.Fields{
			"error": err,
			"layer": fsLayer,
		}).Warn("Unable to upload layer")
		return err
	}

	return nil
}
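
// examplePush is a minimal, hypothetical sketch of how a caller might drive
// the Push workflow above; it is not part of the original file. The Client and
// ObjectStore implementations are assumed to be supplied by the caller, and
// the image name and tag shown here are arbitrary placeholders.
func examplePush(c Client, objectStore ObjectStore) error {
	// Push the image "library/ubuntu:latest" from the local object store to
	// the remote registry using the windowed layer-push strategy implemented
	// by Push.
	if err := Push(c, objectStore, "library/ubuntu", "latest"); err != nil {
		log.WithField("error", err).Error("push failed")
		return err
	}
	return nil
}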