package client

import (
	"fmt"

	log "github.com/Sirupsen/logrus"
	"github.com/docker/distribution/manifest"
)

// simultaneousLayerPushWindow is the size of the parallel layer push window.
// A layer may not be pushed until the layer preceding it by the length of the
// push window has been successfully pushed. Push enforces this by waiting on
// the result of layer i-simultaneousLayerPushWindow before starting layer i.
const simultaneousLayerPushWindow = 4

// pushFunction describes a function that takes a single manifest.FSLayer and
// returns an error.
// NOTE(review): nothing in the visible part of this file references this
// type; presumably it was meant for per-layer push callbacks -- confirm it is
// still needed.
type pushFunction func(fsLayer manifest.FSLayer) error

// Push uploads the image identified by the name and tag pair through the
// given Client. The manifest is loaded from objectStore, and each of its
// layers is handed to pushLayer in its own goroutine.
//
// Layer pushes run inside a sliding window: layer i is not started until the
// result of layer i-simultaneousLayerPushWindow has been received. The first
// non-nil layer result aborts the push and is returned to the caller. The
// manifest itself is uploaded only after every layer has reported success.
func Push(c Client, objectStore ObjectStore, name, tag string) error {
	imageManifest, err := objectStore.Manifest(name, tag)
	if err != nil {
		fields := log.Fields{
			"error": err,
			"name":  name,
			"tag":   tag,
		}
		log.WithFields(fields).Info("No image found")
		return err
	}

	layerCount := len(imageManifest.FSLayers)

	// One unbuffered result channel per layer, indexed like FSLayers.
	results := make([]chan error, layerCount)
	for idx := range results {
		results[idx] = make(chan error)
	}

	// abort is closed on the first failure so that goroutines still holding
	// a result nobody will read can exit instead of blocking on their send.
	abort := make(chan struct{})

	// The loop runs simultaneousLayerPushWindow steps past the last layer so
	// that the results of the final window are drained before the manifest
	// is uploaded.
	for step := 0; step < layerCount+simultaneousLayerPushWindow; step++ {
		if trailing := step - simultaneousLayerPushWindow; trailing >= 0 {
			if pushErr := <-results[trailing]; pushErr != nil {
				log.WithField("error", pushErr).Warn("Push aborted")
				close(abort)
				return pushErr
			}
		}

		if step >= layerCount {
			// Past the last layer: only draining remains.
			continue
		}

		go func(idx int) {
			outcome := pushLayer(c, objectStore, name, imageManifest.FSLayers[idx])
			select {
			case results[idx] <- outcome:
			case <-abort: // the push was cancelled; drop the result
			}
		}(step)
	}

	if err := c.PutImageManifest(name, tag, imageManifest); err != nil {
		log.WithFields(log.Fields{
			"error":    err,
			"manifest": imageManifest,
		}).Warn("Unable to upload manifest")
		return err
	}

	return nil
}

// pushLayer uploads a single layer, identified by fsLayer.BlobSum, to the
// repository called name.
//
// The layer is read from objectStore and must be complete locally, meaning
// its reader's CurrentSize equals its Size; otherwise an error is returned.
// If c.BlobLength reports a non-negative length the layer is treated as
// already present remotely and nothing is uploaded. Otherwise a blob upload
// is initiated and the layer contents are sent. Every failure is logged at
// Warn level and returned to the caller.
func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer manifest.FSLayer) error {
	log.WithField("layer", fsLayer).Info("Pushing layer")

	// warn emits a Warn-level entry carrying the failure and the layer.
	warn := func(cause error, msg string) {
		log.WithFields(log.Fields{
			"error": cause,
			"layer": fsLayer,
		}).Warn(msg)
	}

	localLayer, err := objectStore.Layer(fsLayer.BlobSum)
	if err != nil {
		warn(err, "Unable to read local layer")
		return err
	}

	reader, err := localLayer.Reader()
	if err != nil {
		warn(err, "Unable to read local layer")
		return err
	}
	defer reader.Close()

	// Refuse to push a layer whose local copy is not complete.
	if reader.CurrentSize() != reader.Size() {
		log.WithFields(log.Fields{
			"layer":       fsLayer,
			"currentSize": reader.CurrentSize(),
			"size":        reader.Size(),
		}).Warn("Local layer incomplete")
		return fmt.Errorf("Local layer incomplete")
	}

	remoteLength, err := c.BlobLength(name, fsLayer.BlobSum)
	if err != nil {
		warn(err, "Unable to check existence of remote layer")
		return err
	}
	if remoteLength >= 0 {
		// A non-negative length is taken to mean the blob is already there.
		log.WithField("layer", fsLayer).Info("Layer already exists")
		return nil
	}

	uploadLocation, err := c.InitiateBlobUpload(name)
	if err != nil {
		warn(err, "Unable to upload layer")
		return err
	}

	if err := c.UploadBlob(uploadLocation, reader, int(reader.CurrentSize()), fsLayer.BlobSum); err != nil {
		warn(err, "Unable to upload layer")
		return err
	}

	return nil
}