distribution/storage/layerstore.go
Stephen J Day ba6b774aea Spool layer uploads to remote storage
To smooth initial implementation, uploads were spooled to local file storage,
validated, then pushed to remote storage. That approach was flawed in that it
prevented easy clustering of registry services that share a remote storage
backend. The original plan was to implement resumable hashes then implement
remote upload storage. After some thought, it was found to be better to get
remote spooling working, then optimize with resumable hashes.

Moving to this approach has tradeoffs: after storing the complete upload
remotely, the node must fetch the content and validate it before moving it to
the final location. This can double bandwidth usage to the remote backend.
Modifying the verification and upload code to store intermediate hashes should
be trivial once the layer digest format has settled.

The largest changes for users of the storage package (mostly the registry app)
are the LayerService interface and the LayerUpload interface. The LayerService
now takes qualified repository names to start and resume uploads. As a corollary,
the concept of LayerUploadState has been completely removed, exposing all aspects
of that state as part of the LayerUpload object. The LayerUpload object has
been modified to work as an io.WriteSeeker and includes a StartedAt time, to
allow for upload timeout policies. Finish now only requires a digest, eliding
the requirement for a size parameter.

Resource cleanup has taken a turn for the better. Resources are cleaned up
after successful uploads and during a cancel call. Admittedly, this is probably
not completely where we want to be. It's recommended that we bolster this with a
periodic driver utility script that scans for partial uploads and deletes the
underlying data. As a small benefit, we can leave these around to better
understand how and why these uploads are failing, at the cost of some extra
disk space.

Many other changes follow from the changes above. The webapp needs to be
updated to meet the new interface requirements.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2015-01-09 14:50:39 -08:00

153 lines
3.5 KiB
Go

package storage
import (
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver"
)
// layerStore implements layer lookup and upload handling on top of a storage
// driver, using a pathMapper to derive the storage paths it works with.
type layerStore struct {
// driver is the storage backend that upload metadata is written to and read
// from, and that is handed to the file reader/writer helpers.
driver storagedriver.StorageDriver
// pathMapper builds the storage paths for upload data and started-at files,
// and is passed along when resolving blob paths.
pathMapper *pathMapper
}
// Exists reports whether the layer identified by digest can be fetched from
// the repository name. An unknown layer yields (false, nil); any other error
// from Fetch is returned to the caller unchanged.
func (ls *layerStore) Exists(name string, digest digest.Digest) (bool, error) {
	// Fetch only resolves the blob link and sets up a reader, so it doubles
	// as a cheap existence probe.
	_, err := ls.Fetch(name, digest)
	if err == nil {
		return true, nil
	}

	// A missing layer is an expected answer rather than a failure.
	if _, unknown := err.(ErrUnknownLayer); unknown {
		return false, nil
	}

	return false, err
}
// Fetch returns a Layer backed by a file reader for the blob identified by
// digest within the repository name. If the storage driver reports the path
// as not found, either while resolving the blob path or while creating the
// reader, ErrUnknownLayer is returned; all other errors are passed through.
func (ls *layerStore) Fetch(name string, digest digest.Digest) (Layer, error) {
	// translate maps the driver's path-not-found errors (value or pointer
	// form) to ErrUnknownLayer and leaves every other error untouched.
	translate := func(cause error) error {
		switch cause.(type) {
		case storagedriver.PathNotFoundError, *storagedriver.PathNotFoundError:
			return ErrUnknownLayer{manifest.FSLayer{BlobSum: digest}}
		}
		return cause
	}

	blobPath, err := resolveBlobPath(ls.driver, ls.pathMapper, name, digest)
	if err != nil {
		return nil, translate(err)
	}

	reader, err := newFileReader(ls.driver, blobPath)
	if err != nil {
		return nil, translate(err)
	}

	return &layerReader{
		fileReader: *reader,
		name:       name,
		digest:     digest,
	}, nil
}
// Upload begins a new layer upload in the repository name and returns a
// handle to it. Each call generates a fresh upload UUID, records the UTC start
// time in a "started at" file through the storage driver, and then opens the
// upload's data path for writing.
func (ls *layerStore) Upload(name string) (LayerUpload, error) {
	// NOTE(stevvooe): Consider the issues with allowing concurrent upload of
	// the same two layers. Should it be disallowed? For now, we allow both
	// parties to proceed and the first one uploads the layer.

	id := uuid.New()
	began := time.Now().UTC()

	// Derive both storage locations up front; nothing is written until both
	// paths have been computed successfully.
	dataPath, err := ls.pathMapper.path(uploadDataPathSpec{name: name, uuid: id})
	if err != nil {
		return nil, err
	}

	markerPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{name: name, uuid: id})
	if err != nil {
		return nil, err
	}

	// Persist the start time (RFC 3339) so that Resume can read it back.
	stamp := []byte(began.Format(time.RFC3339))
	if err := ls.driver.PutContent(markerPath, stamp); err != nil {
		return nil, err
	}

	return ls.newLayerUpload(name, id, dataPath, began)
}
// Resume continues an in-progress layer upload, identified by the repository
// name and the upload uuid, and returns a handle carrying the upload's
// original start time.
//
// The start time is read from the upload's "started at" file and parsed as
// RFC 3339. If the storage driver reports that file as not found,
// ErrLayerUploadUnknown is returned. Any other driver error, a malformed
// timestamp, or a path-mapping failure is returned unchanged.
func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) {
	startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{
		name: name,
		uuid: uuid,
	})
	if err != nil {
		return nil, err
	}

	startedAtBytes, err := ls.driver.GetContent(startedAtPath)
	if err != nil {
		switch err.(type) {
		// Match both the value and the pointer form of the not-found error,
		// as Fetch does, so that either form is reported as an unknown upload
		// rather than leaking a raw driver error.
		case storagedriver.PathNotFoundError, *storagedriver.PathNotFoundError:
			return nil, ErrLayerUploadUnknown
		}
		return nil, err
	}

	startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes))
	if err != nil {
		return nil, err
	}

	path, err := ls.pathMapper.path(uploadDataPathSpec{
		name: name,
		uuid: uuid,
	})
	if err != nil {
		return nil, err
	}

	return ls.newLayerUpload(name, uuid, path, startedAt)
}
// newLayerUpload builds the upload controller for the given upload state. It
// opens a file writer on path through the store's driver and returns a
// controller holding that writer together with name, uuid and startedAt. An
// error from creating the writer is returned as-is.
func (ls *layerStore) newLayerUpload(name, uuid, path string, startedAt time.Time) (LayerUpload, error) {
	writer, err := newFileWriter(ls.driver, path)
	if err != nil {
		return nil, err
	}

	controller := layerUploadController{
		layerStore: ls,
		name:       name,
		uuid:       uuid,
		startedAt:  startedAt,
		fileWriter: *writer,
	}
	return &controller, nil
}