a4024b2f90
Because manifests and their signatures are a discrete component of the registry, we are moving the definitions into a separate package. This causes us to lose some test coverage, but we can fill this in shortly. No changes have been made to the external interfaces, but they are likely to come.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
package storage

import (
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"

	"code.google.com/p/go-uuid/uuid"

	"github.com/docker/distribution/digest"
	"github.com/docker/distribution/manifest"
	"github.com/docker/distribution/storagedriver"
	"github.com/docker/docker/pkg/tarsum"
)

// LayerUploadState captures the serializable state of the layer upload.
type LayerUploadState struct {
	// Name is the primary repository under which the layer will be linked.
	Name string

	// UUID identifies the upload.
	UUID string

	// Offset contains the current progress of the upload.
	Offset int64
}
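
// For illustration only: because the fields above are exported and carry no
// struct tags, SaveState (below) serializes an upload's state to JSON roughly
// like {"Name":"<repository>","UUID":"<uuid>","Offset":<bytes written>},
// where the placeholder values are hypothetical.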

// layerUploadController is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
type layerUploadController struct {
	LayerUploadState

	layerStore  *layerStore
	uploadStore layerUploadStore
	fp          layerFile
	err         error // terminal error, if set, controller is closed
}

// layerFile documents the interface used while writing layer files, similar
// to *os.File. This is separate from layerReader, for now, because we want to
// store uploads on the local file system until we have write-through hashing
// support. They should be combined once this is worked out.
type layerFile interface {
	io.WriteSeeker
	io.Reader
	io.Closer

	// Sync commits the contents of the writer to storage.
	Sync() (err error)
}

// layerUploadStore provides storage for temporary files and upload state of
// layers. This is used by the LayerService to manage the state of ongoing
// uploads. This interface will definitely change and will most likely end up
// being exported to the app layer. Move to layer.go when it's ready to go.
type layerUploadStore interface {
	New(name string) (LayerUploadState, error)
	Open(uuid string) (layerFile, error)
	GetState(uuid string) (LayerUploadState, error)
	SaveState(lus LayerUploadState) error
	DeleteState(uuid string) error
}
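
// For illustration, the intended lifecycle of a layerUploadStore is roughly:
// New allocates an upload and returns its initial state, Open returns the
// backing layerFile to write into, SaveState persists progress after each
// write, GetState recovers the state when a client resumes an upload, and
// DeleteState removes it once the upload is finished or cancelled.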

var _ LayerUpload = &layerUploadController{}

// Name of the repository under which the layer will be linked.
func (luc *layerUploadController) Name() string {
	return luc.LayerUploadState.Name
}

// UUID returns the identifier for this upload.
func (luc *layerUploadController) UUID() string {
	return luc.LayerUploadState.UUID
}

// Offset returns the position of the last byte written to this layer.
func (luc *layerUploadController) Offset() int64 {
	return luc.LayerUploadState.Offset
}

// Finish marks the upload as completed, returning a valid handle to the
// uploaded layer. The final size and checksum are validated against the
// contents of the uploaded layer. The checksum should be provided in the
// format <algorithm>:<hex digest>.
func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Layer, error) {

	// This section is going to be pretty ugly now. We will have to read the
	// file twice. First, to get the tarsum and checksum. When those are
	// available, and validated, we will upload it to the blob store and link
	// it into the repository. In the future, we need to use resumable hash
	// calculations for tarsum and checksum that can be calculated during the
	// upload. This will allow us to cut the data directly into a temporary
	// directory in the storage backend.

	fp, err := luc.file()

	if err != nil {
		// Cleanup?
		return nil, err
	}

	digest, err = luc.validateLayer(fp, size, digest)
	if err != nil {
		return nil, err
	}

	if nn, err := luc.writeLayer(fp, digest); err != nil {
		// Cleanup?
		return nil, err
	} else if size >= 0 && nn != size {
		// TODO(stevvooe): Short write. Will have to delete the location and
		// report an error. This error needs to be reported to the client.
		return nil, fmt.Errorf("short write writing layer")
	}

	// Yes! We have written some layer data. Let's make it visible. Link the
	// layer blob into the repository.
	if err := luc.linkLayer(digest); err != nil {
		return nil, err
	}

	// Ok, the upload has completed. Delete the state.
	if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil {
		// Can we ignore this error?
		return nil, err
	}

	return luc.layerStore.Fetch(luc.Name(), digest)
}

// Cancel the layer upload process.
func (luc *layerUploadController) Cancel() error {
	if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil {
		return err
	}

	return luc.Close()
}
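
// Write appends p to the upload's backing file, syncing after every call so
// that the reported offset stays consistent with storage, then persists the
// updated offset through the upload store.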
func (luc *layerUploadController) Write(p []byte) (int, error) {
	wr, err := luc.file()
	if err != nil {
		return 0, err
	}

	n, err := wr.Write(p)

	// Because we expect the reported offset to be consistent with the storage
	// state, unfortunately, we need to Sync on every call to write.
	if err := wr.Sync(); err != nil {
		// Effectively, ignore the write state if the Sync fails. Report that
		// no bytes were written and seek back to the starting offset.
		offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET)
		if seekErr != nil {
			// What do we do here? Quite disastrous.
			luc.reset()

			return 0, fmt.Errorf("multiple errors encountered after Sync + Seek: %v then %v", err, seekErr)
		}

		if offset != luc.Offset() {
			return 0, fmt.Errorf("unexpected offset after seek")
		}

		return 0, err
	}

	luc.LayerUploadState.Offset += int64(n)

	if err := luc.uploadStore.SaveState(luc.LayerUploadState); err != nil {
		// TODO(stevvooe): This failure case may require more thought.
		return n, err
	}

	return n, err
}
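
// Close closes the backing layer file, if open, recording any error as the
// controller's terminal error and returning it.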
func (luc *layerUploadController) Close() error {
	if luc.err != nil {
		return luc.err
	}

	if luc.fp != nil {
		luc.err = luc.fp.Close()
	}

	return luc.err
}
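
// file returns the backing layerFile for the upload, opening it through the
// upload store on first use and seeking it to the current offset.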
func (luc *layerUploadController) file() (layerFile, error) {
	if luc.fp != nil {
		return luc.fp, nil
	}

	fp, err := luc.uploadStore.Open(luc.UUID())

	if err != nil {
		return nil, err
	}

	// TODO(stevvooe): We may need a more aggressive check here to ensure that
	// the file length is equal to the current offset. We may want to sync the
	// offset before returning the layer upload to the client so it can be
	// validated before proceeding with any writes.

	// Seek to the current layer offset for good measure.
	if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil {
		return nil, err
	}

	luc.fp = fp

	return luc.fp, nil
}

// reset closes and drops the current writer.
func (luc *layerUploadController) reset() {
	if luc.fp != nil {
		luc.fp.Close()
		luc.fp = nil
	}
}
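
// Note, for illustration only: the digest handed to Finish and validateLayer
// is expected to be a tarsum (for example, "tarsum.v1+sha256:<hex digest>");
// validateLayer rejects any tarsum version other than Version1 below.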

// validateLayer runs several checks on the layer file to ensure its validity.
// This is currently very expensive and relies on fast io and fast seek on the
// local host. If successful, the latest digest is returned, which should be
// used over the passed in value.
func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst digest.Digest) (digest.Digest, error) {
	// First, check the incoming tarsum version of the digest.
	version, err := tarsum.GetVersionFromTarsum(dgst.String())
	if err != nil {
		return "", err
	}

	// TODO(stevvooe): Should we push this down into the digest type?
	switch version {
	case tarsum.Version1:
	default:
		// version 0 and dev, for now.
		return "", ErrLayerTarSumVersionUnsupported
	}

	digestVerifier := digest.NewDigestVerifier(dgst)
	lengthVerifier := digest.NewLengthVerifier(size)

	// First, seek to the end of the file, checking the size is as expected.
	end, err := fp.Seek(0, os.SEEK_END)
	if err != nil {
		return "", err
	}

	// Only check the size if a non-negative size argument has been passed.
	if size >= 0 && end != size {
		// Fast path length check.
		return "", ErrLayerInvalidSize{Size: size}
	}

	// Now seek back to start and take care of the digest.
	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
		return "", err
	}

	tr := io.TeeReader(fp, digestVerifier)

	// Only verify the size if a non-negative size argument has been passed.
	if size >= 0 {
		tr = io.TeeReader(tr, lengthVerifier)
	}

	// TODO(stevvooe): This is one of the places we need a Digester write
	// sink. Instead, it's read driven. This might be okay.

	// Calculate an updated digest with the latest version.
	dgst, err = digest.FromReader(tr)
	if err != nil {
		return "", err
	}

	if size >= 0 && !lengthVerifier.Verified() {
		return "", ErrLayerInvalidSize{Size: size}
	}

	if !digestVerifier.Verified() {
		return "", ErrLayerInvalidDigest{manifest.FSLayer{BlobSum: dgst}}
	}

	return dgst, nil
}

// writeLayer actually writes the layer file into its final destination,
// identified by dgst. The layer should be validated before commencing the
// write.
func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) {
	blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
		digest: dgst,
	})

	if err != nil {
		return 0, err
	}

	// Check for existence
	if _, err := luc.layerStore.driver.Stat(blobPath); err != nil {
		// TODO(stevvooe): This check is kind of problematic and very racy.
		switch err := err.(type) {
		case storagedriver.PathNotFoundError:
			break // ensure that it doesn't exist.
		default:
			// TODO(stevvooe): This isn't actually an error: the blob store is
			// content addressable and we should just use this to ensure we
			// have it written. Although, we do need to verify that the
			// content that is there is the correct length.
			return 0, err
		}
	}

	// Seek our local layer file back now.
	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
		// Cleanup?
		return 0, err
	}

	// Okay: we can write the file to the blob store.
	return luc.layerStore.driver.WriteStream(blobPath, 0, fp)
}

// linkLayer links a valid, written layer blob into the registry under the
// named repository for the upload controller.
func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
	layerLinkPath, err := luc.layerStore.pathMapper.path(layerLinkPathSpec{
		name:   luc.Name(),
		digest: digest,
	})

	if err != nil {
		return err
	}

	return luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest))
}

// localFSLayerUploadStore implements a local layerUploadStore. There are some
// complexities around hashsums that make round tripping to the storage
// backend problematic, so we'll store and read locally for now. By GO-beta,
// this should be fully implemented on top of the backend storagedriver.
//
// For now, the directory layout is as follows:
//
//	/<temp dir>/registry-layer-upload/
//		<uuid>/
//			-> state.json
//			-> data
//
// Each upload, identified by uuid, has its own directory with a state file
// and a data file. The state file has a json representation of the current
// state. The data file is the in-progress upload data.
type localFSLayerUploadStore struct {
	root string
}
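
// newTemporaryLocalFSLayerUploadStore returns a localFSLayerUploadStore
// rooted at a freshly created temporary directory.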
func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) {
	path, err := ioutil.TempDir("", "registry-layer-upload")

	if err != nil {
		return nil, err
	}

	return &localFSLayerUploadStore{
		root: path,
	}, nil
}
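
// New allocates a new upload under the given repository name, creating its
// directory and persisting the initial state.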
func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) {
	lus := LayerUploadState{
		Name: name,
		UUID: uuid.New(),
	}

	if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil {
		return lus, err
	}

	if err := llufs.SaveState(lus); err != nil {
		return lus, err
	}

	return lus, nil
}
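
// Open returns the upload's data file, creating it if necessary, opened for
// reading and appending.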
func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
	fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)

	if err != nil {
		return nil, err
	}

	return fp, nil
}
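
// GetState reads the upload's state from its state.json file, returning
// ErrLayerUploadUnknown if the upload does not exist.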
func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
	// TODO(stevvooe): Storing this state on the local file system is an
	// intermediate stop gap. This technique is unlikely to handle any kind of
	// concurrency very well.

	var lus LayerUploadState
	fp, err := os.Open(llufs.path(uuid, "state.json"))
	if err != nil {
		if os.IsNotExist(err) {
			return lus, ErrLayerUploadUnknown
		}

		return lus, err
	}
	defer fp.Close()

	dec := json.NewDecoder(fp)
	if err := dec.Decode(&lus); err != nil {
		return lus, err
	}

	return lus, nil
}
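
// SaveState writes the upload's state to its state.json file, returning
// ErrLayerUploadUnknown if the upload's directory no longer exists.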
func (llufs *localFSLayerUploadStore) SaveState(lus LayerUploadState) error {
	p, err := json.Marshal(lus)
	if err != nil {
		return err
	}

	err = ioutil.WriteFile(llufs.path(lus.UUID, "state.json"), p, 0644)
	if os.IsNotExist(err) {
		return ErrLayerUploadUnknown
	}

	return err
}
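
// DeleteState removes the upload's directory and everything in it, returning
// ErrLayerUploadUnknown if the upload does not exist.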
func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
	if err := os.RemoveAll(llufs.path(uuid, "")); err != nil {
		if os.IsNotExist(err) {
			return ErrLayerUploadUnknown
		}

		return err
	}

	return nil
}
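
// path returns the on-disk location of the named file within the upload's
// directory under the store's root.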
func (llufs *localFSLayerUploadStore) path(uuid, file string) string {
	return filepath.Join(llufs.root, uuid, file)
}