distribution/storage/layerupload.go
Stephen J Day 0270bec916 Handle empty blob files more appropriately
Several API tests were added to ensure correct acceptance of zero-size and
empty tar files. This led to several changes in the storage backend around the
guarantees of remote file reading, which backs the layer and layer upload types.

In support of these changes, zero-length and empty checks have been added to
the digest package. These provide a sanity check against upstream tarsum
changes. The fileReader has been modified to be more robust when reading and
seeking on zero-length or non-existent files. The file no longer needs to exist
for the reader to be created. Seeks can now move beyond the end of the file,
causing subsequent reads to return io.EOF. This eliminates errors during
certain race conditions when reading files that should be detected by stat
calls. As part of this, a few error types were factored out and the read
buffer size was increased to something more reasonable.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2015-02-02 13:01:49 -08:00
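
The relaxed fileReader behavior described above can be illustrated with a short,
hypothetical sketch. It is not part of the file below; the driver parameter, the
extra "os" import, and the assumption that fileReader satisfies io.ReadSeeker and
io.Closer are inferred from how newFileReader is used in validateLayer.

// checkEmptyRead sketches the guarantees described in the commit message: the
// reader can be created for a missing or zero-length file, seeks may move past
// the end, and reads at or beyond the end return io.EOF.
func checkEmptyRead(driver storagedriver.StorageDriver, path string) error {
	fr, err := newFileReader(driver, path) // creation no longer requires the file to exist
	if err != nil {
		return err // only unexpected driver errors should surface here
	}
	defer fr.Close()

	// Seeking beyond the end of a zero-length (or absent) file is permitted.
	if _, err := fr.Seek(1024, os.SEEK_SET); err != nil {
		return err
	}

	// Reads at or past the end report io.EOF rather than a driver error.
	buf := make([]byte, 32)
	if _, err := fr.Read(buf); err != io.EOF {
		return fmt.Errorf("expected io.EOF reading past the end, got %v", err)
	}

	return nil
}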

235 lines
6.6 KiB
Go

package storage

import (
	"fmt"
	"io"
	"path"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/docker/distribution/digest"
	"github.com/docker/distribution/storagedriver"
	"github.com/docker/docker/pkg/tarsum"
)

// layerUploadController is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
type layerUploadController struct {
	layerStore *layerStore
	uuid       string
	startedAt  time.Time

	fileWriter
}
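
// layerUploadController must satisfy the LayerUpload interface; this
// compile-time assertion breaks the build if the method set drifts.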
var _ LayerUpload = &layerUploadController{}

// Name of the repository under which the layer will be linked.
func (luc *layerUploadController) Name() string {
	return luc.layerStore.repository.Name()
}

// UUID returns the identifier for this upload.
func (luc *layerUploadController) UUID() string {
	return luc.uuid
}
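
// StartedAt returns the time at which this layer upload was started.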
func (luc *layerUploadController) StartedAt() time.Time {
	return luc.startedAt
}

// Finish marks the upload as completed, returning a valid handle to the
// uploaded layer. The final size and checksum are validated against the
// contents of the uploaded layer. The checksum should be provided in the
// format <algorithm>:<hex digest>.
func (luc *layerUploadController) Finish(digest digest.Digest) (Layer, error) {
	canonical, err := luc.validateLayer(digest)
	if err != nil {
		return nil, err
	}

	if err := luc.moveLayer(canonical); err != nil {
		// TODO(stevvooe): Cleanup?
		return nil, err
	}

	// Link the layer blob into the repository.
	if err := luc.linkLayer(canonical); err != nil {
		return nil, err
	}

	if err := luc.removeResources(); err != nil {
		return nil, err
	}

	return luc.layerStore.Fetch(canonical)
}

// Cancel the layer upload process.
func (luc *layerUploadController) Cancel() error {
	if err := luc.removeResources(); err != nil {
		return err
	}

	luc.Close()
	return nil
}

// validateLayer checks the layer data against the digest, returning an error
// if it does not match. The canonical digest is returned.
func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) {
	// First, check the incoming tarsum version of the digest.
	version, err := tarsum.GetVersionFromTarsum(dgst.String())
	if err != nil {
		return "", err
	}

	// TODO(stevvooe): Should we push this down into the digest type?
	switch version {
	case tarsum.Version1:
	default:
		// Reject version 0 and dev tarsums, for now.
		return "", ErrLayerInvalidDigest{
			Digest: dgst,
			Reason: ErrLayerTarSumVersionUnsupported,
		}
	}

	digestVerifier := digest.NewDigestVerifier(dgst)

	// TODO(stevvooe): Store resumable hash calculations in upload directory
	// in driver. Something like a file at path <uuid>/resumablehash/<offset>
	// with the hash state up to that point would be perfect. The hasher would
	// then only have to fetch the difference.

	// Read the file from the backend driver and validate it.
	fr, err := newFileReader(luc.fileWriter.driver, luc.path)
	if err != nil {
		return "", err
	}

	tr := io.TeeReader(fr, digestVerifier)

	// TODO(stevvooe): This is one of the places we need a Digester write
	// sink. Instead, it's read driven. This might be okay.

	// Calculate an updated digest with the latest version.
	canonical, err := digest.FromTarArchive(tr)
	if err != nil {
		return "", err
	}

	if !digestVerifier.Verified() {
		return "", ErrLayerInvalidDigest{
			Digest: dgst,
			Reason: fmt.Errorf("content does not match digest"),
		}
	}

	return canonical, nil
}

// moveLayer moves the data into its final, hash-qualified destination,
// identified by dgst. The layer should be validated before commencing the
// move.
func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
	blobPath, err := luc.layerStore.repository.registry.pm.path(blobDataPathSpec{
		digest: dgst,
	})
	if err != nil {
		return err
	}

	// Check for existence
	if _, err := luc.driver.Stat(blobPath); err != nil {
		switch err := err.(type) {
		case storagedriver.PathNotFoundError:
			break // ensure that it doesn't exist.
		default:
			return err
		}
	} else {
		// If the path exists, we can assume that the content has already
		// been uploaded, since the blob storage is content-addressable.
		// While it may be corrupted, detection of such corruption belongs
		// elsewhere.
		return nil
	}

	// If no data was received, we may not actually have a file on disk. Check
	// the size here and write a zero-length file to blobPath if this is the
	// case. For the most part, this should only ever happen with zero-length
	// tars.
	if _, err := luc.driver.Stat(luc.path); err != nil {
		switch err := err.(type) {
		case storagedriver.PathNotFoundError:
			// HACK(stevvooe): This is slightly dangerous: if we verify above,
			// get a hash, then the underlying file is deleted, we risk moving
			// a zero-length blob into a nonzero-length blob location. To
			// prevent this horrid thing, we employ the hack of only allowing
			// this to happen for the zero tarsum.
			if dgst == digest.DigestTarSumV1EmptyTar {
				return luc.driver.PutContent(blobPath, []byte{})
			}

			// We let this fail during the move below.
			logrus.
				WithField("upload.uuid", luc.UUID()).
				WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest")
		default:
			return err // unrelated error
		}
	}

	return luc.driver.Move(luc.path, blobPath)
}

// linkLayer links a valid, written layer blob into the registry under the
// named repository for the upload controller.
func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
	layerLinkPath, err := luc.layerStore.repository.registry.pm.path(layerLinkPathSpec{
		name:   luc.Name(),
		digest: digest,
	})
	if err != nil {
		return err
	}

	return luc.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(digest))
}

// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the cleanup cannot proceed. If the
// resources are already not present, no error will be returned.
func (luc *layerUploadController) removeResources() error {
	dataPath, err := luc.layerStore.repository.registry.pm.path(uploadDataPathSpec{
		name: luc.Name(),
		uuid: luc.uuid,
	})
	if err != nil {
		return err
	}

	// Resolve and delete the containing directory, which should include any
	// upload related files.
	dirPath := path.Dir(dataPath)

	if err := luc.driver.Delete(dirPath); err != nil {
		switch err := err.(type) {
		case storagedriver.PathNotFoundError:
			break // already gone!
		default:
			// This should be uncommon enough such that returning an error
			// should be okay. At this point, the upload should be mostly
			// complete, but perhaps the backend became inaccessible.
			logrus.Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
			return err
		}
	}

	return nil
}
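
For orientation, here is a hedged sketch of how a caller might drive a
LayerUpload produced by this controller. How the upload is created lives
outside this file; the finishUpload name, the io.Reader source, and the
assumption that LayerUpload accepts writes (suggested by the embedded
fileWriter) are illustrative only.

// finishUpload streams layer content into an in-progress upload and completes
// it with a digest in <algorithm>:<hex digest> form, as Finish expects. On any
// failure the upload is cancelled so its temporary resources are removed.
func finishUpload(lu LayerUpload, content io.Reader, dgst digest.Digest) (Layer, error) {
	if _, err := io.Copy(lu, content); err != nil {
		lu.Cancel()
		return nil, err
	}

	// Finish validates the written bytes against dgst, moves the blob into its
	// content-addressed location and links it under the repository.
	layer, err := lu.Finish(dgst)
	if err != nil {
		lu.Cancel()
		return nil, err
	}

	return layer, nil
}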