Merge pull request #295 from jlhawn/use_resumable_digest

digest, registry/storage: use resumable digest

Commit dff57726f9 · 3 changed files with 226 additions and 25 deletions
@@ -142,6 +142,7 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di
 		layerStore:         ls,
 		uuid:               uuid,
 		startedAt:          startedAt,
+		resumableDigester:  digest.NewCanonicalResumableDigester(),
 		bufferedFileWriter: *fw,
 	}, nil
 }
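Aside, for context: the point of a resumable digester is to let the registry checkpoint a partially computed hash so an interrupted upload can continue without re-reading everything already written. The patch uses the custom digest.ResumableDigester type because, at the time, Go's standard-library hashes could not be serialized. Here is a minimal illustration of the same save/restore pattern using the encoding.BinaryMarshaler support that stdlib hashes gained in Go 1.10 — a sketch for orientation, not code from the patch:

    package main

    import (
    	"crypto/sha256"
    	"encoding"
    	"fmt"
    )

    func main() {
    	h := sha256.New()
    	h.Write([]byte("hello, "))

    	// Snapshot the internal hash state mid-stream.
    	state, err := h.(encoding.BinaryMarshaler).MarshalBinary()
    	if err != nil {
    		panic(err)
    	}

    	// Resume later, possibly in another process, without re-reading
    	// the bytes hashed so far.
    	h2 := sha256.New()
    	if err := h2.(encoding.BinaryUnmarshaler).UnmarshalBinary(state); err != nil {
    		panic(err)
    	}
    	h2.Write([]byte("world"))

    	fmt.Printf("resumed:  %x\n", h2.Sum(nil))
    	fmt.Printf("one-shot: %x\n", sha256.Sum256([]byte("hello, world")))
    }

Both lines print the same digest: the snapshot carries the full internal SHA-256 state, so resuming costs nothing proportional to the bytes already hashed.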
@@ -3,7 +3,9 @@ package storage
 import (
 	"fmt"
 	"io"
+	"os"
 	"path"
+	"strconv"
 	"time"
 
 	"github.com/Sirupsen/logrus"
@@ -20,10 +22,11 @@ var _ distribution.LayerUpload = &layerWriter{}
 type layerWriter struct {
 	layerStore *layerStore
 
 	uuid      string
 	startedAt time.Time
+	resumableDigester digest.ResumableDigester
 
-	// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisy
+	// implements io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
 	// LayerUpload Interface
 	bufferedFileWriter
 }
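For readers following along, the calls made on lw.resumableDigester throughout this patch imply an interface roughly like the sketch below. This is reconstructed from the call sites in this diff, not quoted from the digest package, so the exact signatures may differ:

    package storage // sketch only

    import (
    	"io"

    	"github.com/docker/distribution/digest"
    )

    // ResumableDigester as implied by the call sites in this patch; an
    // approximation, not the digest package's actual definition.
    type ResumableDigester interface {
    	io.Writer                   // hashes bytes as they are written
    	Len() int                   // bytes hashed so far (call sites convert to int64)
    	Digest() digest.Digest      // digest of everything hashed so far
    	State() ([]byte, error)     // serializes the internal hash state
    	Restore(state []byte) error // resumes from a serialized state
    	Reset()                     // discards all state, starting over at offset 0
    }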
@@ -83,37 +86,212 @@ func (lw *layerWriter) Cancel() error {
 	return nil
 }
 
+func (lw *layerWriter) Write(p []byte) (int, error) {
+	// Ensure that the current write offset matches how many bytes have been
+	// written to the digester. If not, we need to update the digest state to
+	// match the current write position.
+	if err := lw.resumeHashAt(lw.offset); err != nil {
+		return 0, err
+	}
+
+	return io.MultiWriter(&lw.bufferedFileWriter, lw.resumableDigester).Write(p)
+}
+
+func (lw *layerWriter) ReadFrom(r io.Reader) (n int64, err error) {
+	// Ensure that the current write offset matches how many bytes have been
+	// written to the digester. If not, we need to update the digest state to
+	// match the current write position.
+	if err := lw.resumeHashAt(lw.offset); err != nil {
+		return 0, err
+	}
+
+	return lw.bufferedFileWriter.ReadFrom(io.TeeReader(r, lw.resumableDigester))
+}
+
+func (lw *layerWriter) Close() error {
+	if err := lw.storeHashState(); err != nil {
+		return err
+	}
+
+	return lw.bufferedFileWriter.Close()
+}
+
+type hashStateEntry struct {
+	offset int64
+	path   string
+}
+
+// getStoredHashStates returns a slice of hashStateEntries for this upload.
+func (lw *layerWriter) getStoredHashStates() ([]hashStateEntry, error) {
+	uploadHashStatePathPrefix, err := lw.layerStore.repository.registry.pm.path(uploadHashStatePathSpec{
+		name: lw.layerStore.repository.Name(),
+		uuid: lw.uuid,
+		alg:  lw.resumableDigester.Digest().Algorithm(),
+		list: true,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	paths, err := lw.driver.List(uploadHashStatePathPrefix)
+	if err != nil {
+		if _, ok := err.(storagedriver.PathNotFoundError); !ok {
+			return nil, err
+		}
+		// Treat PathNotFoundError as no entries.
+		paths = nil
+	}
+
+	hashStateEntries := make([]hashStateEntry, 0, len(paths))
+
+	for _, p := range paths {
+		pathSuffix := path.Base(p)
+		// The suffix should be the offset.
+		offset, err := strconv.ParseInt(pathSuffix, 0, 64)
+		if err != nil {
+			logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
+		}
+
+		hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
+	}
+
+	return hashStateEntries, nil
+}
+
+// resumeHashAt attempts to restore the state of the internal hash function
+// by loading the most recent saved hash state less than or equal to the given
+// offset. Any unhashed bytes remaining less than the given offset are hashed
+// from the content uploaded so far.
+func (lw *layerWriter) resumeHashAt(offset int64) error {
+	if offset < 0 {
+		return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
+	}
+
+	if offset == int64(lw.resumableDigester.Len()) {
+		// State of digester is already at the requested offset.
+		return nil
+	}
+
+	// List hash states from storage backend.
+	var hashStateMatch hashStateEntry
+	hashStates, err := lw.getStoredHashStates()
+	if err != nil {
+		return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
+	}
+
+	// Find the highest stored hashState with offset less than or equal to
+	// the requested offset.
+	for _, hashState := range hashStates {
+		if hashState.offset == offset {
+			hashStateMatch = hashState
+			break // Found an exact offset match.
+		} else if hashState.offset < offset && hashState.offset > hashStateMatch.offset {
+			// This offset is closer to the requested offset.
+			hashStateMatch = hashState
+		} else if hashState.offset > offset {
+			// Remove any stored hash state with offsets higher than this one
+			// as writes to this resumed hasher will make those invalid. This
+			// is probably okay to skip for now since we don't expect anyone to
+			// use the API in this way. For that reason, we don't treat an
+			// error here as a fatal error, but only log it.
+			if err := lw.driver.Delete(hashState.path); err != nil {
+				logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err)
+			}
+		}
+	}
+
+	if hashStateMatch.offset == 0 {
+		// No need to load any state, just reset the hasher.
+		lw.resumableDigester.Reset()
+	} else {
+		storedState, err := lw.driver.GetContent(hashStateMatch.path)
+		if err != nil {
+			return err
+		}
+
+		if err = lw.resumableDigester.Restore(storedState); err != nil {
+			return err
+		}
+	}
+
+	// Mind the gap.
+	if gapLen := offset - int64(lw.resumableDigester.Len()); gapLen > 0 {
+		// Need to read content from the upload to catch up to the desired
+		// offset.
+		fr, err := newFileReader(lw.driver, lw.path)
+		if err != nil {
+			return err
+		}
+
+		if _, err = fr.Seek(int64(lw.resumableDigester.Len()), os.SEEK_SET); err != nil {
+			return fmt.Errorf("unable to seek to layer reader offset %d: %s", lw.resumableDigester.Len(), err)
+		}
+
+		if _, err := io.CopyN(lw.resumableDigester, fr, gapLen); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (lw *layerWriter) storeHashState() error {
+	uploadHashStatePath, err := lw.layerStore.repository.registry.pm.path(uploadHashStatePathSpec{
+		name:   lw.layerStore.repository.Name(),
+		uuid:   lw.uuid,
+		alg:    lw.resumableDigester.Digest().Algorithm(),
+		offset: int64(lw.resumableDigester.Len()),
+	})
+	if err != nil {
+		return err
+	}
+
+	hashState, err := lw.resumableDigester.State()
+	if err != nil {
+		return err
+	}
+
+	return lw.driver.PutContent(uploadHashStatePath, hashState)
+}
+
 // validateLayer checks the layer data against the digest, returning an error
 // if it does not match. The canonical digest is returned.
 func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) {
-	digestVerifier, err := digest.NewDigestVerifier(dgst)
-	if err != nil {
+	// Restore the hasher state to the end of the upload.
+	if err := lw.resumeHashAt(lw.size); err != nil {
 		return "", err
 	}
 
-	// TODO(stevvooe): Store resumable hash calculations in upload directory
-	// in driver. Something like a file at path <uuid>/resumablehash/<offest>
-	// with the hash state up to that point would be perfect. The hasher would
-	// then only have to fetch the difference.
+	var verified bool
+	canonical := lw.resumableDigester.Digest()
 
-	// Read the file from the backend driver and validate it.
-	fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path)
-	if err != nil {
-		return "", err
+	if canonical.Algorithm() == dgst.Algorithm() {
+		// Common case: client and server prefer the same canonical digest
+		// algorithm - currently SHA256.
+		verified = dgst == canonical
+	} else {
+		// The client wants to use a different digest algorithm. They'll just
+		// have to be patient and wait for us to download and re-hash the
+		// uploaded content using that digest algorithm.
+		digestVerifier, err := digest.NewDigestVerifier(dgst)
+		if err != nil {
+			return "", err
+		}
+
+		// Read the file from the backend driver and validate it.
+		fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path)
+		if err != nil {
+			return "", err
+		}
+
+		if _, err = io.Copy(digestVerifier, fr); err != nil {
+			return "", err
+		}
+
+		verified = digestVerifier.Verified()
 	}
 
-	tr := io.TeeReader(fr, digestVerifier)
-
-	// TODO(stevvooe): This is one of the places we need a Digester write
-	// sink. Instead, its read driven. This might be okay.
-
-	// Calculate an updated digest with the latest version.
-	canonical, err := digest.FromReader(tr)
-	if err != nil {
-		return "", err
-	}
-
-	if !digestVerifier.Verified() {
+	if !verified {
 		return "", distribution.ErrLayerInvalidDigest{
 			Digest: dgst,
 			Reason: fmt.Errorf("content does not match digest"),
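The heart of the change is the pairing of storeHashState, which persists the digester state at the current offset under hashstates/<algorithm>/<offset>, and resumeHashAt, which picks the best stored state at or below the requested offset and then hashes the remaining gap from the uploaded file. The selection rule is the subtle part, so here is an isolated, runnable sketch of just that rule; the names are illustrative, not from the patch:

    package main

    import "fmt"

    // pickState mirrors the selection loop in resumeHashAt: an exact match
    // wins immediately; otherwise take the highest offset below the target;
    // anything above the target is stale.
    func pickState(storedOffsets []int64, offset int64) (resumeAt int64, stale []int64) {
    	for _, o := range storedOffsets {
    		switch {
    		case o == offset:
    			return o, stale // exact match
    		case o < offset && o > resumeAt:
    			resumeAt = o // closest usable state so far
    		case o > offset:
    			stale = append(stale, o) // would be invalidated by new writes
    		}
    	}
    	return resumeAt, stale
    }

    func main() {
    	resumeAt, stale := pickState([]int64{0, 4096, 8192, 16384}, 10000)
    	fmt.Printf("resume from %d, re-hash %d bytes, delete states at %v\n",
    		resumeAt, 10000-resumeAt, stale)
    	// resume from 8192, re-hash 1808 bytes, delete states at [16384]
    }

A state above the requested offset is useless because new writes at the lower offset invalidate it, which is why resumeHashAt deletes those entries rather than keeping them around.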
@@ -33,6 +33,7 @@ const storagePathVersion = "v2"
 //			-> _uploads/<uuid>
 //				data
 //				startedat
+//				hashstates/<algorithm>/<offset>
 //	-> blob/<algorithm>
 //		<split directory content addressable storage>
 //
@@ -87,6 +88,7 @@ const storagePathVersion = "v2"
 //
 //	uploadDataPathSpec:      <root>/v2/repositories/<name>/_uploads/<uuid>/data
 //	uploadStartedAtPathSpec: <root>/v2/repositories/<name>/_uploads/<uuid>/startedat
+//	uploadHashStatePathSpec: <root>/v2/repositories/<name>/_uploads/<uuid>/hashstates/<algorithm>/<offset>
 //
 // Blob Store:
 //
@@ -249,6 +251,12 @@ func (pm *pathMapper) path(spec pathSpec) (string, error) {
 		return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "data")...), nil
 	case uploadStartedAtPathSpec:
 		return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "startedat")...), nil
+	case uploadHashStatePathSpec:
+		offset := fmt.Sprintf("%d", v.offset)
+		if v.list {
+			offset = "" // Limit to the prefix for listing offsets.
+		}
+		return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "hashstates", v.alg, offset)...), nil
 	default:
 		// TODO(sday): This is an internal error. Ensure it doesn't escape (panic?).
 		return "", fmt.Errorf("unknown path spec: %#v", v)
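As a quick sanity check of the new mapping, this standalone sketch reproduces the uploadHashStatePathSpec layout shown above; the prefix and repository values are made up for illustration:

    package main

    import (
    	"fmt"
    	"path"
    )

    func hashStatePath(prefix, name, uuid, alg string, offset int64, list bool) string {
    	off := fmt.Sprintf("%d", offset)
    	if list {
    		off = "" // stop at the prefix so the driver can list all offsets
    	}
    	return path.Join(prefix, "repositories", name, "_uploads", uuid, "hashstates", alg, off)
    }

    func main() {
    	fmt.Println(hashStatePath("/v2", "library/ubuntu", "abc-123", "sha256", 4096, false))
    	// /v2/repositories/library/ubuntu/_uploads/abc-123/hashstates/sha256/4096
    	fmt.Println(hashStatePath("/v2", "library/ubuntu", "abc-123", "sha256", 0, true))
    	// /v2/repositories/library/ubuntu/_uploads/abc-123/hashstates/sha256
    }

Passing list=true stops at the algorithm directory, which is exactly the prefix that getStoredHashStates hands to driver.List.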
@@ -424,6 +432,20 @@ type uploadStartedAtPathSpec struct {
 
 func (uploadStartedAtPathSpec) pathSpec() {}
 
+// uploadHashStatePathSpec defines the path parameters for the file that stores
+// the hash function state of an upload at a specific byte offset. If `list` is
+// set, then the path mapper will generate a list prefix for all hash state
+// offsets for the upload identified by the name, uuid, and alg.
+type uploadHashStatePathSpec struct {
+	name   string
+	uuid   string
+	alg    string
+	offset int64
+	list   bool
+}
+
+func (uploadHashStatePathSpec) pathSpec() {}
+
 // digestPathComponents provides a consistent path breakdown for a given
 // digest. For a generic digest, it will be as follows:
 //