Spool layer uploads to remote storage
To smooth initial implementation, uploads were spooled to local file storage, validated, then pushed to remote storage. That approach was flawed in that it prevented easy clustering of registry services that share a remote storage backend. The original plan was to implement resumable hashes, then implement remote upload storage. After some thought, it was found better to get remote spooling working first, then optimize with resumable hashes.

Moving to this approach has tradeoffs: after storing the complete upload remotely, the node must fetch the content and validate it before moving it to the final location. This can double bandwidth usage to the remote backend. Modifying the verification and upload code to store intermediate hashes should be trivial once the layer digest format has settled.

The largest changes for users of the storage package (mostly the registry app) are to the LayerService and LayerUpload interfaces. LayerService now takes qualified repository names to start and resume uploads. As a corollary, the concept of LayerUploadState has been completely removed, with all aspects of that state exposed through the LayerUpload object. LayerUpload has been modified to work as an io.WriteSeeker and includes a StartedAt time, to allow for upload timeout policies. Finish now requires only a digest, eliding the size parameter.

Resource cleanup has taken a turn for the better: resources are cleaned up after successful uploads and during a Cancel call. Admittedly, this is probably not completely where we want to be. It's recommended that we bolster this with a periodic driver utility script that scans for partial uploads and deletes the underlying data. As a small benefit, we can leave failed uploads around to better understand how and why they are failing, at the cost of some extra disk space.

Many other changes follow from the changes above. The webapp needs to be updated to meet the new interface requirements.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
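To make the new flow concrete, here is a minimal sketch of a storage package client under this change. It is illustrative only: the `storage` and `inmemory` import paths are assumptions based on the diff below, and `layerData`/`dgst` stand in for a real tar stream and its tarsum digest.

```go
package main

import (
	"io"

	"github.com/docker/distribution/digest"
	"github.com/docker/distribution/storage"
	"github.com/docker/distribution/storagedriver/inmemory"
)

// pushLayer spools a layer to the backend through the reworked interface,
// then finishes the upload with the expected tarsum digest.
func pushLayer(layerData io.Reader, dgst digest.Digest) (storage.Layer, error) {
	layers := storage.NewServices(inmemory.New()).Layers()

	upload, err := layers.Upload("foo/bar")
	if err != nil {
		return nil, err
	}

	if _, err := io.Copy(upload, layerData); err != nil {
		upload.Cancel() // best-effort cleanup of the spooled data
		return nil, err
	}

	// Finish re-reads the remotely spooled content, validates it against
	// dgst and moves it into the content-addressable blob store.
	return upload.Finish(dgst)
}
```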
parent 219bd48c24
commit ba6b774aea

6 changed files with 181 additions and 353 deletions
@@ -24,8 +24,7 @@ type Layer interface {
 	// layers.
 	Digest() digest.Digest
 
-	// CreatedAt returns the time this layer was created. Until we implement
-	// Stat call on storagedriver, this just returns the zero time.
+	// CreatedAt returns the time this layer was created.
 	CreatedAt() time.Time
 }
 
@@ -33,26 +32,22 @@ type Layer interface {
 // Instances can be obtained from the LayerService.Upload and
 // LayerService.Resume.
 type LayerUpload interface {
-	io.WriteCloser
-
-	// UUID returns the identifier for this upload.
-	UUID() string
+	io.WriteSeeker
+	io.Closer
 
 	// Name of the repository under which the layer will be linked.
 	Name() string
 
-	// Offset returns the position of the last byte written to this layer.
-	Offset() int64
+	// UUID returns the identifier for this upload.
+	UUID() string
 
-	// TODO(stevvooe): Consider completely removing the size check from this
-	// interface. The digest check may be adequate and we are making it
-	// optional in the HTTP API.
+	// StartedAt returns the time this layer upload was started.
+	StartedAt() time.Time
 
 	// Finish marks the upload as completed, returning a valid handle to the
-	// uploaded layer. The final size and digest are validated against the
-	// contents of the uploaded layer. If the size is negative, only the
-	// digest will be checked.
-	Finish(size int64, digest digest.Digest) (Layer, error)
+	// uploaded layer. The digest is validated against the contents of the
+	// uploaded layer.
+	Finish(digest digest.Digest) (Layer, error)
 
 	// Cancel the layer upload process.
 	Cancel() error
@@ -84,11 +79,11 @@ func (err ErrUnknownLayer) Error() string {
 
 // ErrLayerInvalidDigest returned when tarsum check fails.
 type ErrLayerInvalidDigest struct {
-	FSLayer manifest.FSLayer
+	Digest digest.Digest
 }
 
 func (err ErrLayerInvalidDigest) Error() string {
-	return fmt.Sprintf("invalid digest for referenced layer: %v", err.FSLayer.BlobSum)
+	return fmt.Sprintf("invalid digest for referenced layer: %v", err.Digest)
 }
 
 // ErrLayerInvalidSize returned when length check fails.
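Since LayerUpload is now an io.WriteSeeker rather than an offset-tracking state object, a caller resuming an upload recovers its position with Seek. A hedged sketch of that pattern, assuming the `storage` import path from the diff and a `rest` reader holding the remaining layer bytes:

```go
package main

import (
	"io"
	"os"

	"github.com/docker/distribution/storage"
)

// resumeAndContinue reopens an in-progress upload by name and uuid, then
// appends the rest of the layer data.
func resumeAndContinue(ls storage.LayerService, name, uuid string, rest io.Reader) error {
	upload, err := ls.Resume(name, uuid)
	if err != nil {
		return err
	}

	// The upload no longer reports an Offset; per the new contract, seek
	// to the end of the spooled data before writing.
	if _, err := upload.Seek(0, os.SEEK_END); err != nil {
		return err
	}

	_, err = io.Copy(upload, rest)
	return err
}
```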
@@ -26,21 +26,18 @@ func TestSimpleLayerUpload(t *testing.T) {
 
 	dgst := digest.Digest(tarSumStr)
 
-	uploadStore, err := newTemporaryLocalFSLayerUploadStore()
 	if err != nil {
 		t.Fatalf("error allocating upload store: %v", err)
 	}
 
 	imageName := "foo/bar"
-	driver := inmemory.New()
 
 	ls := &layerStore{
-		driver: driver,
+		driver: inmemory.New(),
 		pathMapper: &pathMapper{
 			root:    "/storage/testing",
 			version: storagePathVersion,
 		},
-		uploadStore: uploadStore,
 	}
 
 	h := sha256.New()
@@ -58,7 +55,7 @@ func TestSimpleLayerUpload(t *testing.T) {
 	}
 
 	// Do a resume, get unknown upload
-	layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()})
+	layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID())
 	if err != ErrLayerUploadUnknown {
 		t.Fatalf("unexpected error resuming upload, should be unkown: %v", err)
 	}
@@ -84,26 +81,31 @@ func TestSimpleLayerUpload(t *testing.T) {
 		t.Fatalf("layer data write incomplete")
 	}
 
-	if layerUpload.Offset() != nn {
-		t.Fatalf("layerUpload not updated with correct offset: %v != %v", layerUpload.Offset(), nn)
+	offset, err := layerUpload.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		t.Fatalf("unexpected error seeking layer upload: %v", err)
+	}
+
+	if offset != nn {
+		t.Fatalf("layerUpload not updated with correct offset: %v != %v", offset, nn)
 	}
 	layerUpload.Close()
 
 	// Do a resume, for good fun
-	layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()})
+	layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID())
 	if err != nil {
 		t.Fatalf("unexpected error resuming upload: %v", err)
 	}
 
 	sha256Digest := digest.NewDigest("sha256", h)
-	layer, err := layerUpload.Finish(randomDataSize, dgst)
+	layer, err := layerUpload.Finish(dgst)
 
 	if err != nil {
 		t.Fatalf("unexpected error finishing layer upload: %v", err)
 	}
 
 	// After finishing an upload, it should no longer exist.
-	if _, err := ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}); err != ErrLayerUploadUnknown {
+	if _, err := ls.Resume(layerUpload.Name(), layerUpload.UUID()); err != ErrLayerUploadUnknown {
 		t.Fatalf("expected layer upload to be unknown, got %v", err)
 	}
 
@@ -1,15 +1,17 @@
 package storage
 
 import (
+	"time"
+
+	"code.google.com/p/go-uuid/uuid"
+
 	"github.com/docker/distribution/digest"
 	"github.com/docker/distribution/manifest"
 	"github.com/docker/distribution/storagedriver"
 )
 
 type layerStore struct {
 	driver     storagedriver.StorageDriver
 	pathMapper *pathMapper
-	uploadStore layerUploadStore
 }
 
 func (ls *layerStore) Exists(name string, digest digest.Digest) (bool, error) {
@@ -66,31 +68,86 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) {
 	// the same two layers. Should it be disallowed? For now, we allow both
 	// parties to proceed and the the first one uploads the layer.
 
-	lus, err := ls.uploadStore.New(name)
+	uuid := uuid.New()
+	startedAt := time.Now().UTC()
+
+	path, err := ls.pathMapper.path(uploadDataPathSpec{
+		name: name,
+		uuid: uuid,
+	})
+
 	if err != nil {
 		return nil, err
 	}
 
-	return ls.newLayerUpload(lus), nil
+	startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{
+		name: name,
+		uuid: uuid,
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	// Write a startedat file for this upload
+	if err := ls.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
+		return nil, err
+	}
+
+	return ls.newLayerUpload(name, uuid, path, startedAt)
 }
 
 // Resume continues an in progress layer upload, returning the current
 // state of the upload.
-func (ls *layerStore) Resume(lus LayerUploadState) (LayerUpload, error) {
-	_, err := ls.uploadStore.GetState(lus.UUID)
+func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) {
+	startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{
+		name: name,
+		uuid: uuid,
+	})
+
 	if err != nil {
 		return nil, err
 	}
 
-	return ls.newLayerUpload(lus), nil
+	startedAtBytes, err := ls.driver.GetContent(startedAtPath)
+	if err != nil {
+		switch err := err.(type) {
+		case storagedriver.PathNotFoundError:
+			return nil, ErrLayerUploadUnknown
+		default:
+			return nil, err
+		}
+	}
+
+	startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes))
+	if err != nil {
+		return nil, err
+	}
+
+	path, err := ls.pathMapper.path(uploadDataPathSpec{
+		name: name,
+		uuid: uuid,
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	return ls.newLayerUpload(name, uuid, path, startedAt)
 }
 
 // newLayerUpload allocates a new upload controller with the given state.
-func (ls *layerStore) newLayerUpload(lus LayerUploadState) LayerUpload {
-	return &layerUploadController{
-		LayerUploadState: lus,
-		layerStore:       ls,
-		uploadStore:      ls.uploadStore,
+func (ls *layerStore) newLayerUpload(name, uuid, path string, startedAt time.Time) (LayerUpload, error) {
+	fw, err := newFileWriter(ls.driver, path)
+	if err != nil {
+		return nil, err
 	}
+
+	return &layerUploadController{
+		layerStore: ls,
+		name:       name,
+		uuid:       uuid,
+		startedAt:  startedAt,
+		fileWriter: *fw,
+	}, nil
 }
 
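The startedat file written above is what would let the periodic cleanup utility recommended in the commit message identify abandoned uploads. A rough sketch of such a reaper follows; the uploads root path, the driver's List signature, and the "startedat" file layout are assumptions here rather than settled API:

```go
package main

import (
	"path"
	"time"

	"github.com/docker/distribution/storagedriver"
)

// reapStaleUploads walks the upload directories under root, parses each
// startedat timestamp, and deletes uploads older than the timeout. Uploads
// without a parseable timestamp are left in place for inspection.
func reapStaleUploads(driver storagedriver.StorageDriver, root string, timeout time.Duration) error {
	uploads, err := driver.List(root)
	if err != nil {
		return err
	}

	for _, uploadPath := range uploads {
		content, err := driver.GetContent(path.Join(uploadPath, "startedat"))
		if err != nil {
			continue // no timestamp; leave the upload for inspection
		}

		startedAt, err := time.Parse(time.RFC3339, string(content))
		if err != nil {
			continue
		}

		if time.Since(startedAt) > timeout {
			if err := driver.Delete(uploadPath); err != nil {
				return err
			}
		}
	}

	return nil
}
```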
@@ -1,229 +1,84 @@
 package storage
 
 import (
-	"fmt"
 	"io"
-	"io/ioutil"
-	"os"
-	"path/filepath"
+	"path"
+	"time"
 
-	"code.google.com/p/go-uuid/uuid"
-
+	"github.com/Sirupsen/logrus"
 	"github.com/docker/distribution/digest"
-	"github.com/docker/distribution/manifest"
 	"github.com/docker/distribution/storagedriver"
 	"github.com/docker/docker/pkg/tarsum"
 )
 
-// LayerUploadState captures the state serializable state of the layer upload.
-type LayerUploadState struct {
-	// name is the primary repository under which the layer will be linked.
-	Name string
-
-	// UUID identifies the upload.
-	UUID string
-
-	// offset contains the current progress of the upload.
-	Offset int64
-}
-
 // layerUploadController is used to control the various aspects of resumable
 // layer upload. It implements the LayerUpload interface.
 type layerUploadController struct {
-	LayerUploadState
-
-	layerStore  *layerStore
-	uploadStore layerUploadStore
-	fp          layerFile
-	err         error // terminal error, if set, controller is closed
-}
-
-// layerFile documents the interface used while writing layer files, similar
-// to *os.File. This is separate from layerReader, for now, because we want to
-// store uploads on the local file system until we have write-through hashing
-// support. They should be combined once this is worked out.
-type layerFile interface {
-	io.WriteSeeker
-	io.Reader
-	io.Closer
-
-	// Sync commits the contents of the writer to storage.
-	Sync() (err error)
-}
-
-// layerUploadStore provides storage for temporary files and upload state of
-// layers. This is be used by the LayerService to manage the state of ongoing
-// uploads. This interface will definitely change and will most likely end up
-// being exported to the app layer. Move the layer.go when it's ready to go.
-type layerUploadStore interface {
-	New(name string) (LayerUploadState, error)
-	Open(uuid string) (layerFile, error)
-	GetState(uuid string) (LayerUploadState, error)
-	// TODO: factor this method back in
-	// SaveState(lus LayerUploadState) error
-	DeleteState(uuid string) error
+	layerStore *layerStore
+
+	name      string
+	uuid      string
+	startedAt time.Time
+
+	fileWriter
 }
 
 var _ LayerUpload = &layerUploadController{}
 
 // Name of the repository under which the layer will be linked.
 func (luc *layerUploadController) Name() string {
-	return luc.LayerUploadState.Name
+	return luc.name
 }
 
 // UUID returns the identifier for this upload.
 func (luc *layerUploadController) UUID() string {
-	return luc.LayerUploadState.UUID
+	return luc.uuid
 }
 
-// Offset returns the position of the last byte written to this layer.
-func (luc *layerUploadController) Offset() int64 {
-	return luc.LayerUploadState.Offset
+func (luc *layerUploadController) StartedAt() time.Time {
+	return luc.startedAt
 }
 
 // Finish marks the upload as completed, returning a valid handle to the
 // uploaded layer. The final size and checksum are validated against the
 // contents of the uploaded layer. The checksum should be provided in the
 // format <algorithm>:<hex digest>.
-func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Layer, error) {
-
-	// This section is going to be pretty ugly now. We will have to read the
-	// file twice. First, to get the tarsum and checksum. When those are
-	// available, and validated, we will upload it to the blob store and link
-	// it into the repository. In the future, we need to use resumable hash
-	// calculations for tarsum and checksum that can be calculated during the
-	// upload. This will allow us to cut the data directly into a temporary
-	// directory in the storage backend.
-
-	fp, err := luc.file()
-
-	if err != nil {
-		// Cleanup?
-		return nil, err
-	}
-
-	digest, err = luc.validateLayer(fp, size, digest)
+func (luc *layerUploadController) Finish(digest digest.Digest) (Layer, error) {
+	canonical, err := luc.validateLayer(digest)
 	if err != nil {
 		return nil, err
 	}
 
-	if nn, err := luc.writeLayer(fp, digest); err != nil {
-		// Cleanup?
-		return nil, err
-	} else if size >= 0 && nn != size {
-		// TODO(stevvooe): Short write. Will have to delete the location and
-		// report an error. This error needs to be reported to the client.
-		return nil, fmt.Errorf("short write writing layer")
-	}
-
-	// Yes! We have written some layer data. Let's make it visible. Link the
-	// layer blob into the repository.
-	if err := luc.linkLayer(digest); err != nil {
+	if err := luc.moveLayer(canonical); err != nil {
+		// TODO(stevvooe): Cleanup?
 		return nil, err
 	}
 
-	// Ok, the upload has completed and finished. Delete the state.
-	if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil {
-		// Can we ignore this error?
+	// Link the layer blob into the repository.
+	if err := luc.linkLayer(canonical); err != nil {
 		return nil, err
 	}
 
-	return luc.layerStore.Fetch(luc.Name(), digest)
+	if err := luc.removeResources(); err != nil {
+		return nil, err
+	}
+
+	return luc.layerStore.Fetch(luc.Name(), canonical)
 }
 
 // Cancel the layer upload process.
 func (luc *layerUploadController) Cancel() error {
-	if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil {
+	if err := luc.removeResources(); err != nil {
 		return err
 	}
 
-	return luc.Close()
+	luc.Close()
+	return nil
 }
 
-func (luc *layerUploadController) Write(p []byte) (int, error) {
-	wr, err := luc.file()
-	if err != nil {
-		return 0, err
-	}
-
-	n, err := wr.Write(p)
-
-	// Because we expect the reported offset to be consistent with the storage
-	// state, unfortunately, we need to Sync on every call to write.
-	if err := wr.Sync(); err != nil {
-		// Effectively, ignore the write state if the Sync fails. Report that
-		// no bytes were written and seek back to the starting offset.
-		offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET)
-		if seekErr != nil {
-			// What do we do here? Quite disasterous.
-			luc.reset()
-
-			return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", err, seekErr)
-		}
-
-		if offset != luc.Offset() {
-			return 0, fmt.Errorf("unexpected offset after seek")
-		}
-
-		return 0, err
-	}
-
-	luc.LayerUploadState.Offset += int64(n)
-
-	return n, err
-}
-
-func (luc *layerUploadController) Close() error {
-	if luc.err != nil {
-		return luc.err
-	}
-
-	if luc.fp != nil {
-		luc.err = luc.fp.Close()
-	}
-
-	return luc.err
-}
-
-func (luc *layerUploadController) file() (layerFile, error) {
-	if luc.fp != nil {
-		return luc.fp, nil
-	}
-
-	fp, err := luc.uploadStore.Open(luc.UUID())
-
-	if err != nil {
-		return nil, err
-	}
-
-	// TODO(stevvooe): We may need a more aggressive check here to ensure that
-	// the file length is equal to the current offset. We may want to sync the
-	// offset before return the layer upload to the client so it can be
-	// validated before proceeding with any writes.
-
-	// Seek to the current layer offset for good measure.
-	if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil {
-		return nil, err
-	}
-
-	luc.fp = fp
-
-	return luc.fp, nil
-}
-
-// reset closes and drops the current writer.
-func (luc *layerUploadController) reset() {
-	if luc.fp != nil {
-		luc.fp.Close()
-		luc.fp = nil
-	}
-}
-
-// validateLayer runs several checks on the layer file to ensure its validity.
-// This is currently very expensive and relies on fast io and fast seek on the
-// local host. If successful, the latest digest is returned, which should be
-// used over the passed in value.
-func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst digest.Digest) (digest.Digest, error) {
+// validateLayer checks the layer data against the digest, returning an error
+// if it does not match. The canonical digest is returned.
+func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) {
 	// First, check the incoming tarsum version of the digest.
 	version, err := tarsum.GetVersionFromTarsum(dgst.String())
 	if err != nil {
@@ -239,87 +94,65 @@ func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst d
 	}
 
 	digestVerifier := digest.NewDigestVerifier(dgst)
-	lengthVerifier := digest.NewLengthVerifier(size)
 
-	// First, seek to the end of the file, checking the size is as expected.
-	end, err := fp.Seek(0, os.SEEK_END)
+	// TODO(stevvooe): Store resumable hash calculations in upload directory
+	// in driver. Something like a file at path <uuid>/resumablehash/<offest>
+	// with the hash state up to that point would be perfect. The hasher would
+	// then only have to fetch the difference.
+
+	// Read the file from the backend driver and validate it.
+	fr, err := newFileReader(luc.fileWriter.driver, luc.path)
 	if err != nil {
 		return "", err
 	}
 
-	// Only check size if it is greater than
-	if size >= 0 && end != size {
-		// Fast path length check.
-		return "", ErrLayerInvalidSize{Size: size}
-	}
-
-	// Now seek back to start and take care of the digest.
-	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
-		return "", err
-	}
-
-	tr := io.TeeReader(fp, digestVerifier)
-
-	// Only verify the size if a positive size argument has been passed.
-	if size >= 0 {
-		tr = io.TeeReader(tr, lengthVerifier)
-	}
+	tr := io.TeeReader(fr, digestVerifier)
 
 	// TODO(stevvooe): This is one of the places we need a Digester write
-	// sink. Instead, its read driven. This migth be okay.
+	// sink. Instead, its read driven. This might be okay.
 
 	// Calculate an updated digest with the latest version.
-	dgst, err = digest.FromReader(tr)
+	canonical, err := digest.FromReader(tr)
 	if err != nil {
 		return "", err
 	}
 
-	if size >= 0 && !lengthVerifier.Verified() {
-		return "", ErrLayerInvalidSize{Size: size}
-	}
-
 	if !digestVerifier.Verified() {
-		return "", ErrLayerInvalidDigest{manifest.FSLayer{BlobSum: dgst}}
+		return "", ErrLayerInvalidDigest{Digest: dgst}
 	}
 
-	return dgst, nil
+	return canonical, nil
 }
 
-// writeLayer actually writes the the layer file into its final destination,
+// moveLayer moves the data into its final, hash-qualified destination,
 // identified by dgst. The layer should be validated before commencing the
-// write.
-func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) {
+// move.
+func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
 	blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
 		digest: dgst,
 	})
 
 	if err != nil {
-		return 0, err
+		return err
 	}
 
 	// Check for existence
 	if _, err := luc.layerStore.driver.Stat(blobPath); err != nil {
-		// TODO(stevvooe): This check is kind of problematic and very racy.
 		switch err := err.(type) {
 		case storagedriver.PathNotFoundError:
 			break // ensure that it doesn't exist.
 		default:
-			// TODO(stevvooe): This isn't actually an error: the blob store is
-			// content addressable and we should just use this to ensure we
-			// have it written. Although, we do need to verify that the
-			// content that is there is the correct length.
-			return 0, err
+			return err
 		}
+	} else {
+		// If the path exists, we can assume that the content has already
+		// been uploaded, since the blob storage is content-addressable.
+		// While it may be corrupted, detection of such corruption belongs
+		// elsewhere.
+		return nil
 	}
 
-	// Seek our local layer file back now.
-	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
-		// Cleanup?
-		return 0, err
-	}
-
-	// Okay: we can write the file to the blob store.
-	return luc.layerStore.driver.WriteStream(blobPath, 0, fp)
+	return luc.driver.Move(luc.path, blobPath)
 }
 
 // linkLayer links a valid, written layer blob into the registry under the
@@ -337,85 +170,35 @@ func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
 	return luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest))
 }
 
-// localFSLayerUploadStore implements a local layerUploadStore. There are some
-// complexities around hashsums that make round tripping to the storage
-// backend problematic, so we'll store and read locally for now. By GO-beta,
-// this should be fully implemented on top of the backend storagedriver.
-//
-// For now, the directory layout is as follows:
-//
-//	/<temp dir>/registry-layer-upload/
-//		<uuid>/
-//			-> state.json
-//			-> data
-//
-// Each upload, identified by uuid, has its own directory with a state file
-// and a data file. The state file has a json representation of the current
-// state. The data file is the in-progress upload data.
-type localFSLayerUploadStore struct {
-	root string
-}
-
-func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) {
-	path, err := ioutil.TempDir("", "registry-layer-upload")
-
+// removeResources should clean up all resources associated with the upload
+// instance. An error will be returned if the clean up cannot proceed. If the
+// resources are already not present, no error will be returned.
+func (luc *layerUploadController) removeResources() error {
+	dataPath, err := luc.layerStore.pathMapper.path(uploadDataPathSpec{
+		name: luc.name,
+		uuid: luc.uuid,
+	})
+
 	if err != nil {
-		return nil, err
-	}
-
-	return &localFSLayerUploadStore{
-		root: path,
-	}, nil
-}
-
-func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) {
-	lus := LayerUploadState{
-		Name: name,
-		UUID: uuid.New(),
-	}
-
-	if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil {
-		return lus, err
-	}
-
-	return lus, nil
-}
-
-func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
-	fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
-
-	if err != nil {
-		return nil, err
-	}
-
-	return fp, nil
-}
-
-func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
-	var lus LayerUploadState
-
-	if _, err := os.Stat(llufs.path(uuid, "")); err != nil {
-		if os.IsNotExist(err) {
-			return lus, ErrLayerUploadUnknown
-		}
-
-		return lus, err
-	}
-	return lus, nil
-}
-
-func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
-	if err := os.RemoveAll(llufs.path(uuid, "")); err != nil {
-		if os.IsNotExist(err) {
-			return ErrLayerUploadUnknown
-		}
-
 		return err
 	}
 
+	// Resolve and delete the containing directory, which should include any
+	// upload related files.
+	dirPath := path.Dir(dataPath)
+
+	if err := luc.driver.Delete(dirPath); err != nil {
+		switch err := err.(type) {
+		case storagedriver.PathNotFoundError:
+			break // already gone!
+		default:
+			// This should be uncommon enough such that returning an error
+			// should be okay. At this point, the upload should be mostly
+			// complete, but perhaps the backend became unaccessible.
+			logrus.Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
+			return err
+		}
+	}
+
 	return nil
 }
-
-func (llufs *localFSLayerUploadStore) path(uuid, file string) string {
-	return filepath.Join(llufs.root, uuid, file)
-}
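validateLayer above is where the bandwidth cost described in the commit message is paid: the node re-reads the entire spooled layer from the backend to check it. Stripped of the storage plumbing, the verification pattern is a TeeReader feeding a verifier while the canonical digest is computed. A sketch under that assumption; the plain error here is a hypothetical stand-in for ErrLayerInvalidDigest:

```go
package main

import (
	"fmt"
	"io"

	"github.com/docker/distribution/digest"
)

// verifyContent re-reads content, checking it against the client-supplied
// digest while simultaneously computing the canonical digest of what was
// actually stored.
func verifyContent(content io.Reader, dgst digest.Digest) (digest.Digest, error) {
	verifier := digest.NewDigestVerifier(dgst)

	// Every byte consumed for the canonical digest also feeds the verifier.
	canonical, err := digest.FromReader(io.TeeReader(content, verifier))
	if err != nil {
		return "", err
	}

	if !verifier.Verified() {
		return "", fmt.Errorf("content does not match digest %v", dgst)
	}

	return canonical, nil
}
```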
@@ -153,6 +153,6 @@ func (mockedExistenceLayerService) Upload(name string) (LayerUpload, error) {
 	panic("not implemented")
 }
 
-func (mockedExistenceLayerService) Resume(lus LayerUploadState) (LayerUpload, error) {
+func (mockedExistenceLayerService) Resume(name, uuid string) (LayerUpload, error) {
 	panic("not implemented")
 }
@@ -9,28 +9,18 @@ import (
 // Services provides various services with application-level operations for
 // use across backend storage drivers.
 type Services struct {
 	driver     storagedriver.StorageDriver
 	pathMapper *pathMapper
-	layerUploadStore layerUploadStore
 }
 
 // NewServices creates a new Services object to access docker objects stored
 // in the underlying driver.
 func NewServices(driver storagedriver.StorageDriver) *Services {
-	layerUploadStore, err := newTemporaryLocalFSLayerUploadStore()
-
-	if err != nil {
-		// TODO(stevvooe): This failure needs to be understood in the context
-		// of the lifecycle of the services object, which is uncertain at this
-		// point.
-		panic("unable to allocate layerUploadStore: " + err.Error())
-	}
-
 	return &Services{
 		driver: driver,
 		// TODO(sday): This should be configurable.
 		pathMapper: defaultPathMapper,
-		layerUploadStore: layerUploadStore,
 	}
 }
 
@@ -38,7 +28,7 @@ func NewServices(driver storagedriver.StorageDriver) *Services {
 // may be context sensitive in the future. The instance should be used similar
 // to a request local.
 func (ss *Services) Layers() LayerService {
-	return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper, uploadStore: ss.layerUploadStore}
+	return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper}
 }
 
 // Manifests returns an instance of ManifestService. Instantiation is cheap and
@@ -78,7 +68,8 @@ type LayerService interface {
 	// returning a handle.
 	Upload(name string) (LayerUpload, error)
 
-	// Resume continues an in progress layer upload, returning the current
-	// state of the upload.
-	Resume(layerUploadState LayerUploadState) (LayerUpload, error)
+	// Resume continues an in progress layer upload, returning a handle to the
+	// upload. The caller should seek to the latest desired upload location
+	// before proceeding.
+	Resume(name, uuid string) (LayerUpload, error)
 }