From b96de45be83506f195903c7ab85d61a1003d5b96 Mon Sep 17 00:00:00 2001 From: Josh Hawn Date: Tue, 24 Mar 2015 10:35:01 -0700 Subject: [PATCH] Use resumable digest for efficient upload finish By using a resumable digester and storing the state of upload digests between subsequent upload chunks, finalizing an upload no longer requires reading back all of the uploaded data to verify the client's expected digest. Docker-DCO-1.1-Signed-off-by: Josh Hawn (github: jlhawn) --- docs/storage/layerstore.go | 1 + docs/storage/layerwriter.go | 228 ++++++++++++++++++++++++++++++++---- docs/storage/paths.go | 22 ++++ 3 files changed, 226 insertions(+), 25 deletions(-) diff --git a/docs/storage/layerstore.go b/docs/storage/layerstore.go index 05881749e..77c235aaa 100644 --- a/docs/storage/layerstore.go +++ b/docs/storage/layerstore.go @@ -142,6 +142,7 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di layerStore: ls, uuid: uuid, startedAt: startedAt, + resumableDigester: digest.NewCanonicalResumableDigester(), bufferedFileWriter: *fw, }, nil } diff --git a/docs/storage/layerwriter.go b/docs/storage/layerwriter.go index 27bbade12..ccd8679be 100644 --- a/docs/storage/layerwriter.go +++ b/docs/storage/layerwriter.go @@ -3,7 +3,9 @@ package storage import ( "fmt" "io" + "os" "path" + "strconv" "time" "github.com/Sirupsen/logrus" @@ -20,10 +22,11 @@ var _ distribution.LayerUpload = &layerWriter{} type layerWriter struct { layerStore *layerStore - uuid string - startedAt time.Time + uuid string + startedAt time.Time + resumableDigester digest.ResumableDigester - // implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisy + // implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy // LayerUpload Interface bufferedFileWriter } @@ -83,37 +86,212 @@ func (lw *layerWriter) Cancel() error { return nil } +func (lw *layerWriter) Write(p []byte) (int, error) { + // Ensure that the current write offset matches how many bytes have been + // written to the digester. If not, we need to update the digest state to + // match the current write position. + if err := lw.resumeHashAt(lw.offset); err != nil { + return 0, err + } + + return io.MultiWriter(&lw.bufferedFileWriter, lw.resumableDigester).Write(p) +} + +func (lw *layerWriter) ReadFrom(r io.Reader) (n int64, err error) { + // Ensure that the current write offset matches how many bytes have been + // written to the digester. If not, we need to update the digest state to + // match the current write position. + if err := lw.resumeHashAt(lw.offset); err != nil { + return 0, err + } + + return lw.bufferedFileWriter.ReadFrom(io.TeeReader(r, lw.resumableDigester)) +} + +func (lw *layerWriter) Close() error { + if err := lw.storeHashState(); err != nil { + return err + } + + return lw.bufferedFileWriter.Close() +} + +type hashStateEntry struct { + offset int64 + path string +} + +// getStoredHashStates returns a slice of hashStateEntries for this upload. +func (lw *layerWriter) getStoredHashStates() ([]hashStateEntry, error) { + uploadHashStatePathPrefix, err := lw.layerStore.repository.registry.pm.path(uploadHashStatePathSpec{ + name: lw.layerStore.repository.Name(), + uuid: lw.uuid, + alg: lw.resumableDigester.Digest().Algorithm(), + list: true, + }) + if err != nil { + return nil, err + } + + paths, err := lw.driver.List(uploadHashStatePathPrefix) + if err != nil { + if _, ok := err.(storagedriver.PathNotFoundError); !ok { + return nil, err + } + // Treat PathNotFoundError as no entries. + paths = nil + } + + hashStateEntries := make([]hashStateEntry, 0, len(paths)) + + for _, p := range paths { + pathSuffix := path.Base(p) + // The suffix should be the offset. + offset, err := strconv.ParseInt(pathSuffix, 0, 64) + if err != nil { + logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err) + } + + hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p}) + } + + return hashStateEntries, nil +} + +// resumeHashAt attempts to restore the state of the internal hash function +// by loading the most recent saved hash state less than or equal to the given +// offset. Any unhashed bytes remaining less than the given offset are hashed +// from the content uploaded so far. +func (lw *layerWriter) resumeHashAt(offset int64) error { + if offset < 0 { + return fmt.Errorf("cannot resume hash at negative offset: %d", offset) + } + + if offset == int64(lw.resumableDigester.Len()) { + // State of digester is already at the requseted offset. + return nil + } + + // List hash states from storage backend. + var hashStateMatch hashStateEntry + hashStates, err := lw.getStoredHashStates() + if err != nil { + return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err) + } + + // Find the highest stored hashState with offset less than or equal to + // the requested offset. + for _, hashState := range hashStates { + if hashState.offset == offset { + hashStateMatch = hashState + break // Found an exact offset match. + } else if hashState.offset < offset && hashState.offset > hashStateMatch.offset { + // This offset is closer to the requested offset. + hashStateMatch = hashState + } else if hashState.offset > offset { + // Remove any stored hash state with offsets higher than this one + // as writes to this resumed hasher will make those invalid. This + // is probably okay to skip for now since we don't expect anyone to + // use the API in this way. For that reason, we don't treat an + // an error here as a fatal error, but only log it. + if err := lw.driver.Delete(hashState.path); err != nil { + logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err) + } + } + } + + if hashStateMatch.offset == 0 { + // No need to load any state, just reset the hasher. + lw.resumableDigester.Reset() + } else { + storedState, err := lw.driver.GetContent(hashStateMatch.path) + if err != nil { + return err + } + + if err = lw.resumableDigester.Restore(storedState); err != nil { + return err + } + } + + // Mind the gap. + if gapLen := offset - int64(lw.resumableDigester.Len()); gapLen > 0 { + // Need to read content from the upload to catch up to the desired + // offset. + fr, err := newFileReader(lw.driver, lw.path) + if err != nil { + return err + } + + if _, err = fr.Seek(int64(lw.resumableDigester.Len()), os.SEEK_SET); err != nil { + return fmt.Errorf("unable to seek to layer reader offset %d: %s", lw.resumableDigester.Len(), err) + } + + if _, err := io.CopyN(lw.resumableDigester, fr, gapLen); err != nil { + return err + } + } + + return nil +} + +func (lw *layerWriter) storeHashState() error { + uploadHashStatePath, err := lw.layerStore.repository.registry.pm.path(uploadHashStatePathSpec{ + name: lw.layerStore.repository.Name(), + uuid: lw.uuid, + alg: lw.resumableDigester.Digest().Algorithm(), + offset: int64(lw.resumableDigester.Len()), + }) + if err != nil { + return err + } + + hashState, err := lw.resumableDigester.State() + if err != nil { + return err + } + + return lw.driver.PutContent(uploadHashStatePath, hashState) +} + // validateLayer checks the layer data against the digest, returning an error // if it does not match. The canonical digest is returned. func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) { - digestVerifier, err := digest.NewDigestVerifier(dgst) - if err != nil { + // Restore the hasher state to the end of the upload. + if err := lw.resumeHashAt(lw.size); err != nil { return "", err } - // TODO(stevvooe): Store resumable hash calculations in upload directory - // in driver. Something like a file at path /resumablehash/ - // with the hash state up to that point would be perfect. The hasher would - // then only have to fetch the difference. + var verified bool + canonical := lw.resumableDigester.Digest() - // Read the file from the backend driver and validate it. - fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path) - if err != nil { - return "", err + if canonical.Algorithm() == dgst.Algorithm() { + // Common case: client and server prefer the same canonical digest + // algorithm - currently SHA256. + verified = dgst == canonical + } else { + // The client wants to use a different digest algorithm. They'll just + // have to be patient and wait for us to download and re-hash the + // uploaded content using that digest algorithm. + digestVerifier, err := digest.NewDigestVerifier(dgst) + if err != nil { + return "", err + } + + // Read the file from the backend driver and validate it. + fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path) + if err != nil { + return "", err + } + + if _, err = io.Copy(digestVerifier, fr); err != nil { + return "", err + } + + verified = digestVerifier.Verified() } - tr := io.TeeReader(fr, digestVerifier) - - // TODO(stevvooe): This is one of the places we need a Digester write - // sink. Instead, its read driven. This might be okay. - - // Calculate an updated digest with the latest version. - canonical, err := digest.FromReader(tr) - if err != nil { - return "", err - } - - if !digestVerifier.Verified() { + if !verified { return "", distribution.ErrLayerInvalidDigest{ Digest: dgst, Reason: fmt.Errorf("content does not match digest"), diff --git a/docs/storage/paths.go b/docs/storage/paths.go index 179e7b783..f541f0794 100644 --- a/docs/storage/paths.go +++ b/docs/storage/paths.go @@ -33,6 +33,7 @@ const storagePathVersion = "v2" // -> _uploads/ // data // startedat +// hashstates// // -> blob/ // // @@ -87,6 +88,7 @@ const storagePathVersion = "v2" // // uploadDataPathSpec: /v2/repositories//_uploads//data // uploadStartedAtPathSpec: /v2/repositories//_uploads//startedat +// uploadHashStatePathSpec: /v2/repositories//_uploads//hashstates// // // Blob Store: // @@ -249,6 +251,12 @@ func (pm *pathMapper) path(spec pathSpec) (string, error) { return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "data")...), nil case uploadStartedAtPathSpec: return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "startedat")...), nil + case uploadHashStatePathSpec: + offset := fmt.Sprintf("%d", v.offset) + if v.list { + offset = "" // Limit to the prefix for listing offsets. + } + return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "hashstates", v.alg, offset)...), nil default: // TODO(sday): This is an internal error. Ensure it doesn't escape (panic?). return "", fmt.Errorf("unknown path spec: %#v", v) @@ -424,6 +432,20 @@ type uploadStartedAtPathSpec struct { func (uploadStartedAtPathSpec) pathSpec() {} +// uploadHashStatePathSpec defines the path parameters for the file that stores +// the hash function state of an upload at a specific byte offset. If `list` is +// set, then the path mapper will generate a list prefix for all hash state +// offsets for the upload identified by the name, uuid, and alg. +type uploadHashStatePathSpec struct { + name string + uuid string + alg string + offset int64 + list bool +} + +func (uploadHashStatePathSpec) pathSpec() {} + // digestPathComponents provides a consistent path breakdown for a given // digest. For a generic digest, it will be as follows: //