package client import ( "fmt" "io" log "github.com/Sirupsen/logrus" "github.com/docker/distribution/manifest" ) // simultaneousLayerPullWindow is the size of the parallel layer pull window. // A layer may not be pulled until the layer preceeding it by the length of the // pull window has been successfully pulled. const simultaneousLayerPullWindow = 4 // Pull implements a client pull workflow for the image defined by the given // name and tag pair, using the given ObjectStore for local manifest and layer // storage func Pull(c Client, objectStore ObjectStore, name, tag string) error { manifest, err := c.GetImageManifest(name, tag) if err != nil { return err } log.WithField("manifest", manifest).Info("Pulled manifest") if len(manifest.FSLayers) != len(manifest.History) { return fmt.Errorf("Length of history not equal to number of layers") } if len(manifest.FSLayers) == 0 { return fmt.Errorf("Image has no layers") } errChans := make([]chan error, len(manifest.FSLayers)) for i := range manifest.FSLayers { errChans[i] = make(chan error) } // To avoid leak of goroutines we must notify // pullLayer goroutines about a cancelation, // otherwise they will lock forever. cancelCh := make(chan struct{}) // Iterate over each layer in the manifest, simultaneously pulling no more // than simultaneousLayerPullWindow layers at a time. If an error is // received from a layer pull, we abort the push. for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPullWindow; i++ { dependentLayer := i - simultaneousLayerPullWindow if dependentLayer >= 0 { err := <-errChans[dependentLayer] if err != nil { log.WithField("error", err).Warn("Pull aborted") close(cancelCh) return err } } if i < len(manifest.FSLayers) { go func(i int) { select { case errChans[i] <- pullLayer(c, objectStore, name, manifest.FSLayers[i]): case <-cancelCh: // no chance to recv until cancelCh's closed } }(i) } } err = objectStore.WriteManifest(name, tag, manifest) if err != nil { log.WithFields(log.Fields{ "error": err, "manifest": manifest, }).Warn("Unable to write image manifest") return err } return nil } func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer manifest.FSLayer) error { log.WithField("layer", fsLayer).Info("Pulling layer") layer, err := objectStore.Layer(fsLayer.BlobSum) if err != nil { log.WithFields(log.Fields{ "error": err, "layer": fsLayer, }).Warn("Unable to write local layer") return err } layerWriter, err := layer.Writer() if err == ErrLayerAlreadyExists { log.WithField("layer", fsLayer).Info("Layer already exists") return nil } if err == ErrLayerLocked { log.WithField("layer", fsLayer).Info("Layer download in progress, waiting") layer.Wait() return nil } if err != nil { log.WithFields(log.Fields{ "error": err, "layer": fsLayer, }).Warn("Unable to write local layer") return err } defer layerWriter.Close() if layerWriter.CurrentSize() > 0 { log.WithFields(log.Fields{ "layer": fsLayer, "currentSize": layerWriter.CurrentSize(), "size": layerWriter.Size(), }).Info("Layer partially downloaded, resuming") } layerReader, length, err := c.GetBlob(name, fsLayer.BlobSum, layerWriter.CurrentSize()) if err != nil { log.WithFields(log.Fields{ "error": err, "layer": fsLayer, }).Warn("Unable to download layer") return err } defer layerReader.Close() layerWriter.SetSize(layerWriter.CurrentSize() + length) _, err = io.Copy(layerWriter, layerReader) if err != nil { log.WithFields(log.Fields{ "error": err, "layer": fsLayer, }).Warn("Unable to download layer") return err } if layerWriter.CurrentSize() != layerWriter.Size() { log.WithFields(log.Fields{ "size": layerWriter.Size(), "currentSize": layerWriter.CurrentSize(), "layer": fsLayer, }).Warn("Layer invalid size") return fmt.Errorf( "Wrote incorrect number of bytes for layer %v. Expected %d, Wrote %d", fsLayer, layerWriter.Size(), layerWriter.CurrentSize(), ) } return nil }