Move client package under registry package
Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
c3b07952ad
commit
d6308bc62b
6 changed files with 1620 additions and 0 deletions
137
docs/client/push.go
Normal file
137
docs/client/push.go
Normal file
|
@ -0,0 +1,137 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/docker/distribution/manifest"
|
||||
)
|
||||
|
||||
// simultaneousLayerPushWindow is the size of the parallel layer push window.
|
||||
// A layer may not be pushed until the layer preceeding it by the length of the
|
||||
// push window has been successfully pushed.
|
||||
const simultaneousLayerPushWindow = 4
|
||||
|
||||
type pushFunction func(fsLayer manifest.FSLayer) error
|
||||
|
||||
// Push implements a client push workflow for the image defined by the given
|
||||
// name and tag pair, using the given ObjectStore for local manifest and layer
|
||||
// storage
|
||||
func Push(c Client, objectStore ObjectStore, name, tag string) error {
|
||||
manifest, err := objectStore.Manifest(name, tag)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"name": name,
|
||||
"tag": tag,
|
||||
}).Info("No image found")
|
||||
return err
|
||||
}
|
||||
|
||||
errChans := make([]chan error, len(manifest.FSLayers))
|
||||
for i := range manifest.FSLayers {
|
||||
errChans[i] = make(chan error)
|
||||
}
|
||||
|
||||
cancelCh := make(chan struct{})
|
||||
|
||||
// Iterate over each layer in the manifest, simultaneously pushing no more
|
||||
// than simultaneousLayerPushWindow layers at a time. If an error is
|
||||
// received from a layer push, we abort the push.
|
||||
for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPushWindow; i++ {
|
||||
dependentLayer := i - simultaneousLayerPushWindow
|
||||
if dependentLayer >= 0 {
|
||||
err := <-errChans[dependentLayer]
|
||||
if err != nil {
|
||||
log.WithField("error", err).Warn("Push aborted")
|
||||
close(cancelCh)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if i < len(manifest.FSLayers) {
|
||||
go func(i int) {
|
||||
select {
|
||||
case errChans[i] <- pushLayer(c, objectStore, name, manifest.FSLayers[i]):
|
||||
case <-cancelCh: // recv broadcast notification about cancelation
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
|
||||
err = c.PutImageManifest(name, tag, manifest)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"manifest": manifest,
|
||||
}).Warn("Unable to upload manifest")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer manifest.FSLayer) error {
|
||||
log.WithField("layer", fsLayer).Info("Pushing layer")
|
||||
|
||||
layer, err := objectStore.Layer(fsLayer.BlobSum)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to read local layer")
|
||||
return err
|
||||
}
|
||||
|
||||
layerReader, err := layer.Reader()
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to read local layer")
|
||||
return err
|
||||
}
|
||||
defer layerReader.Close()
|
||||
|
||||
if layerReader.CurrentSize() != layerReader.Size() {
|
||||
log.WithFields(log.Fields{
|
||||
"layer": fsLayer,
|
||||
"currentSize": layerReader.CurrentSize(),
|
||||
"size": layerReader.Size(),
|
||||
}).Warn("Local layer incomplete")
|
||||
return fmt.Errorf("Local layer incomplete")
|
||||
}
|
||||
|
||||
length, err := c.BlobLength(name, fsLayer.BlobSum)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to check existence of remote layer")
|
||||
return err
|
||||
}
|
||||
if length >= 0 {
|
||||
log.WithField("layer", fsLayer).Info("Layer already exists")
|
||||
return nil
|
||||
}
|
||||
|
||||
location, err := c.InitiateBlobUpload(name)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to upload layer")
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.UploadBlob(location, layerReader, int(layerReader.CurrentSize()), fsLayer.BlobSum)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
"layer": fsLayer,
|
||||
}).Warn("Unable to upload layer")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue