forked from TrueCloudLab/distribution
1a508d67d9
Mostly, we've made superficial changes to the storage package to start using the Digest type. Many of the exported interface methods have been changed to reflect this in addition to changes in the way layer uploads will be initiated. Further work here is necessary but will come with a separate PR.
497 lines
13 KiB
Go
497 lines
13 KiB
Go
package storage
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
|
|
"code.google.com/p/go-uuid/uuid"
|
|
|
|
"github.com/docker/docker-registry/digest"
|
|
"github.com/docker/docker-registry/storagedriver"
|
|
"github.com/docker/docker/pkg/tarsum"
|
|
|
|
"io"
|
|
)
|
|
|
|
// LayerUploadState is the serializable state of a layer upload.
type LayerUploadState struct {
	// Name is the primary repository under which the layer will be linked.
	Name string

	// UUID identifies the upload.
	UUID string

	// Offset is the current progress of the upload.
	Offset int64
}
|
|
|
|
// layerUploadController is used to control the various aspects of resumable
|
|
// layer upload. It implements the LayerUpload interface.
|
|
type layerUploadController struct {
|
|
LayerUploadState
|
|
|
|
layerStore *layerStore
|
|
uploadStore layerUploadStore
|
|
fp layerFile
|
|
err error // terminal error, if set, controller is closed
|
|
}
|
|
|
|
// layerFile is the set of operations used while writing layer files; it is
// similar to *os.File. It is kept separate from layerReader, for now, because
// uploads are stored on the local file system until write-through hashing is
// supported. The two should be combined once that is worked out.
type layerFile interface {
	io.Reader
	io.WriteSeeker
	io.Closer

	// Sync commits the contents of the writer to storage.
	Sync() error
}
|
|
|
|
// layerUploadStore provides storage for temporary files and upload state of
|
|
// layers. This is be used by the LayerService to manage the state of ongoing
|
|
// uploads. This interface will definitely change and will most likely end up
|
|
// being exported to the app layer. Move the layer.go when it's ready to go.
|
|
type layerUploadStore interface {
|
|
New(name string) (LayerUploadState, error)
|
|
Open(uuid string) (layerFile, error)
|
|
GetState(uuid string) (LayerUploadState, error)
|
|
SaveState(lus LayerUploadState) error
|
|
DeleteState(uuid string) error
|
|
}
|
|
|
|
// Compile-time check that *layerUploadController implements LayerUpload.
var _ LayerUpload = (*layerUploadController)(nil)
|
|
|
|
// Name of the repository under which the layer will be linked.
|
|
func (luc *layerUploadController) Name() string {
|
|
return luc.LayerUploadState.Name
|
|
}
|
|
|
|
// UUID returns the identifier for this upload.
|
|
func (luc *layerUploadController) UUID() string {
|
|
return luc.LayerUploadState.UUID
|
|
}
|
|
|
|
// Offset returns the position of the last byte written to this layer.
|
|
func (luc *layerUploadController) Offset() int64 {
|
|
return luc.LayerUploadState.Offset
|
|
}
|
|
|
|
// Finish marks the upload as completed, returning a valid handle to the
|
|
// uploaded layer. The final size and checksum are validated against the
|
|
// contents of the uploaded layer. The checksum should be provided in the
|
|
// format <algorithm>:<hex digest>.
|
|
func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Layer, error) {
|
|
|
|
// This section is going to be pretty ugly now. We will have to read the
|
|
// file twice. First, to get the tarsum and checksum. When those are
|
|
// available, and validated, we will upload it to the blob store and link
|
|
// it into the repository. In the future, we need to use resumable hash
|
|
// calculations for tarsum and checksum that can be calculated during the
|
|
// upload. This will allow us to cut the data directly into a temporary
|
|
// directory in the storage backend.
|
|
|
|
fp, err := luc.file()
|
|
|
|
if err != nil {
|
|
// Cleanup?
|
|
return nil, err
|
|
}
|
|
|
|
digest, err = luc.validateLayer(fp, size, digest)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := luc.writeLayer(fp, size, digest); err != nil {
|
|
// Cleanup?
|
|
return nil, err
|
|
}
|
|
|
|
// Yes! We have written some layer data. Let's make it visible. Link the
|
|
// layer blob into the repository.
|
|
if err := luc.linkLayer(digest); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Ok, the upload has completed and finished. Delete the state.
|
|
if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil {
|
|
// Can we ignore this error?
|
|
return nil, err
|
|
}
|
|
|
|
return luc.layerStore.Fetch(luc.Name(), digest)
|
|
}
|
|
|
|
// Cancel the layer upload process.
|
|
func (luc *layerUploadController) Cancel() error {
|
|
if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil {
|
|
return err
|
|
}
|
|
|
|
return luc.Close()
|
|
}
|
|
|
|
func (luc *layerUploadController) Write(p []byte) (int, error) {
|
|
wr, err := luc.file()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
n, err := wr.Write(p)
|
|
|
|
// Because we expect the reported offset to be consistent with the storage
|
|
// state, unfortunately, we need to Sync on every call to write.
|
|
if err := wr.Sync(); err != nil {
|
|
// Effectively, ignore the write state if the Sync fails. Report that
|
|
// no bytes were written and seek back to the starting offset.
|
|
offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET)
|
|
if seekErr != nil {
|
|
// What do we do here? Quite disasterous.
|
|
luc.reset()
|
|
|
|
return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", err, seekErr)
|
|
}
|
|
|
|
if offset != luc.Offset() {
|
|
return 0, fmt.Errorf("unexpected offset after seek")
|
|
}
|
|
|
|
return 0, err
|
|
}
|
|
|
|
luc.LayerUploadState.Offset += int64(n)
|
|
|
|
if err := luc.uploadStore.SaveState(luc.LayerUploadState); err != nil {
|
|
// TODO(stevvooe): This failure case may require more thought.
|
|
return n, err
|
|
}
|
|
|
|
return n, err
|
|
}
|
|
|
|
func (luc *layerUploadController) Close() error {
|
|
if luc.err != nil {
|
|
return luc.err
|
|
}
|
|
|
|
if luc.fp != nil {
|
|
luc.err = luc.fp.Close()
|
|
}
|
|
|
|
return luc.err
|
|
}
|
|
|
|
func (luc *layerUploadController) file() (layerFile, error) {
|
|
if luc.fp != nil {
|
|
return luc.fp, nil
|
|
}
|
|
|
|
fp, err := luc.uploadStore.Open(luc.UUID())
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// TODO(stevvooe): We may need a more aggressive check here to ensure that
|
|
// the file length is equal to the current offset. We may want to sync the
|
|
// offset before return the layer upload to the client so it can be
|
|
// validated before proceeding with any writes.
|
|
|
|
// Seek to the current layer offset for good measure.
|
|
if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
luc.fp = fp
|
|
|
|
return luc.fp, nil
|
|
}
|
|
|
|
// reset closes and drops the current writer.
|
|
func (luc *layerUploadController) reset() {
|
|
if luc.fp != nil {
|
|
luc.fp.Close()
|
|
luc.fp = nil
|
|
}
|
|
}
|
|
|
|
// validateLayer runs several checks on the layer file to ensure its validity.
|
|
// This is currently very expensive and relies on fast io and fast seek on the
|
|
// local host. If successful, the latest digest is returned, which should be
|
|
// used over the passed in value.
|
|
func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst digest.Digest) (digest.Digest, error) {
|
|
// First, check the incoming tarsum version of the digest.
|
|
version, err := tarsum.GetVersionFromTarsum(dgst.String())
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// TODO(stevvooe): Should we push this down into the digest type?
|
|
switch version {
|
|
case tarsum.Version1:
|
|
default:
|
|
// version 0 and dev, for now.
|
|
return "", ErrLayerTarSumVersionUnsupported
|
|
}
|
|
|
|
digestVerifier := digest.DigestVerifier(dgst)
|
|
lengthVerifier := digest.LengthVerifier(size)
|
|
|
|
// First, seek to the end of the file, checking the size is as expected.
|
|
end, err := fp.Seek(0, os.SEEK_END)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if end != size {
|
|
// Fast path length check.
|
|
return "", ErrLayerInvalidLength
|
|
}
|
|
|
|
// Now seek back to start and take care of the digest.
|
|
if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
tr := io.TeeReader(fp, lengthVerifier)
|
|
tr = io.TeeReader(tr, digestVerifier)
|
|
|
|
// TODO(stevvooe): This is one of the places we need a Digester write
|
|
// sink. Instead, its read driven. This migth be okay.
|
|
|
|
// Calculate an updated digest with the latest version.
|
|
dgst, err = digest.DigestReader(tr)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if !lengthVerifier.Verified() {
|
|
return "", ErrLayerInvalidLength
|
|
}
|
|
|
|
if !digestVerifier.Verified() {
|
|
return "", ErrLayerInvalidDigest
|
|
}
|
|
|
|
return dgst, nil
|
|
}
|
|
|
|
// writeLayer actually writes the the layer file into its final destination.
|
|
// The layer should be validated before commencing the write.
|
|
func (luc *layerUploadController) writeLayer(fp layerFile, size int64, digest digest.Digest) error {
|
|
blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
|
|
alg: digest.Algorithm(),
|
|
digest: digest.Hex(),
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check for existence
|
|
if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil {
|
|
// TODO(stevvooe): This check is kind of problematic and very racy.
|
|
switch err := err.(type) {
|
|
case storagedriver.PathNotFoundError:
|
|
break // ensure that it doesn't exist.
|
|
default:
|
|
// TODO(stevvooe): This isn't actually an error: the blob store is
|
|
// content addressable and we should just use this to ensure we
|
|
// have it written. Although, we do need to verify that the
|
|
// content that is there is the correct length.
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Seek our local layer file back now.
|
|
if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
|
|
// Cleanup?
|
|
return err
|
|
}
|
|
|
|
// Okay: we can write the file to the blob store.
|
|
if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// linkLayer links a valid, written layer blog into the registry, first
|
|
// linking the repository namespace, then adding it to the layerindex.
|
|
func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
|
|
layerLinkPath, err := luc.layerStore.pathMapper.path(layerLinkPathSpec{
|
|
name: luc.Name(),
|
|
digest: digest,
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest)); err != nil {
|
|
return nil
|
|
}
|
|
|
|
// Link the layer into the name index.
|
|
layerIndexLinkPath, err := luc.layerStore.pathMapper.path(layerIndexLinkPathSpec{
|
|
digest: digest,
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Read back the name index file. If it exists, create it. If not, add the
|
|
// new repo to the name list.
|
|
|
|
// TODO(stevvooe): This is very racy, as well. Reconsider using list for
|
|
// this operation?
|
|
layerIndexLinkContent, err := luc.layerStore.driver.GetContent(layerIndexLinkPath)
|
|
if err != nil {
|
|
switch err := err.(type) {
|
|
case storagedriver.PathNotFoundError:
|
|
layerIndexLinkContent = []byte(luc.Name())
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
layerIndexLinkContent = luc.maybeAddNameToLayerIndexLinkContent(layerIndexLinkContent)
|
|
|
|
// Write the index content back to the index.
|
|
return luc.layerStore.driver.PutContent(layerIndexLinkPath, layerIndexLinkContent)
|
|
}
|
|
|
|
func (luc *layerUploadController) maybeAddNameToLayerIndexLinkContent(content []byte) []byte {
|
|
names := strings.Split(string(content), "\n")
|
|
var found bool
|
|
// Search the names and find ours
|
|
for _, name := range names {
|
|
if name == luc.Name() {
|
|
found = true
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
names = append(names, luc.Name())
|
|
}
|
|
|
|
sort.Strings(names)
|
|
|
|
return []byte(strings.Join(names, "\n"))
|
|
}
|
|
|
|
// localFSLayerUploadStore is a layerUploadStore backed by the local file
// system. There are some complexities around hashsums that make round
// tripping to the storage backend problematic, so uploads are stored and read
// locally for now. By GO-beta, this should be fully implemented on top of the
// backend storagedriver.
//
// For now, the directory layout is as follows:
//
//	/<temp dir>/registry-layer-upload/
//		<uuid>/
//			-> state.json
//			-> data
//
// Each upload, identified by uuid, has its own directory with a state file
// and a data file. The state file holds a json representation of the current
// state. The data file is the in-progress upload data.
type localFSLayerUploadStore struct {
	// root is the directory under which the per-upload directories live.
	root string
}
|
|
|
|
func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) {
|
|
path, err := ioutil.TempDir("", "registry-layer-upload")
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &localFSLayerUploadStore{
|
|
root: path,
|
|
}, nil
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) {
|
|
lus := LayerUploadState{
|
|
Name: name,
|
|
UUID: uuid.New(),
|
|
}
|
|
|
|
if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil {
|
|
return lus, err
|
|
}
|
|
|
|
return lus, nil
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
|
|
fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return fp, nil
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
|
|
// TODO(stevvoe): Storing this state on the local file system is an
|
|
// intermediate stop gap. This technique is unlikely to handle any kind of
|
|
// concurrency very well.
|
|
|
|
var lus LayerUploadState
|
|
fp, err := os.Open(llufs.path(uuid, "state.json"))
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return lus, ErrLayerUploadUnknown
|
|
}
|
|
|
|
return lus, err
|
|
}
|
|
defer fp.Close()
|
|
|
|
dec := json.NewDecoder(fp)
|
|
if err := dec.Decode(&lus); err != nil {
|
|
return lus, err
|
|
}
|
|
|
|
return lus, nil
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) SaveState(lus LayerUploadState) error {
|
|
p, err := json.Marshal(lus)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = ioutil.WriteFile(llufs.path(lus.UUID, "state.json"), p, 0644)
|
|
if os.IsNotExist(err) {
|
|
return ErrLayerUploadUnknown
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
|
|
if err := os.RemoveAll(llufs.path(uuid, "")); err != nil {
|
|
if os.IsNotExist(err) {
|
|
return ErrLayerUploadUnknown
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (llufs *localFSLayerUploadStore) path(uuid, file string) string {
|
|
return filepath.Join(llufs.root, uuid, file)
|
|
}
|