forked from TrueCloudLab/distribution
497 lines
13 KiB
Go
497 lines
13 KiB
Go
package storage
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
|
|
"code.google.com/p/go-uuid/uuid"
|
|
|
|
"github.com/docker/docker-registry/digest"
|
|
"github.com/docker/docker-registry/storagedriver"
|
|
"github.com/docker/docker/pkg/tarsum"
|
|
|
|
"io"
|
|
)
|
|
|
|
// LayerUploadState captures the serializable state of a layer upload.
type LayerUploadState struct {
	// Name is the primary repository under which the layer will be linked.
	Name string

	// UUID identifies the upload.
	UUID string

	// Offset contains the current progress of the upload, in bytes.
	Offset int64
}
|
|
|
|
// layerUploadController is used to control the various aspects of resumable
|
|
// layer upload. It implements the LayerUpload interface.
|
|
type layerUploadController struct {
|
|
LayerUploadState
|
|
|
|
layerStore *layerStore
|
|
uploadStore layerUploadStore
|
|
fp layerFile
|
|
err error // terminal error, if set, controller is closed
|
|
}
|
|
|
|
// layerFile describes the operations needed while writing layer files; it
// mirrors the subset of *os.File that the upload path relies on. It is kept
// apart from layerReader, for now, because uploads are stored on the local
// file system until write-through hashing support exists. The two should be
// combined once that is worked out.
type layerFile interface {
	io.WriteSeeker
	io.Reader
	io.Closer

	// Sync commits the contents of the writer to storage.
	Sync() (err error)
}
|
|
|
|
// layerUploadStore provides storage for temporary files and upload state of
|
|
// layers. This is be used by the LayerService to manage the state of ongoing
|
|
// uploads. This interface will definitely change and will most likely end up
|
|
// being exported to the app layer. Move the layer.go when it's ready to go.
|
|
type layerUploadStore interface {
|
|
New(name string) (LayerUploadState, error)
|
|
Open(uuid string) (layerFile, error)
|
|
GetState(uuid string) (LayerUploadState, error)
|
|
SaveState(lus LayerUploadState) error
|
|
DeleteState(uuid string) error
|
|
}
|
|
|
|
// Compile-time check that *layerUploadController satisfies LayerUpload.
var _ LayerUpload = (*layerUploadController)(nil)
|
|
|
|
// Name of the repository under which the layer will be linked.
|
|
func (luc *layerUploadController) Name() string {
|
|
return luc.LayerUploadState.Name
|
|
}
|
|
|
|
// UUID returns the identifier for this upload.
|
|
func (luc *layerUploadController) UUID() string {
|
|
return luc.LayerUploadState.UUID
|
|
}
|
|
|
|
// Offset returns the position of the last byte written to this layer.
|
|
func (luc *layerUploadController) Offset() int64 {
|
|
return luc.LayerUploadState.Offset
|
|
}
|
|
|
|
// Finish marks the upload as completed, returning a valid handle to the
|
|
// uploaded layer. The final size and checksum are validated against the
|
|
// contents of the uploaded layer. The checksum should be provided in the
|
|
// format <algorithm>:<hex digest>.
|
|
func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Layer, error) {
|
|
|
|
// This section is going to be pretty ugly now. We will have to read the
|
|
// file twice. First, to get the tarsum and checksum. When those are
|
|
// available, and validated, we will upload it to the blob store and link
|
|
// it into the repository. In the future, we need to use resumable hash
|
|
// calculations for tarsum and checksum that can be calculated during the
|
|
// upload. This will allow us to cut the data directly into a temporary
|
|
// directory in the storage backend.
|
|
|
|
fp, err := luc.file()
|
|
|
|
if err != nil {
|
|
// Cleanup?
|
|
return nil, err
|
|
}
|
|
|
|
digest, err = luc.validateLayer(fp, size, digest)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := luc.writeLayer(fp, size, digest); err != nil {
|
|
// Cleanup?
|
|
return nil, err
|
|
}
|
|
|
|
// Yes! We have written some layer data. Let's make it visible. Link the
|
|
// layer blob into the repository.
|
|
if err := luc.linkLayer(digest); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Ok, the upload has completed and finished. Delete the state.
|
|
if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil {
|
|
// Can we ignore this error?
|
|
return nil, err
|
|
}
|
|
|
|
return luc.layerStore.Fetch(luc.Name(), digest)
|
|
}
|
|
|
|
// Cancel the layer upload process.
|
|
func (luc *layerUploadController) Cancel() error {
|
|
if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil {
|
|
return err
|
|
}
|
|
|
|
return luc.Close()
|
|
}
|
|
|
|
func (luc *layerUploadController) Write(p []byte) (int, error) {
|
|
wr, err := luc.file()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
n, err := wr.Write(p)
|
|
|
|
// Because we expect the reported offset to be consistent with the storage
|
|
// state, unfortunately, we need to Sync on every call to write.
|
|
if err := wr.Sync(); err != nil {
|
|
// Effectively, ignore the write state if the Sync fails. Report that
|
|
// no bytes were written and seek back to the starting offset.
|
|
offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET)
|
|
if seekErr != nil {
|
|
// What do we do here? Quite disasterous.
|
|
luc.reset()
|
|
|
|
return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", err, seekErr)
|
|
}
|
|
|
|
if offset != luc.Offset() {
|
|
return 0, fmt.Errorf("unexpected offset after seek")
|
|
}
|
|
|
|
return 0, err
|
|
}
|
|
|
|
luc.LayerUploadState.Offset += int64(n)
|
|
|
|
if err := luc.uploadStore.SaveState(luc.LayerUploadState); err != nil {
|
|
// TODO(stevvooe): This failure case may require more thought.
|
|
return n, err
|
|
}
|
|
|
|
return n, err
|
|
}
|
|
|
|
func (luc *layerUploadController) Close() error {
|
|
if luc.err != nil {
|
|
return luc.err
|
|
}
|
|
|
|
if luc.fp != nil {
|
|
luc.err = luc.fp.Close()
|
|
}
|
|
|
|
return luc.err
|
|
}
|
|
|
|
func (luc *layerUploadController) file() (layerFile, error) {
|
|
if luc.fp != nil {
|
|
return luc.fp, nil
|
|
}
|
|
|
|
fp, err := luc.uploadStore.Open(luc.UUID())
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// TODO(stevvooe): We may need a more aggressive check here to ensure that
|
|
// the file length is equal to the current offset. We may want to sync the
|
|
// offset before return the layer upload to the client so it can be
|
|
// validated before proceeding with any writes.
|
|
|
|
// Seek to the current layer offset for good measure.
|
|
if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
luc.fp = fp
|
|
|
|
return luc.fp, nil
|
|
}
|
|
|
|
// reset closes and drops the current writer.
|
|
func (luc *layerUploadController) reset() {
|
|
if luc.fp != nil {
|
|
luc.fp.Close()
|
|
luc.fp = nil
|
|
}
|
|
}
|
|
|
|
// validateLayer runs several checks on the layer file to ensure its validity.
|
|
// This is currently very expensive and relies on fast io and fast seek on the
|
|
// local host. If successful, the latest digest is returned, which should be
|
|
// used over the passed in value.
|
|
func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst digest.Digest) (digest.Digest, error) {
|
|
// First, check the incoming tarsum version of the digest.
|
|
version, err := tarsum.GetVersionFromTarsum(dgst.String())
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// TODO(stevvooe): Should we push this down into the digest type?
|
|
switch version {
|
|
case tarsum.Version1:
|
|
default:
|
|
// version 0 and dev, for now.
|
|
return "", ErrLayerTarSumVersionUnsupported
|
|
}
|
|
|
|
digestVerifier := digest.NewDigestVerifier(dgst)
|
|
lengthVerifier := digest.NewLengthVerifier(size)
|
|
|
|
// First, seek to the end of the file, checking the size is as expected.
|
|
end, err := fp.Seek(0, os.SEEK_END)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if end != size {
|
|
// Fast path length check.
|
|
return "", ErrLayerInvalidLength
|
|
}
|
|
|
|
// Now seek back to start and take care of the digest.
|
|
if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
tr := io.TeeReader(fp, lengthVerifier)
|
|
tr = io.TeeReader(tr, digestVerifier)
|
|
|
|
// TODO(stevvooe): This is one of the places we need a Digester write
|
|
// sink. Instead, its read driven. This migth be okay.
|
|
|
|
// Calculate an updated digest with the latest version.
|
|
dgst, err = digest.FromReader(tr)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if !lengthVerifier.Verified() {
|
|
return "", ErrLayerInvalidLength
|
|
}
|
|
|
|
if !digestVerifier.Verified() {
|
|
return "", ErrLayerInvalidDigest
|
|
}
|
|
|
|
return dgst, nil
|
|
}
|
|
|
|
// writeLayer actually writes the the layer file into its final destination.
|
|
// The layer should be validated before commencing the write.
|
|
func (luc *layerUploadController) writeLayer(fp layerFile, size int64, digest digest.Digest) error {
|
|
blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
|
|
alg: digest.Algorithm(),
|
|
digest: digest.Hex(),
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check for existence
|
|
if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil {
|
|
// TODO(stevvooe): This check is kind of problematic and very racy.
|
|
switch err := err.(type) {
|
|
case storagedriver.PathNotFoundError:
|
|
break // ensure that it doesn't exist.
|
|
default:
|
|
// TODO(stevvooe): This isn't actually an error: the blob store is
|
|
// content addressable and we should just use this to ensure we
|
|
// have it written. Although, we do need to verify that the
|
|
// content that is there is the correct length.
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Seek our local layer file back now.
|
|
if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
|
|
// Cleanup?
|
|
return err
|
|
}
|
|
|
|
// Okay: we can write the file to the blob store.
|
|
if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// linkLayer links a valid, written layer blog into the registry, first
|
|
// linking the repository namespace, then adding it to the layerindex.
|
|
func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
|
|
layerLinkPath, err := luc.layerStore.pathMapper.path(layerLinkPathSpec{
|
|
name: luc.Name(),
|
|
digest: digest,
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest)); err != nil {
|
|
return nil
|
|
}
|
|
|
|
// Link the layer into the name index.
|
|
layerIndexLinkPath, err := luc.layerStore.pathMapper.path(layerIndexLinkPathSpec{
|
|
digest: digest,
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Read back the name index file. If it exists, create it. If not, add the
|
|
// new repo to the name list.
|
|
|
|
// TODO(stevvooe): This is very racy, as well. Reconsider using list for
|
|
// this operation?
|
|
layerIndexLinkContent, err := luc.layerStore.driver.GetContent(layerIndexLinkPath)
|
|
if err != nil {
|
|
switch err := err.(type) {
|
|
case storagedriver.PathNotFoundError:
|
|
layerIndexLinkContent = []byte(luc.Name())
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
layerIndexLinkContent = luc.maybeAddNameToLayerIndexLinkContent(layerIndexLinkContent)
|
|
|
|
// Write the index content back to the index.
|
|
return luc.layerStore.driver.PutContent(layerIndexLinkPath, layerIndexLinkContent)
|
|
}
|
|
|
|
func (luc *layerUploadController) maybeAddNameToLayerIndexLinkContent(content []byte) []byte {
|
|
names := strings.Split(string(content), "\n")
|
|
var found bool
|
|
// Search the names and find ours
|
|
for _, name := range names {
|
|
if name == luc.Name() {
|
|
found = true
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
names = append(names, luc.Name())
|
|
}
|
|
|
|
sort.Strings(names)
|
|
|
|
return []byte(strings.Join(names, "\n"))
|
|
}
|
|
|
|
// localFSLayerUploadStore implements a local layerUploadStore. There are some
// complexities around hashsums that make round tripping to the storage
// backend problematic, so uploads are stored and read locally for now. By
// GO-beta, this should be fully implemented on top of the backend
// storagedriver.
//
// For now, the directory layout is as follows:
//
//	/<temp dir>/registry-layer-upload/
//		<uuid>/
//			-> state.json
//			-> data
//
// Each upload, identified by uuid, has its own directory with a state file
// and a data file. The state file has a json representation of the current
// state. The data file is the in-progress upload data.
type localFSLayerUploadStore struct {
	// root is the directory under which the per-upload directories live.
	root string
}
|
|
|
|
func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) {
|
|
path, err := ioutil.TempDir("", "registry-layer-upload")
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &localFSLayerUploadStore{
|
|
root: path,
|
|
}, nil
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) {
|
|
lus := LayerUploadState{
|
|
Name: name,
|
|
UUID: uuid.New(),
|
|
}
|
|
|
|
if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil {
|
|
return lus, err
|
|
}
|
|
|
|
return lus, nil
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
|
|
fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return fp, nil
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
|
|
// TODO(stevvoe): Storing this state on the local file system is an
|
|
// intermediate stop gap. This technique is unlikely to handle any kind of
|
|
// concurrency very well.
|
|
|
|
var lus LayerUploadState
|
|
fp, err := os.Open(llufs.path(uuid, "state.json"))
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return lus, ErrLayerUploadUnknown
|
|
}
|
|
|
|
return lus, err
|
|
}
|
|
defer fp.Close()
|
|
|
|
dec := json.NewDecoder(fp)
|
|
if err := dec.Decode(&lus); err != nil {
|
|
return lus, err
|
|
}
|
|
|
|
return lus, nil
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) SaveState(lus LayerUploadState) error {
|
|
p, err := json.Marshal(lus)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = ioutil.WriteFile(llufs.path(lus.UUID, "state.json"), p, 0644)
|
|
if os.IsNotExist(err) {
|
|
return ErrLayerUploadUnknown
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
|
|
if err := os.RemoveAll(llufs.path(uuid, "")); err != nil {
|
|
if os.IsNotExist(err) {
|
|
return ErrLayerUploadUnknown
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) path(uuid, file string) string {
|
|
return filepath.Join(llufs.root, uuid, file)
|
|
}
|