distribution/storage/layerupload.go
Brian Bland 07ba5db168 Serializes upload state to an HMAC token for subsequent requests
To support clustered registry, upload UUIDs must be recognizable by
registries that did not issue the UUID. By creating an HMAC verifiable
upload state token, registries can validate upload requests that other
instances authorized. The tokenProvider interface could also use a redis
store or other system for token handling in the future.
2015-01-05 14:27:05 -08:00

419 lines
11 KiB
Go

package storage
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver"
"github.com/docker/docker/pkg/tarsum"
)
// LayerUploadState captures the serializable state of a layer upload.
type LayerUploadState struct {
// Name is the primary repository under which the layer will be linked.
Name string
// UUID identifies the upload.
UUID string
// Offset contains the current progress of the upload. The upload
// controller's Write method advances it by the number of bytes written
// each time the data has been synced successfully.
Offset int64
}
// layerUploadController is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
type layerUploadController struct {
// LayerUploadState carries the repository name, upload UUID and current
// write offset of this upload.
LayerUploadState
// layerStore supplies the path mapper and storage driver used to write and
// link the finished blob, and is used to fetch the resulting layer.
layerStore *layerStore
// uploadStore opens the in-progress upload file and deletes the upload
// state once the upload has finished.
uploadStore layerUploadStore
// fp is the lazily opened upload file. It is nil until file() succeeds and
// is set back to nil by reset().
fp layerFile
err error // terminal error, if set, controller is closed; Close stores the result of closing fp here
}
// layerFile documents the interface used while writing layer files, similar
// to *os.File. This is separate from layerReader, for now, because we want to
// store uploads on the local file system until we have write-through hashing
// support. They should be combined once this is worked out.
type layerFile interface {
// Write stores incoming upload data; Seek repositions within the file and is
// also used to find the file's length during validation.
io.WriteSeeker
// Read is used when the finished upload is digested and when it is streamed
// into the blob store.
io.Reader
// Close releases the file.
io.Closer
// Sync commits the contents of the writer to storage.
Sync() (err error)
}
// layerUploadStore provides storage for temporary files and upload state of
// layers. This is used by the LayerService to manage the state of ongoing
// uploads. This interface will definitely change and will most likely end up
// being exported to the app layer. Move to layer.go when it's ready to go.
type layerUploadStore interface {
// New starts an upload for the repository called name and returns its
// initial state.
New(name string) (LayerUploadState, error)
// Open returns the file holding the in-progress data of the upload
// identified by uuid.
Open(uuid string) (layerFile, error)
// GetState looks up the upload identified by uuid.
GetState(uuid string) (LayerUploadState, error)
// DeleteState discards the upload identified by uuid. The upload controller
// calls it from both Finish and Cancel.
DeleteState(uuid string) error
}
// Compile-time assertion that *layerUploadController implements LayerUpload.
var _ LayerUpload = &layerUploadController{}
// Name reports the repository under which the uploaded layer will be linked,
// as recorded in the embedded upload state.
func (luc *layerUploadController) Name() string {
	state := luc.LayerUploadState
	return state.Name
}
// UUID reports the identifier of this upload, as recorded in the embedded
// upload state.
func (luc *layerUploadController) UUID() string {
	state := luc.LayerUploadState
	return state.UUID
}
// Offset reports how far the upload has progressed: the value recorded in the
// embedded upload state, which Write advances by the number of bytes it
// reports as written.
func (luc *layerUploadController) Offset() int64 {
	state := luc.LayerUploadState
	return state.Offset
}
// Finish completes the upload and returns a handle to the stored layer.
//
// The uploaded data is validated against size and digest (a negative size
// disables the size checks), copied into the blob store, linked into the
// repository and, finally, the upload state is deleted. The digest is
// expected in the format <algorithm>:<hex digest>. The first failing step
// aborts the sequence and its error is returned.
func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Layer, error) {
	// The upload file is read twice below: once to validate it and once to
	// copy it into the blob store. Resumable hash calculation during the
	// upload would allow the data to be cut directly into the storage
	// backend instead.
	upload, err := luc.file()
	if err != nil {
		// Cleanup?
		return nil, err
	}

	// From here on, use the digest computed during validation rather than
	// the one supplied by the caller.
	digest, err = luc.validateLayer(upload, size, digest)
	if err != nil {
		return nil, err
	}

	written, err := luc.writeLayer(upload, digest)
	if err != nil {
		// Cleanup?
		return nil, err
	}
	if size >= 0 && written != size {
		// TODO(stevvooe): Short write. Will have to delete the location and
		// report an error. This error needs to be reported to the client.
		return nil, fmt.Errorf("short write writing layer")
	}

	// The blob is in place; make it visible by linking it into the
	// repository.
	if err := luc.linkLayer(digest); err != nil {
		return nil, err
	}

	// The upload is complete, so its state is no longer needed.
	if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil {
		// Can we ignore this error?
		return nil, err
	}

	return luc.layerStore.Fetch(luc.Name(), digest)
}
// Cancel aborts the upload: the upload state is deleted through the layer
// store's upload store and, only if that succeeded, the controller is closed.
// The first error encountered is returned.
func (luc *layerUploadController) Cancel() error {
	err := luc.layerStore.uploadStore.DeleteState(luc.UUID())
	if err == nil {
		err = luc.Close()
	}
	return err
}
// Write writes p to the upload file and syncs it before reporting progress.
//
// When the sync succeeds, the recorded offset is advanced by the number of
// bytes written and that count is returned together with any error from the
// write itself. When the sync fails, the write is reported as zero bytes and
// the file is repositioned to the recorded offset; if that repositioning
// fails as well, the file handle is dropped via reset.
func (luc *layerUploadController) Write(p []byte) (int, error) {
	fp, err := luc.file()
	if err != nil {
		return 0, err
	}

	written, writeErr := fp.Write(p)

	// The reported offset is expected to be consistent with the storage
	// state, so, unfortunately, every write has to be followed by a Sync.
	syncErr := fp.Sync()
	if syncErr == nil {
		luc.LayerUploadState.Offset += int64(written)
		return written, writeErr
	}

	// The sync failed: disregard the write and go back to where it started.
	want := luc.Offset()
	pos, seekErr := fp.Seek(want, os.SEEK_SET)
	if seekErr != nil {
		// What do we do here? Quite disastrous.
		luc.reset()
		return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", syncErr, seekErr)
	}
	if pos != want {
		return 0, fmt.Errorf("unexpected offset after seek")
	}

	return 0, syncErr
}
// Close closes the upload file, if one is open, and records the outcome as
// the controller's terminal error. Once a terminal error has been recorded,
// it is returned again without touching the file.
func (luc *layerUploadController) Close() error {
	if luc.err == nil && luc.fp != nil {
		luc.err = luc.fp.Close()
	}
	return luc.err
}
// file returns the upload's data file, opening it through the upload store
// on first use and caching the handle for subsequent calls.
//
// A newly opened file is positioned at the current upload offset before it
// is returned. If the file cannot be opened or positioned, an error is
// returned and nothing is cached; in the latter case the freshly opened
// handle is closed so that it is not leaked.
func (luc *layerUploadController) file() (layerFile, error) {
	if luc.fp != nil {
		return luc.fp, nil
	}

	fp, err := luc.uploadStore.Open(luc.UUID())
	if err != nil {
		return nil, err
	}

	// TODO(stevvooe): We may need a more aggressive check here to ensure that
	// the file length is equal to the current offset. We may want to sync the
	// offset before return the layer upload to the client so it can be
	// validated before proceeding with any writes.

	// Seek to the current layer offset for good measure.
	if _, err := fp.Seek(luc.Offset(), os.SEEK_SET); err != nil {
		// The handle is not handed out or cached, so release it here. The
		// seek failure is the error worth reporting; a secondary close
		// error would only obscure it.
		_ = fp.Close()
		return nil, err
	}

	luc.fp = fp
	return luc.fp, nil
}
// reset closes the current upload file, ignoring the result of the close,
// and forgets the handle. It does nothing when no file is open.
func (luc *layerUploadController) reset() {
	if luc.fp == nil {
		return
	}

	luc.fp.Close()
	luc.fp = nil
}
// validateLayer checks the uploaded file against the size and digest the
// client supplied. It is expensive: the file is sought to its end and then
// read in full, so it relies on fast io and fast seek on the local host.
//
// Only digests using tarsum version 1 are accepted. A negative size disables
// both size checks. On success the digest recomputed from the file contents
// is returned and should be used in place of the one passed in.
func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst digest.Digest) (digest.Digest, error) {
	// Start with the tarsum version of the incoming digest.
	tarsumVersion, err := tarsum.GetVersionFromTarsum(dgst.String())
	if err != nil {
		return "", err
	}

	// TODO(stevvooe): Should we push this down into the digest type?
	if tarsumVersion != tarsum.Version1 {
		// version 0 and dev, for now.
		return "", ErrLayerTarSumVersionUnsupported
	}

	checkSize := size >= 0
	digestVerifier := digest.NewDigestVerifier(dgst)
	lengthVerifier := digest.NewLengthVerifier(size)

	// Fast path length check: seeking to the end yields the file length
	// without reading anything. Only applied for a non-negative size.
	fileLen, err := fp.Seek(0, os.SEEK_END)
	if err != nil {
		return "", err
	}
	if checkSize && fileLen != size {
		return "", ErrLayerInvalidSize{Size: size}
	}

	// Rewind and feed the contents through the verifiers while digesting.
	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
		return "", err
	}

	src := io.TeeReader(fp, digestVerifier)
	if checkSize {
		src = io.TeeReader(src, lengthVerifier)
	}

	// TODO(stevvooe): This is one of the places we need a Digester write
	// sink. Instead, it is read driven. This might be okay.

	// Calculate an updated digest with the latest version.
	computed, err := digest.FromReader(src)
	if err != nil {
		return "", err
	}

	switch {
	case checkSize && !lengthVerifier.Verified():
		return "", ErrLayerInvalidSize{Size: size}
	case !digestVerifier.Verified():
		// The error carries the recomputed digest, not the one passed in.
		return "", ErrLayerInvalidDigest{manifest.FSLayer{BlobSum: computed}}
	}

	return computed, nil
}
// writeLayer copies the layer file into its final destination in the blob
// store, at the path derived from dgst, and returns the byte count reported
// by the storage driver. The layer should be validated before commencing the
// write.
func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) {
	store := luc.layerStore

	blobPath, err := store.pathMapper.path(blobPathSpec{digest: dgst})
	if err != nil {
		return 0, err
	}

	// Check for existence. Only a "path not found" failure lets the write
	// proceed; any other Stat failure is returned.
	// TODO(stevvooe): This check is kind of problematic and very racy.
	if _, statErr := store.driver.Stat(blobPath); statErr != nil {
		if _, missing := statErr.(storagedriver.PathNotFoundError); !missing {
			// TODO(stevvooe): This isn't actually an error: the blob store is
			// content addressable and we should just use this to ensure we
			// have it written. Although, we do need to verify that the
			// content that is there is the correct length.
			return 0, statErr
		}
	}

	// Rewind the local layer file so the whole content is streamed.
	if _, seekErr := fp.Seek(0, os.SEEK_SET); seekErr != nil {
		// Cleanup?
		return 0, seekErr
	}

	// Okay: we can write the file to the blob store.
	return store.driver.WriteStream(blobPath, 0, fp)
}
// linkLayer makes a written layer blob visible in the upload's repository by
// storing the digest as the content of the layer link path for that
// repository and digest.
func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
	spec := layerLinkPathSpec{
		name:   luc.Name(),
		digest: digest,
	}

	linkPath, err := luc.layerStore.pathMapper.path(spec)
	if err != nil {
		return err
	}

	content := []byte(digest)
	return luc.layerStore.driver.PutContent(linkPath, content)
}
// localFSLayerUploadStore implements a local layerUploadStore. There are some
// complexities around hashsums that make round tripping to the storage
// backend problematic, so we'll store and read locally for now. By GO-beta,
// this should be fully implemented on top of the backend storagedriver.
//
// For now, the directory layout is as follows:
//
//	/<temp dir>/registry-layer-upload/
//		<uuid>/
//			-> state.json
//			-> data
//
// Each upload, identified by uuid, has its own directory with a state file
// and a data file. The state file has a json representation of the current
// state. The data file is the in-progress upload data.
//
// NOTE(review): the methods in this file only create the <uuid> directory
// (New) and the data file (Open); nothing here reads or writes state.json.
// Confirm whether the state file is still part of the layout.
type localFSLayerUploadStore struct {
// root is the directory under which the per-upload directories are created.
root string
}
// newTemporaryLocalFSLayerUploadStore creates a fresh directory, prefixed
// "registry-layer-upload", in the default temporary location and returns an
// upload store rooted there. The error from creating the directory, if any,
// is returned with a nil store.
func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) {
	root, err := ioutil.TempDir("", "registry-layer-upload")
	if err != nil {
		return nil, err
	}

	store := &localFSLayerUploadStore{root: root}
	return store, nil
}
// New starts an upload for the repository called name: it assigns a new UUID
// and creates the upload's directory under the store root. The state is
// returned together with the directory creation error, if any; callers
// should disregard the state when the error is non-nil.
func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) {
	state := LayerUploadState{Name: name, UUID: uuid.New()}

	uploadDir := llufs.path(state.UUID, "")
	err := os.Mkdir(uploadDir, 0755)

	return state, err
}
// Open opens the data file of the upload identified by uuid for reading and
// writing, creating it with mode 0644 if it does not exist yet.
//
// NOTE(review): the file is opened with os.O_APPEND, so writes go to the end
// of the file regardless of any preceding Seek. The upload controller seeks
// to its recorded offset before writing and after a failed Sync; confirm
// that append-only writes are what is intended there.
func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
	const flags = os.O_CREATE | os.O_APPEND | os.O_RDWR

	dataPath := llufs.path(uuid, "data")
	f, err := os.OpenFile(dataPath, flags, 0644)
	if err != nil {
		// Return an untyped nil so the caller gets a nil interface rather
		// than an interface wrapping a nil *os.File.
		return nil, err
	}

	return f, nil
}
// GetState checks that the directory of the upload identified by uuid
// exists. It returns ErrLayerUploadUnknown when the directory is missing and
// the Stat error for any other failure. The returned state is always the
// zero value; nothing is read back from disk here.
func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
	var state LayerUploadState

	_, err := os.Stat(llufs.path(uuid, ""))
	switch {
	case err == nil:
		return state, nil
	case os.IsNotExist(err):
		return state, ErrLayerUploadUnknown
	default:
		return state, err
	}
}
// DeleteState removes the directory of the upload identified by uuid,
// including the data file inside it.
//
// It returns ErrLayerUploadUnknown when no such upload directory exists, and
// the underlying file system error for any other failure.
func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
	uploadDir := llufs.path(uuid, "")

	// os.RemoveAll reports success for a path that does not exist, so an
	// unknown upload has to be detected before removing anything.
	if _, err := os.Stat(uploadDir); err != nil {
		if os.IsNotExist(err) {
			return ErrLayerUploadUnknown
		}
		return err
	}

	return os.RemoveAll(uploadDir)
}
// path builds the location of file inside the directory of the upload
// identified by uuid, under the store root. An empty file yields the upload
// directory itself, because filepath.Join ignores empty elements.
func (llufs *localFSLayerUploadStore) path(uuid, file string) string {
	elems := []string{llufs.root, uuid, file}
	return filepath.Join(elems...)
}