Merge pull request #546 from stevvooe/resumable-digest-refactor

Remove digest package's dependency on external sha implementation
Stephen Day 2015-05-22 18:15:37 -07:00
commit 8ce4dcaef8
36 changed files with 515 additions and 500 deletions

Godeps/Godeps.json generated

@ -85,8 +85,8 @@
"Rev": "e444e69cbd2e2e3e0749a2f3c717cec491552bbf"
},
{
"ImportPath": "github.com/jlhawn/go-crypto",
"Rev": "cd738dde20f0b3782516181b0866c9bb9db47401"
"ImportPath": "github.com/stevvooe/resumable",
"Rev": "51ad44105773cafcbe91927f70ac68e1bf78f8b4"
},
{
"ImportPath": "github.com/yvasiyarov/go-metrics",


@ -1,87 +0,0 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package crypto is a Subset of the Go `crypto` Package with a Resumable Hash
package crypto
import (
"hash"
"strconv"
)
// Hash identifies a cryptographic hash function that is implemented in another
// package.
type Hash uint
// HashFunc simply returns the value of h so that Hash implements SignerOpts.
func (h Hash) HashFunc() Hash {
return h
}
const (
SHA224 Hash = 1 + iota // import crypto/sha256
SHA256 // import crypto/sha256
SHA384 // import crypto/sha512
SHA512 // import crypto/sha512
maxHash
)
var digestSizes = []uint8{
SHA224: 28,
SHA256: 32,
SHA384: 48,
SHA512: 64,
}
// Size returns the length, in bytes, of a digest resulting from the given hash
// function. It doesn't require that the hash function in question be linked
// into the program.
func (h Hash) Size() int {
if h > 0 && h < maxHash {
return int(digestSizes[h])
}
panic("crypto: Size of unknown hash function")
}
// ResumableHash is the common interface implemented by all resumable hash
// functions.
type ResumableHash interface {
// ResumableHash is a superset of hash.Hash
hash.Hash
// Len returns the number of bytes written to the Hash so far.
Len() uint64
// State returns a snapshot of the state of the Hash.
State() ([]byte, error)
// Restore resets the Hash to the given state.
Restore(state []byte) error
}
var hashes = make([]func() ResumableHash, maxHash)
// New returns a new ResumableHash calculating the given hash function. New panics
// if the hash function is not linked into the binary.
func (h Hash) New() ResumableHash {
if h > 0 && h < maxHash {
f := hashes[h]
if f != nil {
return f()
}
}
panic("crypto: requested hash function #" + strconv.Itoa(int(h)) + " is unavailable")
}
// Available reports whether the given hash function is linked into the binary.
func (h Hash) Available() bool {
return h < maxHash && hashes[h] != nil
}
// RegisterHash registers a function that returns a new instance of the given
// hash function. This is intended to be called from the init function in
// packages that implement hash functions.
func RegisterHash(h Hash, f func() ResumableHash) {
if h >= maxHash {
panic("crypto: RegisterHash of unknown hash function")
}
hashes[h] = f
}


@ -1,23 +0,0 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// This file defines flags attached to various functions
// and data objects. The compilers, assemblers, and linker must
// all agree on these values.
// Don't profile the marked routine. This flag is deprecated.
#define NOPROF 1
// It is ok for the linker to get multiple of these symbols. It will
// pick one of the duplicates to use.
#define DUPOK 2
// Don't insert stack check preamble.
#define NOSPLIT 4
// Put this data in a read-only section.
#define RODATA 8
// This data contains no pointers.
#define NOPTR 16
// This is a wrapper function and should not count as disabling 'recover'.
#define WRAPPER 32
// This function uses its incoming context register.
#define NEEDCTXT 64


@ -3,4 +3,4 @@ A Subset of the Go `crypto` Package with a Resumable Hash Interface
### Documentation
GoDocs: http://godoc.org/github.com/jlhawn/go-crypto
GoDocs: http://godoc.org/github.com/stevvooe/resumable


@ -0,0 +1,43 @@
// Package resumable registers resumable versions of hash functions. Resumable
// varieties of hash functions are available via the standard crypto package.
// Support can be checked by type assertion against the resumable.Hash
// interface.
//
// While one can use these sub-packages directly, it makes more sense to
// register them using side-effect imports:
//
// import _ "github.com/stevvooe/resumable/sha256"
//
// This will make the resumable hashes available to the application through
// the standard crypto package. For example, if a new sha256 is required, one
// should use the following:
//
// h := crypto.SHA256.New()
//
// Such a feature allows one to control the inclusion of resumable hash
// support in a single file. Applications that require the resumable hash
// implementation can type switch to detect support, while other parts of the
// application can be completely oblivious to the presence of the alternative
// hash functions.
//
// Also note that the implementations available in this package are completely
// untouched from their Go counterparts in the standard library. Only an extra
// file is added to each package to implement the extra resumable hash
// functions.
package resumable
import "hash"
// Hash is the common interface implemented by all resumable hash functions.
type Hash interface {
hash.Hash
// Len returns the number of bytes written to the Hash so far.
Len() int64
// State returns a snapshot of the state of the Hash.
State() ([]byte, error)
// Restore resets the Hash to the given state.
Restore(state []byte) error
}
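
For reference, here is a minimal, hypothetical program (not part of this change) illustrating the detection pattern the package comment describes: the side-effect import re-registers sha256 with the standard crypto package, and callers that care about resumability type-assert the result.

package main

import (
	"crypto"
	"fmt"

	"github.com/stevvooe/resumable"
	_ "github.com/stevvooe/resumable/sha256" // registers the resumable sha224/sha256
)

func main() {
	h := crypto.SHA256.New() // returns the resumable implementation once registered
	if rh, ok := h.(resumable.Hash); ok {
		state, err := rh.State() // snapshot of the internal hash state
		if err != nil {
			panic(err)
		}
		fmt.Printf("resumable sha256 registered (%d state bytes)\n", len(state))
	} else {
		fmt.Println("plain, non-resumable sha256")
	}
}

Code paths that never perform the assertion continue to treat the value as an ordinary hash.Hash.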


@ -3,11 +3,14 @@ package sha256
import (
"bytes"
"encoding/gob"
// import to ensure that our init function runs after the standard package
_ "crypto/sha256"
)
// Len returns the number of bytes which have been written to the digest.
func (d *digest) Len() uint64 {
return d.len
func (d *digest) Len() int64 {
return int64(d.len)
}
// State returns a snapshot of the state of the digest.


@ -7,7 +7,8 @@
package sha256
import (
"github.com/jlhawn/go-crypto"
"crypto"
"hash"
)
func init() {
@ -77,15 +78,15 @@ func (d *digest) Reset() {
d.len = 0
}
// New returns a new crypto.ResumableHash computing the SHA256 checksum.
func New() crypto.ResumableHash {
// New returns a new hash.Hash computing the SHA256 checksum.
func New() hash.Hash {
d := new(digest)
d.Reset()
return d
}
// New224 returns a new crypto.ResumableHash computing the SHA224 checksum.
func New224() crypto.ResumableHash {
// New224 returns a new hash.Hash computing the SHA224 checksum.
func New224() hash.Hash {
d := new(digest)
d.is224 = true
d.Reset()


@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
#include "../textflag.h"
#include "textflag.h"
// SHA256 block routine. See sha256block.go for Go equivalent.
//


@ -2,15 +2,17 @@ package sha256
import (
"bytes"
stdlib "crypto"
"crypto"
"crypto/rand"
_ "crypto/sha256" // To register the stdlib sha224 and sha256 algs.
resumable "github.com/jlhawn/go-crypto"
"crypto/sha256" // To register the stdlib sha224 and sha256 algs.
"hash"
"io"
"testing"
"github.com/stevvooe/resumable"
)
func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) {
func compareResumableHash(t *testing.T, newResumable func() hash.Hash, newStdlib func() hash.Hash) {
// Read 3 Kilobytes of random data into a buffer.
buf := make([]byte, 3*1024)
if _, err := io.ReadFull(rand.Reader, buf); err != nil {
@ -20,8 +22,8 @@ func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) {
// Use two Hash objects to consume prefixes of the data. One will be
// snapshotted and resumed with each additional byte, then both will write
// that byte. The digests should be equal after each byte is digested.
resumableHasher := r.New()
stdlibHasher := h.New()
resumableHasher := newResumable().(resumable.Hash)
stdlibHasher := newStdlib()
// First, assert that the initial digest is the same.
if !bytes.Equal(resumableHasher.Sum(nil), stdlibHasher.Sum(nil)) {
@ -52,6 +54,21 @@ func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) {
}
func TestResumable(t *testing.T) {
compareResumableHash(t, resumable.SHA224, stdlib.SHA224)
compareResumableHash(t, resumable.SHA256, stdlib.SHA256)
compareResumableHash(t, New224, sha256.New224)
compareResumableHash(t, New, sha256.New)
}
func TestResumableRegistered(t *testing.T) {
for _, hf := range []crypto.Hash{crypto.SHA224, crypto.SHA256} {
// make sure that the hash gets the resumable version from the global
// registry in crypto library.
h := hf.New()
if rh, ok := h.(resumable.Hash); !ok {
t.Fatalf("non-resumable hash function registered: %#v %#v", rh, crypto.SHA256)
}
}
}


@ -3,11 +3,14 @@ package sha512
import (
"bytes"
"encoding/gob"
// import to ensure that our init function runs after the standard package
_ "crypto/sha512"
)
// Len returns the number of bytes which have been written to the digest.
func (d *digest) Len() uint64 {
return d.len
func (d *digest) Len() int64 {
return int64(d.len)
}
// State returns a snapshot of the state of the digest.


@ -7,7 +7,8 @@
package sha512
import (
"github.com/jlhawn/go-crypto"
"crypto"
"hash"
)
func init() {
@ -77,15 +78,15 @@ func (d *digest) Reset() {
d.len = 0
}
// New returns a new crypto.ResumableHash computing the SHA512 checksum.
func New() crypto.ResumableHash {
// New returns a new hash.Hash computing the SHA512 checksum.
func New() hash.Hash {
d := new(digest)
d.Reset()
return d
}
// New384 returns a new crypto.ResumableHash computing the SHA384 checksum.
func New384() crypto.ResumableHash {
// New384 returns a new hash.Hash computing the SHA384 checksum.
func New384() hash.Hash {
d := new(digest)
d.is384 = true
d.Reset()


@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
#include "../textflag.h"
#include "textflag.h"
// SHA512 block routine. See sha512block.go for Go equivalent.
//


@ -2,15 +2,17 @@ package sha512
import (
"bytes"
stdlib "crypto"
"crypto/rand"
_ "crypto/sha512" // To register the stdlib sha224 and sha256 algs.
resumable "github.com/jlhawn/go-crypto"
"crypto"
"crypto/rand" // To register the stdlib sha224 and sha256 algs.
"crypto/sha512"
"hash"
"io"
"testing"
"github.com/stevvooe/resumable"
)
func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) {
func compareResumableHash(t *testing.T, newResumable func() hash.Hash, newStdlib func() hash.Hash) {
// Read 3 Kilobytes of random data into a buffer.
buf := make([]byte, 3*1024)
if _, err := io.ReadFull(rand.Reader, buf); err != nil {
@ -20,8 +22,8 @@ func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) {
// Use two Hash objects to consume prefixes of the data. One will be
// snapshotted and resumed with each additional byte, then both will write
// that byte. The digests should be equal after each byte is digested.
resumableHasher := r.New()
stdlibHasher := h.New()
resumableHasher := newResumable().(resumable.Hash)
stdlibHasher := newStdlib()
// First, assert that the initial digest is the same.
if !bytes.Equal(resumableHasher.Sum(nil), stdlibHasher.Sum(nil)) {
@ -52,6 +54,21 @@ func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) {
}
func TestResumable(t *testing.T) {
compareResumableHash(t, resumable.SHA384, stdlib.SHA384)
compareResumableHash(t, resumable.SHA512, stdlib.SHA512)
compareResumableHash(t, New384, sha512.New384)
compareResumableHash(t, New, sha512.New)
}
func TestResumableRegistered(t *testing.T) {
for _, hf := range []crypto.Hash{crypto.SHA384, crypto.SHA512} {
// make sure that the hash gets the resumable version from the global
// registry in crypto library.
h := hf.New()
if rh, ok := h.(resumable.Hash); !ok {
t.Fatalf("non-resumable hash function registered: %#v %#v", rh, crypto.SHA256)
}
}
}


@ -15,6 +15,7 @@ import (
const (
// DigestTarSumV1EmptyTar is the digest for the empty tar file.
DigestTarSumV1EmptyTar = "tarsum.v1+sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
// DigestSha256EmptyTar is the canonical sha256 digest of empty data
DigestSha256EmptyTar = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
)
@ -38,7 +39,7 @@ const (
type Digest string
// NewDigest returns a Digest from alg and a hash.Hash object.
func NewDigest(alg string, h hash.Hash) Digest {
func NewDigest(alg Algorithm, h hash.Hash) Digest {
return Digest(fmt.Sprintf("%s:%x", alg, h.Sum(nil)))
}
@ -71,9 +72,9 @@ func ParseDigest(s string) (Digest, error) {
// FromReader returns the most valid digest for the underlying content.
func FromReader(rd io.Reader) (Digest, error) {
digester := NewCanonicalDigester()
digester := Canonical.New()
if _, err := io.Copy(digester, rd); err != nil {
if _, err := io.Copy(digester.Hash(), rd); err != nil {
return "", err
}
@ -130,8 +131,8 @@ func (d Digest) Validate() error {
return ErrDigestInvalidFormat
}
switch s[:i] {
case "sha256", "sha384", "sha512":
switch Algorithm(s[:i]) {
case SHA256, SHA384, SHA512:
break
default:
return ErrDigestUnsupported
@ -142,8 +143,8 @@ func (d Digest) Validate() error {
// Algorithm returns the algorithm portion of the digest. This will panic if
// the underlying digest is not in a valid format.
func (d Digest) Algorithm() string {
return string(d[:d.sepIndex()])
func (d Digest) Algorithm() Algorithm {
return Algorithm(d[:d.sepIndex()])
}
// Hex returns the hex digest portion of the digest. This will panic if the


@ -10,7 +10,7 @@ func TestParseDigest(t *testing.T) {
for _, testcase := range []struct {
input string
err error
algorithm string
algorithm Algorithm
hex string
}{
{


@ -1,54 +1,95 @@
package digest
import (
"crypto/sha256"
"crypto"
"hash"
)
// Digester calculates the digest of written data. It is functionally
// equivalent to hash.Hash but provides methods for returning the Digest type
// rather than raw bytes.
type Digester struct {
alg string
hash.Hash
// Algorithm identifies an implementation of a digester by an identifier.
// Note that this defines both the hash algorithm used and the string
// encoding.
type Algorithm string
// supported digest types
const (
SHA256 Algorithm = "sha256" // sha256 with hex encoding
SHA384 Algorithm = "sha384" // sha384 with hex encoding
SHA512 Algorithm = "sha512" // sha512 with hex encoding
TarsumV1SHA256 Algorithm = "tarsum+v1+sha256" // supported tarsum version, verification only
// Canonical is the primary digest algorithm used with the distribution
// project. Other digests may be used but this one is the primary storage
// digest.
Canonical = SHA256
)
var (
// TODO(stevvooe): Follow the pattern of the standard crypto package for
// registration of digests. Effectively, we are a registerable set and
// common symbol access.
// algorithms maps values to hash.Hash implementations. Other algorithms
// may be available but they cannot be calculated by the digest package.
algorithms = map[Algorithm]crypto.Hash{
SHA256: crypto.SHA256,
SHA384: crypto.SHA384,
SHA512: crypto.SHA512,
}
)
// Available returns true if the digest type is available for use. If this
// returns false, New and Hash will return nil.
func (a Algorithm) Available() bool {
h, ok := algorithms[a]
if !ok {
return false
}
// check availability of the hash, as well
return h.Available()
}
// NewDigester create a new Digester with the given hashing algorithm and instance
// of that algo's hasher.
func NewDigester(alg string, h hash.Hash) Digester {
return Digester{
alg: alg,
Hash: h,
// New returns a new digester for the specified algorithm. If the algorithm
// does not have a digester implementation, nil will be returned. This can be
// checked by calling Available before calling New.
func (a Algorithm) New() Digester {
return &digester{
alg: a,
hash: a.Hash(),
}
}
// NewCanonicalDigester is a convenience function to create a new Digester with
// our default settings.
func NewCanonicalDigester() Digester {
return NewDigester("sha256", sha256.New())
// Hash returns a new hash as used by the algorithm. If not available, nil is
// returned. Make sure to check Available before calling.
func (a Algorithm) Hash() hash.Hash {
if !a.Available() {
return nil
}
return algorithms[a].New()
}
// Digest returns the current digest for this digester.
func (d *Digester) Digest() Digest {
return NewDigest(d.alg, d.Hash)
}
// TODO(stevvooe): Allow resolution of verifiers using the digest type and
// this registration system.
// ResumableHash is the common interface implemented by all resumable hash
// functions.
type ResumableHash interface {
// ResumableHash is a superset of hash.Hash
hash.Hash
// Len returns the number of bytes written to the Hash so far.
Len() uint64
// State returns a snapshot of the state of the Hash.
State() ([]byte, error)
// Restore resets the Hash to the given state.
Restore(state []byte) error
}
// ResumableDigester is a digester that can export its internal state and be
// restored from saved state.
type ResumableDigester interface {
ResumableHash
// Digester calculates the digest of written data. Writes should go directly
// to the return value of Hash, while calling Digest will return the current
// value of the digest.
type Digester interface {
Hash() hash.Hash // provides direct access to underlying hash instance.
Digest() Digest
}
// digester provides a simple digester definition that embeds a hasher.
type digester struct {
alg Algorithm
hash hash.Hash
}
func (d *digester) Hash() hash.Hash {
return d.hash
}
func (d *digester) Digest() Digest {
return NewDigest(d.alg, d.hash)
}
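
For reviewers, a short usage sketch of the new Algorithm/Digester API (a hypothetical caller outside this repository; the explicit crypto/sha256 import is an assumption to guarantee the algorithm is linked in):

package main

import (
	_ "crypto/sha256" // link in sha256 so digest.Canonical.Available() reports true
	"fmt"
	"io"
	"strings"

	"github.com/docker/distribution/digest"
)

func main() {
	if !digest.Canonical.Available() {
		panic("canonical digest algorithm not linked into this binary")
	}

	dgstr := digest.Canonical.New() // Digester backed by sha256

	// Writes go to the underlying hash via Hash(); Digest() reports the result.
	if _, err := io.Copy(dgstr.Hash(), strings.NewReader("hello world")); err != nil {
		panic(err)
	}

	fmt.Println(dgstr.Digest()) // sha256:b94d27b9934d3e08...
}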


@ -1,52 +0,0 @@
// +build !noresumabledigest
package digest
import (
"fmt"
"github.com/jlhawn/go-crypto"
// For ResumableHash
_ "github.com/jlhawn/go-crypto/sha256" // For Resumable SHA256
_ "github.com/jlhawn/go-crypto/sha512" // For Resumable SHA384, SHA512
)
// resumableDigester implements ResumableDigester.
type resumableDigester struct {
alg string
crypto.ResumableHash
}
var resumableHashAlgs = map[string]crypto.Hash{
"sha256": crypto.SHA256,
"sha384": crypto.SHA384,
"sha512": crypto.SHA512,
}
// NewResumableDigester creates a new ResumableDigester with the given hashing
// algorithm.
func NewResumableDigester(alg string) (ResumableDigester, error) {
hash, supported := resumableHashAlgs[alg]
if !supported {
return resumableDigester{}, fmt.Errorf("unsupported resumable hash algorithm: %s", alg)
}
return resumableDigester{
alg: alg,
ResumableHash: hash.New(),
}, nil
}
// NewCanonicalResumableDigester creates a ResumableDigester using the default
// digest algorithm.
func NewCanonicalResumableDigester() ResumableDigester {
return resumableDigester{
alg: "sha256",
ResumableHash: crypto.SHA256.New(),
}
}
// Digest returns the current digest for this resumable digester.
func (d resumableDigester) Digest() Digest {
return NewDigest(d.alg, d.ResumableHash)
}


@ -0,0 +1,21 @@
// +build !noresumabledigest
package digest
import (
"testing"
"github.com/stevvooe/resumable"
_ "github.com/stevvooe/resumable/sha256"
)
// TestResumableDetection just ensures that the resumable capability of a hash
// is exposed through the digester type, which is just a hash plus a Digest
// method.
func TestResumableDetection(t *testing.T) {
d := Canonical.New()
if _, ok := d.Hash().(resumable.Hash); !ok {
t.Fatalf("expected digester to implement resumable.Hash: %#v, %v", d, d.Hash())
}
}
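
As a complement to the test above, a minimal sketch (hypothetical, not part of this change) of the snapshot/restore cycle the storage layer builds on: hash a prefix, save the state, resume in a fresh hasher, and confirm both arrive at the same sum.

package main

import (
	"bytes"
	"fmt"

	"github.com/stevvooe/resumable"
	resumablesha256 "github.com/stevvooe/resumable/sha256"
)

func main() {
	h := resumablesha256.New().(resumable.Hash)
	h.Write([]byte("hello "))

	state, err := h.State() // snapshot after the prefix
	if err != nil {
		panic(err)
	}

	h.Write([]byte("world"))
	want := h.Sum(nil)

	h2 := resumablesha256.New().(resumable.Hash)
	if err := h2.Restore(state); err != nil { // resume from the snapshot
		panic(err)
	}
	h2.Write([]byte("world"))

	fmt.Println(bytes.Equal(want, h2.Sum(nil))) // true
}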


@ -42,17 +42,17 @@ func NewSet() *Set {
// values or short values. This function does not test equality,
// rather whether the second value could match against the first
// value.
func checkShortMatch(alg, hex, shortAlg, shortHex string) bool {
func checkShortMatch(alg Algorithm, hex, shortAlg, shortHex string) bool {
if len(hex) == len(shortHex) {
if hex != shortHex {
return false
}
if len(shortAlg) > 0 && alg != shortAlg {
if len(shortAlg) > 0 && string(alg) != shortAlg {
return false
}
} else if !strings.HasPrefix(hex, shortHex) {
return false
} else if len(shortAlg) > 0 && alg != shortAlg {
} else if len(shortAlg) > 0 && string(alg) != shortAlg {
return false
}
return true
@ -68,7 +68,7 @@ func (dst *Set) Lookup(d string) (Digest, error) {
}
var (
searchFunc func(int) bool
alg string
alg Algorithm
hex string
)
dgst, err := ParseDigest(d)
@ -88,13 +88,13 @@ func (dst *Set) Lookup(d string) (Digest, error) {
}
}
idx := sort.Search(len(dst.entries), searchFunc)
if idx == len(dst.entries) || !checkShortMatch(dst.entries[idx].alg, dst.entries[idx].val, alg, hex) {
if idx == len(dst.entries) || !checkShortMatch(dst.entries[idx].alg, dst.entries[idx].val, string(alg), hex) {
return "", ErrDigestNotFound
}
if dst.entries[idx].alg == alg && dst.entries[idx].val == hex {
return dst.entries[idx].digest, nil
}
if idx+1 < len(dst.entries) && checkShortMatch(dst.entries[idx+1].alg, dst.entries[idx+1].val, alg, hex) {
if idx+1 < len(dst.entries) && checkShortMatch(dst.entries[idx+1].alg, dst.entries[idx+1].val, string(alg), hex) {
return "", ErrDigestAmbiguous
}
@ -172,7 +172,7 @@ func ShortCodeTable(dst *Set, length int) map[Digest]string {
}
type digestEntry struct {
alg string
alg Algorithm
val string
digest Digest
}


@ -1,8 +1,6 @@
package digest
import (
"crypto/sha256"
"crypto/sha512"
"hash"
"io"
"io/ioutil"
@ -33,7 +31,7 @@ func NewDigestVerifier(d Digest) (Verifier, error) {
switch alg {
case "sha256", "sha384", "sha512":
return hashVerifier{
hash: newHash(alg),
hash: alg.Hash(),
digest: d,
}, nil
default:
@ -95,19 +93,6 @@ func (lv *lengthVerifier) Verified() bool {
return lv.expected == lv.len
}
func newHash(name string) hash.Hash {
switch name {
case "sha256":
return sha256.New()
case "sha384":
return sha512.New384()
case "sha512":
return sha512.New()
default:
panic("unsupport algorithm: " + name)
}
}
type hashVerifier struct {
digest Digest
hash hash.Hash


@ -322,8 +322,8 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
if err != nil {
return distribution.Descriptor{}, err
}
dgstr := digest.NewCanonicalDigester()
n, err := io.Copy(writer, io.TeeReader(bytes.NewReader(p), dgstr))
dgstr := digest.Canonical.New()
n, err := io.Copy(writer, io.TeeReader(bytes.NewReader(p), dgstr.Hash()))
if err != nil {
return distribution.Descriptor{}, err
}


@ -213,8 +213,8 @@ func TestBlobAPI(t *testing.T) {
// Now, push just a chunk
layerFile.Seek(0, 0)
canonicalDigester := digest.NewCanonicalDigester()
if _, err := io.Copy(canonicalDigester, layerFile); err != nil {
canonicalDigester := digest.Canonical.New()
if _, err := io.Copy(canonicalDigester.Hash(), layerFile); err != nil {
t.Fatalf("error copying to digest: %v", err)
}
canonicalDigest := canonicalDigester.Digest()
@ -637,9 +637,9 @@ func doPushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Diges
// pushLayer pushes the layer content returning the url on success.
func pushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Digest, uploadURLBase string, body io.Reader) string {
digester := digest.NewCanonicalDigester()
digester := digest.Canonical.New()
resp, err := doPushLayer(t, ub, name, dgst, uploadURLBase, io.TeeReader(body, &digester))
resp, err := doPushLayer(t, ub, name, dgst, uploadURLBase, io.TeeReader(body, digester.Hash()))
if err != nil {
t.Fatalf("unexpected error doing push layer request: %v", err)
}
@ -702,9 +702,9 @@ func doPushChunk(t *testing.T, uploadURLBase string, body io.Reader) (*http.Resp
uploadURL := u.String()
digester := digest.NewCanonicalDigester()
digester := digest.Canonical.New()
req, err := http.NewRequest("PATCH", uploadURL, io.TeeReader(body, digester))
req, err := http.NewRequest("PATCH", uploadURL, io.TeeReader(body, digester.Hash()))
if err != nil {
t.Fatalf("unexpected error creating new request: %v", err)
}


@ -1,11 +1,9 @@
package storage
import (
"errors"
"fmt"
"io"
"os"
"path"
"strconv"
"time"
"github.com/Sirupsen/logrus"
@ -15,14 +13,19 @@ import (
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
var (
errResumableDigestNotAvailable = errors.New("resumable digest not available")
)
// layerWriter is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
type blobWriter struct {
blobStore *linkedBlobStore
id string
startedAt time.Time
resumableDigester digest.ResumableDigester
id string
startedAt time.Time
digester digest.Digester
written int64 // track the contiguous write
// implements io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
// the LayerUpload interface
@ -82,33 +85,31 @@ func (bw *blobWriter) Cancel(ctx context.Context) error {
}
func (bw *blobWriter) Write(p []byte) (int, error) {
if bw.resumableDigester == nil {
return bw.bufferedFileWriter.Write(p)
}
// Ensure that the current write offset matches how many bytes have been
// written to the digester. If not, we need to update the digest state to
// match the current write position.
if err := bw.resumeHashAt(bw.blobStore.ctx, bw.offset); err != nil {
if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
return 0, err
}
return io.MultiWriter(&bw.bufferedFileWriter, bw.resumableDigester).Write(p)
n, err := io.MultiWriter(&bw.bufferedFileWriter, bw.digester.Hash()).Write(p)
bw.written += int64(n)
return n, err
}
func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
if bw.resumableDigester == nil {
return bw.bufferedFileWriter.ReadFrom(r)
}
// Ensure that the current write offset matches how many bytes have been
// written to the digester. If not, we need to update the digest state to
// match the current write position.
if err := bw.resumeHashAt(bw.blobStore.ctx, bw.offset); err != nil {
if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
return 0, err
}
return bw.bufferedFileWriter.ReadFrom(io.TeeReader(r, bw.resumableDigester))
nn, err := bw.bufferedFileWriter.ReadFrom(io.TeeReader(r, bw.digester.Hash()))
bw.written += nn
return nn, err
}
func (bw *blobWriter) Close() error {
@ -116,10 +117,8 @@ func (bw *blobWriter) Close() error {
return bw.err
}
if bw.resumableDigester != nil {
if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
return err
}
if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
return err
}
return bw.bufferedFileWriter.Close()
@ -171,13 +170,11 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
desc.Length = bw.size
}
if bw.resumableDigester != nil {
// Restore the hasher state to the end of the upload.
if err := bw.resumeHashAt(ctx, bw.size); err != nil {
return distribution.Descriptor{}, err
}
// TODO(stevvooe): This section is very meandering. Need to be broken down
// to be a lot more clear.
canonical = bw.resumableDigester.Digest()
if err := bw.resumeDigestAt(ctx, bw.size); err == nil {
canonical = bw.digester.Digest()
if canonical.Algorithm() == desc.Digest.Algorithm() {
// Common case: client and server prefer the same canonical digest
@ -189,33 +186,49 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
// uploaded content using that digest algorithm.
fullHash = true
}
} else {
} else if err == errResumableDigestNotAvailable {
// Not using resumable digests, so we need to hash the entire layer.
fullHash = true
} else {
return distribution.Descriptor{}, err
}
if fullHash {
digester := digest.NewCanonicalDigester()
digestVerifier, err := digest.NewDigestVerifier(desc.Digest)
if err != nil {
return distribution.Descriptor{}, err
// a fantastic optimization: if the written data and the size are
// the same, we don't need to read the data from the backend. This is
// because we've written the entire file in the lifecycle of the
// current instance.
if bw.written == bw.size && digest.Canonical == desc.Digest.Algorithm() {
canonical = bw.digester.Digest()
verified = desc.Digest == canonical
}
// Read the file from the backend driver and validate it.
fr, err := newFileReader(ctx, bw.bufferedFileWriter.driver, bw.path, desc.Length)
if err != nil {
return distribution.Descriptor{}, err
// If the check based on size fails, we fall back to the slowest of
// paths. We may be able to make the size-based check a stronger
// guarantee, so this may be defensive.
if !verified {
digester := digest.Canonical.New()
digestVerifier, err := digest.NewDigestVerifier(desc.Digest)
if err != nil {
return distribution.Descriptor{}, err
}
// Read the file from the backend driver and validate it.
fr, err := newFileReader(ctx, bw.bufferedFileWriter.driver, bw.path, desc.Length)
if err != nil {
return distribution.Descriptor{}, err
}
tr := io.TeeReader(fr, digester.Hash())
if _, err := io.Copy(digestVerifier, tr); err != nil {
return distribution.Descriptor{}, err
}
canonical = digester.Digest()
verified = digestVerifier.Verified()
}
tr := io.TeeReader(fr, digester)
if _, err := io.Copy(digestVerifier, tr); err != nil {
return distribution.Descriptor{}, err
}
canonical = digester.Digest()
verified = digestVerifier.Verified()
}
if !verified {
@ -298,172 +311,3 @@ func (bw *blobWriter) moveBlob(ctx context.Context, desc distribution.Descriptor
return bw.blobStore.driver.Move(ctx, bw.path, blobPath)
}
type hashStateEntry struct {
offset int64
path string
}
// getStoredHashStates returns a slice of hashStateEntries for this upload.
func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
uploadHashStatePathPrefix, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.resumableDigester.Digest().Algorithm(),
list: true,
})
if err != nil {
return nil, err
}
paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
if err != nil {
if _, ok := err.(storagedriver.PathNotFoundError); !ok {
return nil, err
}
// Treat PathNotFoundError as no entries.
paths = nil
}
hashStateEntries := make([]hashStateEntry, 0, len(paths))
for _, p := range paths {
pathSuffix := path.Base(p)
// The suffix should be the offset.
offset, err := strconv.ParseInt(pathSuffix, 0, 64)
if err != nil {
logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
}
hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
}
return hashStateEntries, nil
}
// resumeHashAt attempts to restore the state of the internal hash function
// by loading the most recent saved hash state less than or equal to the given
// offset. Any unhashed bytes remaining less than the given offset are hashed
// from the content uploaded so far.
func (bw *blobWriter) resumeHashAt(ctx context.Context, offset int64) error {
if offset < 0 {
return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
}
if offset == int64(bw.resumableDigester.Len()) {
// State of digester is already at the requested offset.
return nil
}
// List hash states from storage backend.
var hashStateMatch hashStateEntry
hashStates, err := bw.getStoredHashStates(ctx)
if err != nil {
return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
}
// Find the highest stored hashState with offset less than or equal to
// the requested offset.
for _, hashState := range hashStates {
if hashState.offset == offset {
hashStateMatch = hashState
break // Found an exact offset match.
} else if hashState.offset < offset && hashState.offset > hashStateMatch.offset {
// This offset is closer to the requested offset.
hashStateMatch = hashState
} else if hashState.offset > offset {
// Remove any stored hash state with offsets higher than this one
// as writes to this resumed hasher will make those invalid. This
// is probably okay to skip for now since we don't expect anyone to
// use the API in this way. For that reason, we don't treat an
// an error here as a fatal error, but only log it.
if err := bw.driver.Delete(ctx, hashState.path); err != nil {
logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err)
}
}
}
if hashStateMatch.offset == 0 {
// No need to load any state, just reset the hasher.
bw.resumableDigester.Reset()
} else {
storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
if err != nil {
return err
}
if err = bw.resumableDigester.Restore(storedState); err != nil {
return err
}
}
// Mind the gap.
if gapLen := offset - int64(bw.resumableDigester.Len()); gapLen > 0 {
// Need to read content from the upload to catch up to the desired offset.
fr, err := newFileReader(ctx, bw.driver, bw.path, bw.size)
if err != nil {
return err
}
if _, err = fr.Seek(int64(bw.resumableDigester.Len()), os.SEEK_SET); err != nil {
return fmt.Errorf("unable to seek to layer reader offset %d: %s", bw.resumableDigester.Len(), err)
}
if _, err := io.CopyN(bw.resumableDigester, fr, gapLen); err != nil {
return err
}
}
return nil
}
func (bw *blobWriter) storeHashState(ctx context.Context) error {
uploadHashStatePath, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.resumableDigester.Digest().Algorithm(),
offset: int64(bw.resumableDigester.Len()),
})
if err != nil {
return err
}
hashState, err := bw.resumableDigester.State()
if err != nil {
return err
}
return bw.driver.PutContent(ctx, uploadHashStatePath, hashState)
}
// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (bw *blobWriter) removeResources(ctx context.Context) error {
dataPath, err := bw.blobStore.pm.path(uploadDataPathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
})
if err != nil {
return err
}
// Resolve and delete the containing directory, which should include any
// upload related files.
dirPath := path.Dir(dataPath)
if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // already gone!
default:
// This should be uncommon enough such that returning an error
// should be okay. At this point, the upload should be mostly
// complete, but perhaps the backend became unaccessible.
context.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
return err
}
}
return nil
}


@ -2,5 +2,16 @@
package storage
func (bw *blobWriter) setupResumableDigester() {
import (
"github.com/docker/distribution/context"
)
// resumeDigestAt is a noop when resumable digest support is disabled.
func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error {
return errResumableDigestNotAvailable
}
// storeHashState is a noop when resumable digest support is disabled.
func (bw *blobWriter) storeHashState(ctx context.Context) error {
return errResumableDigestNotAvailable
}
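
The design sketched generically below (all names here are illustrative, not from this change) is the reason these stubs return a sentinel error rather than failing: callers probe for resumable support and quietly fall back to the non-resumable path when it is compiled out.

package main

import (
	"errors"
	"fmt"
)

// errFeatureNotAvailable plays the role of errResumableDigestNotAvailable:
// the build-tag-selected stub returns it, and callers treat it as a signal
// to take the slow path rather than as a failure.
var errFeatureNotAvailable = errors.New("feature not available")

// resumeState stands in for resumeDigestAt; in the disabled build it is a
// no-op that always reports the sentinel.
func resumeState() error {
	return errFeatureNotAvailable
}

func write() error {
	if err := resumeState(); err != nil && err != errFeatureNotAvailable {
		return err // a real failure
	}
	// Either the state was resumed or the feature is compiled out; proceed.
	fmt.Println("writing without resumable digest state")
	return nil
}

func main() {
	if err := write(); err != nil {
		panic(err)
	}
}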


@ -2,8 +2,198 @@
package storage
import "github.com/docker/distribution/digest"
import (
"fmt"
"io"
"os"
"path"
"strconv"
func (bw *blobWriter) setupResumableDigester() {
bw.resumableDigester = digest.NewCanonicalResumableDigester()
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/stevvooe/resumable"
// register resumable hashes with import
_ "github.com/stevvooe/resumable/sha256"
_ "github.com/stevvooe/resumable/sha512"
)
// resumeDigestAt attempts to restore the state of the internal hash function
// by loading the most recent saved hash state less than or equal to the given
// offset. Any unhashed bytes remaining less than the given offset are hashed
// from the content uploaded so far.
func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error {
if offset < 0 {
return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
}
h, ok := bw.digester.Hash().(resumable.Hash)
if !ok {
return errResumableDigestNotAvailable
}
if offset == int64(h.Len()) {
// State of digester is already at the requested offset.
return nil
}
// List hash states from storage backend.
var hashStateMatch hashStateEntry
hashStates, err := bw.getStoredHashStates(ctx)
if err != nil {
return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
}
// Find the highest stored hashState with offset less than or equal to
// the requested offset.
for _, hashState := range hashStates {
if hashState.offset == offset {
hashStateMatch = hashState
break // Found an exact offset match.
} else if hashState.offset < offset && hashState.offset > hashStateMatch.offset {
// This offset is closer to the requested offset.
hashStateMatch = hashState
} else if hashState.offset > offset {
// Remove any stored hash state with offsets higher than this one
// as writes to this resumed hasher will make those invalid. This
// is probably okay to skip for now since we don't expect anyone to
// use the API in this way. For that reason, we don't treat an
// error here as a fatal error, but only log it.
if err := bw.driver.Delete(ctx, hashState.path); err != nil {
logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err)
}
}
}
if hashStateMatch.offset == 0 {
// No need to load any state, just reset the hasher.
h.Reset()
} else {
storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
if err != nil {
return err
}
if err = h.Restore(storedState); err != nil {
return err
}
}
// Mind the gap.
if gapLen := offset - int64(h.Len()); gapLen > 0 {
// Need to read content from the upload to catch up to the desired offset.
fr, err := newFileReader(ctx, bw.driver, bw.path, bw.size)
if err != nil {
return err
}
if _, err = fr.Seek(int64(h.Len()), os.SEEK_SET); err != nil {
return fmt.Errorf("unable to seek to layer reader offset %d: %s", h.Len(), err)
}
if _, err := io.CopyN(h, fr, gapLen); err != nil {
return err
}
}
return nil
}
// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (bw *blobWriter) removeResources(ctx context.Context) error {
dataPath, err := bw.blobStore.pm.path(uploadDataPathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
})
if err != nil {
return err
}
// Resolve and delete the containing directory, which should include any
// upload related files.
dirPath := path.Dir(dataPath)
if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // already gone!
default:
// This should be uncommon enough such that returning an error
// should be okay. At this point, the upload should be mostly
// complete, but perhaps the backend became inaccessible.
context.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
return err
}
}
return nil
}
type hashStateEntry struct {
offset int64
path string
}
// getStoredHashStates returns a slice of hashStateEntries for this upload.
func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
uploadHashStatePathPrefix, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.digester.Digest().Algorithm(),
list: true,
})
if err != nil {
return nil, err
}
paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
if err != nil {
if _, ok := err.(storagedriver.PathNotFoundError); !ok {
return nil, err
}
// Treat PathNotFoundError as no entries.
paths = nil
}
hashStateEntries := make([]hashStateEntry, 0, len(paths))
for _, p := range paths {
pathSuffix := path.Base(p)
// The suffix should be the offset.
offset, err := strconv.ParseInt(pathSuffix, 0, 64)
if err != nil {
logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
}
hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
}
return hashStateEntries, nil
}
func (bw *blobWriter) storeHashState(ctx context.Context) error {
h, ok := bw.digester.Hash().(resumable.Hash)
if !ok {
return errResumableDigestNotAvailable
}
uploadHashStatePath, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.digester.Digest().Algorithm(),
offset: int64(h.Len()),
})
if err != nil {
return err
}
hashState, err := h.State()
if err != nil {
return err
}
return bw.driver.PutContent(ctx, uploadHashStatePath, hashState)
}


@ -164,11 +164,10 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
blobStore: lbs,
id: uuid,
startedAt: startedAt,
digester: digest.Canonical.New(),
bufferedFileWriter: *fw,
}
bw.setupResumableDigester()
return bw, nil
}


@ -262,7 +262,7 @@ func (pm *pathMapper) path(spec pathSpec) (string, error) {
if v.list {
offset = "" // Limit to the prefix for listing offsets.
}
return path.Join(append(repoPrefix, v.name, "_uploads", v.id, "hashstates", v.alg, offset)...), nil
return path.Join(append(repoPrefix, v.name, "_uploads", v.id, "hashstates", string(v.alg), offset)...), nil
case repositoriesRootPathSpec:
return path.Join(repoPrefix...), nil
default:
@ -447,7 +447,7 @@ func (uploadStartedAtPathSpec) pathSpec() {}
type uploadHashStatePathSpec struct {
name string
id string
alg string
alg digest.Algorithm
offset int64
list bool
}
@ -479,7 +479,7 @@ func digestPathComponents(dgst digest.Digest, multilevel bool) ([]string, error)
return nil, err
}
algorithm := blobAlgorithmReplacer.Replace(dgst.Algorithm())
algorithm := blobAlgorithmReplacer.Replace(string(dgst.Algorithm()))
hex := dgst.Hex()
prefix := []string{algorithm}