From 3f479b62b4b3f1dede3badc24e9ee1d57431b07c Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 20 Nov 2014 17:49:35 -0800 Subject: [PATCH 1/3] Refactor layerReader into fileReader This change separates out the remote file reader functionality from layer reprsentation data. More importantly, issues with seeking have been fixed and thoroughly tested. --- storage/filereader.go | 163 +++++++++++++++++++++++++++++++++++++ storage/filereader_test.go | 158 +++++++++++++++++++++++++++++++++++ storage/layer.go | 4 + storage/layer_test.go | 25 ------ storage/layerreader.go | 145 +-------------------------------- storage/layerstore.go | 33 +++----- storage/layerupload.go | 4 + storage/services.go | 3 +- 8 files changed, 344 insertions(+), 191 deletions(-) create mode 100644 storage/filereader.go create mode 100644 storage/filereader_test.go diff --git a/storage/filereader.go b/storage/filereader.go new file mode 100644 index 00000000..8f1f5205 --- /dev/null +++ b/storage/filereader.go @@ -0,0 +1,163 @@ +package storage + +import ( + "bufio" + "fmt" + "io" + "os" + + "github.com/docker/docker-registry/storagedriver" +) + +// remoteFileReader provides a read seeker interface to files stored in +// storagedriver. Used to implement part of layer interface and will be used +// to implement read side of LayerUpload. +type fileReader struct { + driver storagedriver.StorageDriver + + // identifying fields + path string + size int64 // size is the total layer size, must be set. + + // mutable fields + rc io.ReadCloser // remote read closer + brd *bufio.Reader // internal buffered io + offset int64 // offset is the current read offset + err error // terminal error, if set, reader is closed +} + +func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) { + // Grab the size of the layer file, ensuring existence. + size, err := driver.CurrentSize(path) + + if err != nil { + return nil, err + } + + return &fileReader{ + driver: driver, + path: path, + size: int64(size), + }, nil +} + +func (fr *fileReader) Read(p []byte) (n int, err error) { + if fr.err != nil { + return 0, fr.err + } + + rd, err := fr.reader() + if err != nil { + return 0, err + } + + n, err = rd.Read(p) + fr.offset += int64(n) + + // Simulate io.EOR error if we reach filesize. + if err == nil && fr.offset >= fr.size { + err = io.EOF + } + + return n, err +} + +func (fr *fileReader) Seek(offset int64, whence int) (int64, error) { + if fr.err != nil { + return 0, fr.err + } + + var err error + newOffset := fr.offset + + switch whence { + case os.SEEK_CUR: + newOffset += int64(offset) + case os.SEEK_END: + newOffset = fr.size + int64(offset) + case os.SEEK_SET: + newOffset = int64(offset) + } + + if newOffset < 0 { + err = fmt.Errorf("cannot seek to negative position") + } else if newOffset > fr.size { + err = fmt.Errorf("cannot seek passed end of file") + } else { + if fr.offset != newOffset { + fr.reset() + } + + // No problems, set the offset. + fr.offset = newOffset + } + + return fr.offset, err +} + +// Close the layer. Should be called when the resource is no longer needed. +func (fr *fileReader) Close() error { + if fr.err != nil { + return fr.err + } + + fr.err = ErrLayerClosed + + // close and release reader chain + if fr.rc != nil { + fr.rc.Close() + } + + fr.rc = nil + fr.brd = nil + + return fr.err +} + +// reader prepares the current reader at the lrs offset, ensuring its buffered +// and ready to go. 
+func (fr *fileReader) reader() (io.Reader, error) { + if fr.err != nil { + return nil, fr.err + } + + if fr.rc != nil { + return fr.brd, nil + } + + // If we don't have a reader, open one up. + rc, err := fr.driver.ReadStream(fr.path, uint64(fr.offset)) + + if err != nil { + return nil, err + } + + fr.rc = rc + + if fr.brd == nil { + // TODO(stevvooe): Set an optimal buffer size here. We'll have to + // understand the latency characteristics of the underlying network to + // set this correctly, so we may want to leave it to the driver. For + // out of process drivers, we'll have to optimize this buffer size for + // local communication. + fr.brd = bufio.NewReader(fr.rc) + } else { + fr.brd.Reset(fr.rc) + } + + return fr.brd, nil +} + +// resetReader resets the reader, forcing the read method to open up a new +// connection and rebuild the buffered reader. This should be called when the +// offset and the reader will become out of sync, such as during a seek +// operation. +func (fr *fileReader) reset() { + if fr.err != nil { + return + } + if fr.rc != nil { + fr.rc.Close() + fr.rc = nil + } +} diff --git a/storage/filereader_test.go b/storage/filereader_test.go new file mode 100644 index 00000000..cfc9d215 --- /dev/null +++ b/storage/filereader_test.go @@ -0,0 +1,158 @@ +package storage + +import ( + "bytes" + "crypto/rand" + "io" + mrand "math/rand" + "os" + "testing" + + "github.com/docker/docker-registry/digest" + + "github.com/docker/docker-registry/storagedriver/inmemory" +) + +func TestSimpleRead(t *testing.T) { + content := make([]byte, 1<<20) + n, err := rand.Read(content) + if err != nil { + t.Fatalf("unexpected error building random data: %v", err) + } + + if n != len(content) { + t.Fatalf("random read did't fill buffer") + } + + dgst, err := digest.FromReader(bytes.NewReader(content)) + if err != nil { + t.Fatalf("unexpected error digesting random content: %v", err) + } + + driver := inmemory.New() + path := "/random" + + if err := driver.PutContent(path, content); err != nil { + t.Fatalf("error putting patterned content: %v", err) + } + + fr, err := newFileReader(driver, path) + if err != nil { + t.Fatalf("error allocating file reader: %v", err) + } + + verifier := digest.NewDigestVerifier(dgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify read data") + } +} + +func TestFileReaderSeek(t *testing.T) { + driver := inmemory.New() + pattern := "01234567890ab" // prime length block + repititions := 1024 + path := "/patterned" + content := bytes.Repeat([]byte(pattern), repititions) + + if err := driver.PutContent(path, content); err != nil { + t.Fatalf("error putting patterned content: %v", err) + } + + fr, err := newFileReader(driver, path) + + if err != nil { + t.Fatalf("unexpected error creating file reader: %v", err) + } + + // Seek all over the place, in blocks of pattern size and make sure we get + // the right data. 
+ for _, repitition := range mrand.Perm(repititions - 1) { + targetOffset := int64(len(pattern) * repitition) + // Seek to a multiple of pattern size and read pattern size bytes + offset, err := fr.Seek(targetOffset, os.SEEK_SET) + if err != nil { + t.Fatalf("unexpected error seeking: %v", err) + } + + if offset != targetOffset { + t.Fatalf("did not seek to correct offset: %d != %d", offset, targetOffset) + } + + p := make([]byte, len(pattern)) + + n, err := fr.Read(p) + if err != nil { + t.Fatalf("error reading pattern: %v", err) + } + + if n != len(pattern) { + t.Fatalf("incorrect read length: %d != %d", n, len(pattern)) + } + + if string(p) != pattern { + t.Fatalf("incorrect read content: %q != %q", p, pattern) + } + + // Check offset + current, err := fr.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("error checking current offset: %v", err) + } + + if current != targetOffset+int64(len(pattern)) { + t.Fatalf("unexpected offset after read: %v", err) + } + } + + start, err := fr.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatalf("error seeking to start: %v", err) + } + + if start != 0 { + t.Fatalf("expected to seek to start: %v != 0", start) + } + + end, err := fr.Seek(0, os.SEEK_END) + if err != nil { + t.Fatalf("error checking current offset: %v", err) + } + + if end != int64(len(content)) { + t.Fatalf("expected to seek to end: %v != %v", end, len(content)) + } + + // 4. Seek past end and before start, ensure error. + + // seek before start + before, err := fr.Seek(-1, os.SEEK_SET) + if err == nil { + t.Fatalf("error expected, returned offset=%v", before) + } + + after, err := fr.Seek(int64(len(content)+1), os.SEEK_END) + if err == nil { + t.Fatalf("error expected, returned offset=%v", after) + } +} + +// TestLayerReadErrors covers the various error return type for different +// conditions that can arise when reading a layer. +func TestFileReaderErrors(t *testing.T) { + // TODO(stevvooe): We need to cover error return types, driven by the + // errors returned via the HTTP API. For now, here is a incomplete list: + // + // 1. Layer Not Found: returned when layer is not found or access is + // denied. + // 2. Layer Unavailable: returned when link references are unresolved, + // but layer is known to the registry. + // 3. Layer Invalid: This may more split into more errors, but should be + // returned when name or tarsum does not reference a valid error. We + // may also need something to communication layer verification errors + // for the inline tarsum check. + // 4. Timeout: timeouts to backend. Need to better understand these + // failure cases and how the storage driver propagates these errors + // up the stack. +} diff --git a/storage/layer.go b/storage/layer.go index 6c45f401..d2ddfb07 100644 --- a/storage/layer.go +++ b/storage/layer.go @@ -87,4 +87,8 @@ var ( // ErrLayerInvalidLength returned when length check fails. ErrLayerInvalidLength = fmt.Errorf("invalid layer length") + + // ErrLayerClosed returned when an operation is attempted on a closed + // Layer or LayerUpload. + ErrLayerClosed = fmt.Errorf("layer closed") ) diff --git a/storage/layer_test.go b/storage/layer_test.go index 335793d2..03cba9b9 100644 --- a/storage/layer_test.go +++ b/storage/layer_test.go @@ -241,31 +241,6 @@ func TestSimpleLayerRead(t *testing.T) { } } -func TestLayerReaderSeek(t *testing.T) { - // TODO(stevvooe): Ensure that all relative seeks work as advertised. - // Readers must close and re-open on command. 
This is important to support - // resumable and concurrent downloads via HTTP range requests. -} - -// TestLayerReadErrors covers the various error return type for different -// conditions that can arise when reading a layer. -func TestLayerReadErrors(t *testing.T) { - // TODO(stevvooe): We need to cover error return types, driven by the - // errors returned via the HTTP API. For now, here is a incomplete list: - // - // 1. Layer Not Found: returned when layer is not found or access is - // denied. - // 2. Layer Unavailable: returned when link references are unresolved, - // but layer is known to the registry. - // 3. Layer Invalid: This may more split into more errors, but should be - // returned when name or tarsum does not reference a valid error. We - // may also need something to communication layer verification errors - // for the inline tarsum check. - // 4. Timeout: timeouts to backend. Need to better understand these - // failure cases and how the storage driver propagates these errors - // up the stack. -} - // writeRandomLayer creates a random layer under name and tarSum using driver // and pathMapper. An io.ReadSeeker with the data is returned, along with the // sha256 hex digest. diff --git a/storage/layerreader.go b/storage/layerreader.go index 396940d0..2cc184fd 100644 --- a/storage/layerreader.go +++ b/storage/layerreader.go @@ -1,10 +1,6 @@ package storage import ( - "bufio" - "fmt" - "io" - "os" "time" "github.com/docker/docker-registry/digest" @@ -13,22 +9,11 @@ import ( // layerReadSeeker implements Layer and provides facilities for reading and // seeking. type layerReader struct { - layerStore *layerStore - rc io.ReadCloser - brd *bufio.Reader + fileReader name string // repo name of this layer digest digest.Digest - path string createdAt time.Time - - // offset is the current read offset - offset int64 - - // size is the total layer size, if available. - size int64 - - closedErr error // terminal error, if set, reader is closed } var _ Layer = &layerReader{} @@ -44,131 +29,3 @@ func (lrs *layerReader) Digest() digest.Digest { func (lrs *layerReader) CreatedAt() time.Time { return lrs.createdAt } - -func (lrs *layerReader) Read(p []byte) (n int, err error) { - if err := lrs.closed(); err != nil { - return 0, err - } - - rd, err := lrs.reader() - if err != nil { - return 0, err - } - - n, err = rd.Read(p) - lrs.offset += int64(n) - - // Simulate io.EOR error if we reach filesize. - if err == nil && lrs.offset >= lrs.size { - err = io.EOF - } - - // TODO(stevvooe): More error checking is required here. If the reader - // times out for some reason, we should reset the reader so we re-open the - // connection. - - return n, err -} - -func (lrs *layerReader) Seek(offset int64, whence int) (int64, error) { - if err := lrs.closed(); err != nil { - return 0, err - } - - var err error - newOffset := lrs.offset - - switch whence { - case os.SEEK_CUR: - newOffset += int64(whence) - case os.SEEK_END: - newOffset = lrs.size + int64(whence) - case os.SEEK_SET: - newOffset = int64(whence) - } - - if newOffset < 0 { - err = fmt.Errorf("cannot seek to negative position") - } else if newOffset >= lrs.size { - err = fmt.Errorf("cannot seek passed end of layer") - } else { - if lrs.offset != newOffset { - lrs.resetReader() - } - - // No problems, set the offset. - lrs.offset = newOffset - } - - return lrs.offset, err -} - -// Close the layer. Should be called when the resource is no longer needed. 
-func (lrs *layerReader) Close() error { - if lrs.closedErr != nil { - return lrs.closedErr - } - // TODO(sday): Must export this error. - lrs.closedErr = fmt.Errorf("layer closed") - - // close and release reader chain - if lrs.rc != nil { - lrs.rc.Close() - lrs.rc = nil - } - lrs.brd = nil - - return lrs.closedErr -} - -// reader prepares the current reader at the lrs offset, ensuring its buffered -// and ready to go. -func (lrs *layerReader) reader() (io.Reader, error) { - if err := lrs.closed(); err != nil { - return nil, err - } - - if lrs.rc != nil { - return lrs.brd, nil - } - - // If we don't have a reader, open one up. - rc, err := lrs.layerStore.driver.ReadStream(lrs.path, uint64(lrs.offset)) - - if err != nil { - return nil, err - } - - lrs.rc = rc - - if lrs.brd == nil { - // TODO(stevvooe): Set an optimal buffer size here. We'll have to - // understand the latency characteristics of the underlying network to - // set this correctly, so we may want to leave it to the driver. For - // out of process drivers, we'll have to optimize this buffer size for - // local communication. - lrs.brd = bufio.NewReader(lrs.rc) - } else { - lrs.brd.Reset(lrs.rc) - } - - return lrs.brd, nil -} - -// resetReader resets the reader, forcing the read method to open up a new -// connection and rebuild the buffered reader. This should be called when the -// offset and the reader will become out of sync, such as during a seek -// operation. -func (lrs *layerReader) resetReader() { - if err := lrs.closed(); err != nil { - return - } - if lrs.rc != nil { - lrs.rc.Close() - lrs.rc = nil - } -} - -func (lrs *layerReader) closed() error { - return lrs.closedErr -} diff --git a/storage/layerstore.go b/storage/layerstore.go index c9662ffd..6abd50e3 100644 --- a/storage/layerstore.go +++ b/storage/layerstore.go @@ -57,33 +57,26 @@ func (ls *layerStore) Fetch(name string, digest digest.Digest) (Layer, error) { return nil, err } - // Grab the size of the layer file, ensuring that it exists, among other - // things. - size, err := ls.driver.CurrentSize(p) - + fr, err := newFileReader(ls.driver, p) if err != nil { - // TODO(stevvooe): Handle blob/path does not exist here. - // TODO(stevvooe): Get a better understanding of the error cases here - // that don't stem from unknown path. - return nil, err + switch err := err.(type) { + case storagedriver.PathNotFoundError, *storagedriver.PathNotFoundError: + return nil, ErrLayerUnknown + default: + return nil, err + } } - // Build the layer reader and return to the client. - layer := &layerReader{ - layerStore: ls, - path: p, + return &layerReader{ + fileReader: *fr, name: name, digest: digest, // TODO(stevvooe): Storage backend does not support modification time - // queries yet. Layers "never" change, so just return the zero value. - createdAt: time.Time{}, - - offset: 0, - size: int64(size), - } - - return layer, nil + // queries yet. Layers "never" change, so just return the zero value + // plus a nano-second. + createdAt: (time.Time{}).Add(time.Nanosecond), + }, nil } // Upload begins a layer upload, returning a handle. 
If the layer upload diff --git a/storage/layerupload.go b/storage/layerupload.go index c07927f1..f134aa19 100644 --- a/storage/layerupload.go +++ b/storage/layerupload.go @@ -429,6 +429,10 @@ func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) return lus, err } + if err := llufs.SaveState(lus); err != nil { + return lus, err + } + return lus, nil } diff --git a/storage/services.go b/storage/services.go index dbe5dc75..afb26d94 100644 --- a/storage/services.go +++ b/storage/services.go @@ -15,7 +15,6 @@ type Services struct { // NewServices creates a new Services object to access docker objects stored // in the underlying driver. func NewServices(driver storagedriver.StorageDriver) *Services { - layerUploadStore, err := newTemporaryLocalFSLayerUploadStore() if err != nil { @@ -40,5 +39,5 @@ func NewServices(driver storagedriver.StorageDriver) *Services { // may be context sensitive in the future. The instance should be used similar // to a request local. func (ss *Services) Layers() LayerService { - return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper} + return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper, uploadStore: ss.layerUploadStore} } From 195568017ad6d722a226d0dfcf0183a481cddce3 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 20 Nov 2014 19:15:09 -0800 Subject: [PATCH 2/3] Update error declarations and add missing test This updates API error codes to coincide with changes to the proposal. Mostly, redundant error codes were merged and missing ones were added. The set in the main errors.go file will flow back into the specification. A test case has been added to ensure ErrorCodeUnknown is included in marshaled json. --- errors.go | 43 +++++++++++++++++++++++++++---------------- errors_test.go | 17 +++++++++++++++-- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/errors.go b/errors.go index 9a28e5b6..e2f16ba0 100644 --- a/errors.go +++ b/errors.go @@ -17,20 +17,14 @@ const ( // The following errors can happen during a layer upload. - // ErrorCodeInvalidChecksum is returned when uploading a layer if the - // provided checksum does not match the layer contents. - ErrorCodeInvalidChecksum + // ErrorCodeInvalidDigest is returned when uploading a layer if the + // provided digest does not match the layer contents. + ErrorCodeInvalidDigest // ErrorCodeInvalidLength is returned when uploading a layer if the provided // length does not match the content length. ErrorCodeInvalidLength - // ErrorCodeInvalidTarsum is returned when the provided tarsum does not - // match the computed tarsum of the contents. - ErrorCodeInvalidTarsum - - // The following errors can happen during manifest upload. - // ErrorCodeInvalidName is returned when the name in the manifest does not // match the provided name. ErrorCodeInvalidName @@ -47,6 +41,9 @@ const ( // nonexistent layer. ErrorCodeUnknownLayer + // ErrorCodeUnknownLayerUpload is returned when an upload is accessed. + ErrorCodeUnknownLayerUpload + // ErrorCodeUntrustedSignature is returned when the manifest is signed by an // untrusted source. 
ErrorCodeUntrustedSignature @@ -54,25 +51,25 @@ const ( var errorCodeStrings = map[ErrorCode]string{ ErrorCodeUnknown: "UNKNOWN", - ErrorCodeInvalidChecksum: "INVALID_CHECKSUM", + ErrorCodeInvalidDigest: "INVALID_DIGEST", ErrorCodeInvalidLength: "INVALID_LENGTH", - ErrorCodeInvalidTarsum: "INVALID_TARSUM", ErrorCodeInvalidName: "INVALID_NAME", ErrorCodeInvalidTag: "INVALID_TAG", ErrorCodeUnverifiedManifest: "UNVERIFIED_MANIFEST", ErrorCodeUnknownLayer: "UNKNOWN_LAYER", + ErrorCodeUnknownLayerUpload: "UNKNOWN_LAYER_UPLOAD", ErrorCodeUntrustedSignature: "UNTRUSTED_SIGNATURE", } var errorCodesMessages = map[ErrorCode]string{ ErrorCodeUnknown: "unknown error", - ErrorCodeInvalidChecksum: "provided checksum did not match uploaded content", + ErrorCodeInvalidDigest: "provided digest did not match uploaded content", ErrorCodeInvalidLength: "provided length did not match content length", - ErrorCodeInvalidTarsum: "provided tarsum did not match binary content", ErrorCodeInvalidName: "Manifest name did not match URI", ErrorCodeInvalidTag: "Manifest tag did not match URI", ErrorCodeUnverifiedManifest: "Manifest failed signature validation", ErrorCodeUnknownLayer: "Referenced layer not available", + ErrorCodeUnknownLayerUpload: "cannot resume unknown layer upload", ErrorCodeUntrustedSignature: "Manifest signed by untrusted source", } @@ -136,7 +133,7 @@ func (ec *ErrorCode) UnmarshalText(text []byte) error { // Error provides a wrapper around ErrorCode with extra Details provided. type Error struct { - Code ErrorCode `json:"code,omitempty"` + Code ErrorCode `json:"code"` Message string `json:"message,omitempty"` Detail interface{} `json:"detail,omitempty"` } @@ -144,7 +141,7 @@ type Error struct { // Error returns a human readable representation of the error. func (e Error) Error() string { return fmt.Sprintf("%s: %s", - strings.Title(strings.Replace(e.Code.String(), "_", " ", -1)), + strings.ToLower(strings.Replace(e.Code.String(), "_", " ", -1)), e.Message) } @@ -167,6 +164,10 @@ func (errs *Errors) Push(code ErrorCode, details ...interface{}) { detail = details[0] } + if err, ok := detail.(error); ok { + detail = err.Error() + } + errs.PushErr(Error{ Code: code, Message: code.Message(), @@ -180,7 +181,7 @@ func (errs *Errors) PushErr(err error) { } func (errs *Errors) Error() string { - switch len(errs.Errors) { + switch errs.Len() { case 0: return "" case 1: @@ -194,6 +195,16 @@ func (errs *Errors) Error() string { } } +// Clear clears the errors. +func (errs *Errors) Clear() { + errs.Errors = errs.Errors[:0] +} + +// Len returns the current number of errors. +func (errs *Errors) Len() int { + return len(errs.Errors) +} + // DetailUnknownLayer provides detail for unknown layer errors, returned by // image manifest push for layers that are not yet transferred. This intended // to only be used on the backend to return detail for this specific error. 
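For context on how the reworked error plumbing is meant to be consumed by handlers, here is a minimal sketch (illustrative only, written as if it lived in the registry package next to errors.go and helpers.go; exampleErrorResponse is a hypothetical helper, not part of the patch):

    import (
        "fmt"
        "net/http"
    )

    // exampleErrorResponse shows the Errors API after this change: Push flattens
    // error details into strings before marshaling, Len drives the response
    // decision, and Clear resets the collection for reuse.
    func exampleErrorResponse(w http.ResponseWriter) {
        var errs Errors

        errs.Push(ErrorCodeInvalidDigest, fmt.Errorf("digest mismatch"))

        if errs.Len() > 0 {
            w.WriteHeader(http.StatusBadRequest)
            // serveJSON (from helpers.go) emits the error envelope, e.g.
            // {"errors":[{"code":"INVALID_DIGEST","message":"provided digest did not match uploaded content","detail":"digest mismatch"}]}
            serveJSON(w, errs)
        }

        errs.Clear() // Len() is now 0
    }

Because the "code" field is no longer tagged omitempty, even ErrorCodeUnknown (the zero value) serializes explicitly, which the new test case below verifies.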
diff --git a/errors_test.go b/errors_test.go index e6ec72f9..709b6ced 100644 --- a/errors_test.go +++ b/errors_test.go @@ -56,7 +56,7 @@ func TestErrorCodes(t *testing.T) { func TestErrorsManagement(t *testing.T) { var errs Errors - errs.Push(ErrorCodeInvalidChecksum) + errs.Push(ErrorCodeInvalidDigest) var detail DetailUnknownLayer detail.Unknown.BlobSum = "sometestblobsumdoesntmatter" @@ -69,7 +69,20 @@ func TestErrorsManagement(t *testing.T) { t.Fatalf("error marashaling errors: %v", err) } - expectedJSON := "{\"errors\":[{\"code\":\"INVALID_CHECKSUM\",\"message\":\"provided checksum did not match uploaded content\"},{\"code\":\"UNKNOWN_LAYER\",\"message\":\"Referenced layer not available\",\"detail\":{\"unknown\":{\"blobSum\":\"sometestblobsumdoesntmatter\"}}}]}" + expectedJSON := "{\"errors\":[{\"code\":\"INVALID_DIGEST\",\"message\":\"provided digest did not match uploaded content\"},{\"code\":\"UNKNOWN_LAYER\",\"message\":\"Referenced layer not available\",\"detail\":{\"unknown\":{\"blobSum\":\"sometestblobsumdoesntmatter\"}}}]}" + + if string(p) != expectedJSON { + t.Fatalf("unexpected json: %q != %q", string(p), expectedJSON) + } + + errs.Clear() + errs.Push(ErrorCodeUnknown) + expectedJSON = "{\"errors\":[{\"code\":\"UNKNOWN\",\"message\":\"unknown error\"}]}" + p, err = json.Marshal(errs) + + if err != nil { + t.Fatalf("error marashaling errors: %v", err) + } if string(p) != expectedJSON { t.Fatalf("unexpected json: %q != %q", string(p), expectedJSON) From e158e3cd65e4236d758883477fab83fa4fbc7ca9 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 20 Nov 2014 19:57:01 -0800 Subject: [PATCH 3/3] Initial implementation of Layer API The http API has its first set of endpoints to implement the core aspects of fetching and uploading layers. Uploads can be started and completed in a single chunk and the content can be fetched via tarsum. Most proposed error conditions should be represented but edge cases likely remain. In this version, note that the layers are still called layers, even though the routes are pointing to blobs. This will change with backend refactoring over the next few weeks. The unit tests are a bit of a shamble but these need to be carefully written along with the core specification process. As the the client-server interaction solidifies, we can port this into a verification suite for registry providers. --- api_test.go | 236 +++++++++++++++++++++++++++++++++++++++++++++++++ app.go | 49 +++++++++- context.go | 9 +- helpers.go | 21 +++++ layer.go | 54 +++++++++-- layerupload.go | 191 +++++++++++++++++++++++++++++++++++---- 6 files changed, 528 insertions(+), 32 deletions(-) create mode 100644 api_test.go diff --git a/api_test.go b/api_test.go new file mode 100644 index 00000000..c850f141 --- /dev/null +++ b/api_test.go @@ -0,0 +1,236 @@ +package registry + +import ( + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/http/httputil" + "net/url" + "os" + "testing" + + "github.com/Sirupsen/logrus" + _ "github.com/docker/docker-registry/storagedriver/inmemory" + + "github.com/gorilla/handlers" + + "github.com/docker/docker-registry/common/testutil" + "github.com/docker/docker-registry/configuration" + "github.com/docker/docker-registry/digest" +) + +// TestLayerAPI conducts a full of the of the layer api. +func TestLayerAPI(t *testing.T) { + // TODO(stevvooe): This test code is complete junk but it should cover the + // complete flow. This must be broken down and checked against the + // specification *before* we submit the final to docker core. 
+ + config := configuration.Configuration{ + Storage: configuration.Storage{ + "inmemory": configuration.Parameters{}, + }, + } + + app := NewApp(config) + server := httptest.NewServer(handlers.CombinedLoggingHandler(os.Stderr, app)) + router := v2APIRouter() + + u, err := url.Parse(server.URL) + if err != nil { + t.Fatalf("error parsing server url: %v", err) + } + + imageName := "foo/bar" + // "build" our layer file + layerFile, tarSumStr, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("error creating random layer file: %v", err) + } + + layerDigest := digest.Digest(tarSumStr) + + // ----------------------------------- + // Test fetch for non-existent content + r, err := router.GetRoute(routeNameBlob).Host(u.Host). + URL("name", imageName, + "digest", tarSumStr) + + resp, err := http.Get(r.String()) + if err != nil { + t.Fatalf("unexpected error fetching non-existent layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusNotFound: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status fetching non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status fetching non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + // ------------------------------------------ + // Test head request for non-existent content + resp, err = http.Head(r.String()) + if err != nil { + t.Fatalf("unexpected error checking head on non-existent layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusNotFound: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status checking head on non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status checking head on non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + // ------------------------------------------ + // Upload a layer + r, err = router.GetRoute(routeNameBlobUpload).Host(u.Host). + URL("name", imageName) + if err != nil { + t.Fatalf("error starting layer upload: %v", err) + } + + resp, err = http.Post(r.String(), "", nil) + if err != nil { + t.Fatalf("error starting layer upload: %v", err) + } + + if resp.StatusCode != http.StatusAccepted { + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status starting layer upload: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status starting layer upload: %v, %v", resp.StatusCode, resp.Status) + } + + if resp.Header.Get("Location") == "" { // TODO(stevvooe): Need better check here. + t.Fatalf("unexpected Location: %q != %q", resp.Header.Get("Location"), "foo") + } + + if resp.Header.Get("Content-Length") != "0" { + t.Fatalf("unexpected content-length: %q != %q", resp.Header.Get("Content-Length"), "0") + } + + layerLength, _ := layerFile.Seek(0, os.SEEK_END) + layerFile.Seek(0, os.SEEK_SET) + + uploadURLStr := resp.Header.Get("Location") + + // TODO(sday): Cancel the layer upload here and restart. 
+ + query := url.Values{ + "digest": []string{layerDigest.String()}, + "length": []string{fmt.Sprint(layerLength)}, + } + + uploadURL, err := url.Parse(uploadURLStr) + if err != nil { + t.Fatalf("unexpected error parsing url: %v", err) + } + + uploadURL.RawQuery = query.Encode() + + // Just do a monolithic upload + req, err := http.NewRequest("PUT", uploadURL.String(), layerFile) + if err != nil { + t.Fatalf("unexpected error creating new request: %v", err) + } + + resp, err = http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("unexpected error doing put: %v", err) + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusCreated: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status putting chunk: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status putting chunk: %v, %v", resp.StatusCode, resp.Status) + } + + if resp.Header.Get("Location") == "" { + t.Fatalf("unexpected Location: %q", resp.Header.Get("Location")) + } + + if resp.Header.Get("Content-Length") != "0" { + t.Fatalf("unexpected content-length: %q != %q", resp.Header.Get("Content-Length"), "0") + } + + layerURL := resp.Header.Get("Location") + + // ------------------------ + // Use a head request to see if the layer exists. + resp, err = http.Head(layerURL) + if err != nil { + t.Fatalf("unexpected error checking head on non-existent layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusOK: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status checking head on layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status checking head on layer: %v, %v", resp.StatusCode, resp.Status) + } + + logrus.Infof("fetch the layer") + // ---------------- + // Fetch the layer! + resp, err = http.Get(layerURL) + if err != nil { + t.Fatalf("unexpected error fetching layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusOK: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status fetching layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status fetching layer: %v, %v", resp.StatusCode, resp.Status) + } + + // Verify the body + verifier := digest.NewDigestVerifier(layerDigest) + io.Copy(verifier, resp.Body) + + if !verifier.Verified() { + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status checking head on layer ayo!: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("response body did not pass verification") + } +} diff --git a/app.go b/app.go index bc7df554..25bf572d 100644 --- a/app.go +++ b/app.go @@ -3,7 +3,11 @@ package registry import ( "net/http" + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/factory" + "github.com/docker/docker-registry/configuration" + "github.com/docker/docker-registry/storage" log "github.com/Sirupsen/logrus" "github.com/gorilla/mux" @@ -16,6 +20,12 @@ type App struct { Config configuration.Configuration router *mux.Router + + // driver maintains the app global storage driver instance. + driver storagedriver.StorageDriver + + // services contains the main services instance for the application. 
+ services *storage.Services } // NewApp takes a configuration and returns a configured app, ready to serve @@ -29,11 +39,23 @@ func NewApp(configuration configuration.Configuration) *App { // Register the handler dispatchers. app.register(routeNameImageManifest, imageManifestDispatcher) - app.register(routeNameBlob, layerDispatcher) app.register(routeNameTags, tagsDispatcher) + app.register(routeNameBlob, layerDispatcher) app.register(routeNameBlobUpload, layerUploadDispatcher) app.register(routeNameBlobUploadResume, layerUploadDispatcher) + driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) + + if err != nil { + // TODO(stevvooe): Move the creation of a service into a protected + // method, where this is created lazily. Its status can be queried via + // a health check. + panic(err) + } + + app.driver = driver + app.services = storage.NewServices(app.driver) + return app } @@ -64,6 +86,22 @@ type dispatchFunc func(ctx *Context, r *http.Request) http.Handler // TODO(stevvooe): dispatchers should probably have some validation error // chain with proper error reporting. +// singleStatusResponseWriter only allows the first status to be written to be +// the valid request status. The current use case of this class should be +// factored out. +type singleStatusResponseWriter struct { + http.ResponseWriter + status int +} + +func (ssrw *singleStatusResponseWriter) WriteHeader(status int) { + if ssrw.status != 0 { + return + } + ssrw.status = status + ssrw.ResponseWriter.WriteHeader(status) +} + // dispatcher returns a handler that constructs a request specific context and // handler, using the dispatch factory function. func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { @@ -80,14 +118,17 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { context.log = log.WithField("name", context.Name) handler := dispatch(context, r) + ssrw := &singleStatusResponseWriter{ResponseWriter: w} context.log.Infoln("handler", resolveHandlerName(r.Method, handler)) - handler.ServeHTTP(w, r) + handler.ServeHTTP(ssrw, r) // Automated error response handling here. Handlers may return their // own errors if they need different behavior (such as range errors // for layer upload). - if len(context.Errors.Errors) > 0 { - w.WriteHeader(http.StatusBadRequest) + if context.Errors.Len() > 0 { + if ssrw.status == 0 { + w.WriteHeader(http.StatusBadRequest) + } serveJSON(w, context.Errors) } }) diff --git a/context.go b/context.go index a5706b4e..c246d6ac 100644 --- a/context.go +++ b/context.go @@ -1,8 +1,6 @@ package registry -import ( - "github.com/Sirupsen/logrus" -) +import "github.com/Sirupsen/logrus" // Context should contain the request specific context for use in across // handlers. Resources that don't need to be shared across handlers should not @@ -20,11 +18,6 @@ type Context struct { // handler *must not* start the response via http.ResponseWriter. Errors Errors - // TODO(stevvooe): Context would be a good place to create a - // representation of the "authorized resource". Perhaps, rather than - // having fields like "name", the context should be a set of parameters - // then we do routing from there. - // vars contains the extracted gorilla/mux variables that can be used for // assignment. 
vars map[string]string diff --git a/helpers.go b/helpers.go index b3b9d744..7714d029 100644 --- a/helpers.go +++ b/helpers.go @@ -2,7 +2,10 @@ package registry import ( "encoding/json" + "io" "net/http" + + "github.com/gorilla/mux" ) // serveJSON marshals v and sets the content-type header to @@ -18,3 +21,21 @@ func serveJSON(w http.ResponseWriter, v interface{}) error { return nil } + +// closeResources closes all the provided resources after running the target +// handler. +func closeResources(handler http.Handler, closers ...io.Closer) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + for _, closer := range closers { + defer closer.Close() + } + handler.ServeHTTP(w, r) + }) +} + +// clondedRoute returns a clone of the named route from the router. +func clonedRoute(router *mux.Router, name string) *mux.Route { + route := new(mux.Route) + *route = *router.GetRoute(name) // clone the route + return route +} diff --git a/layer.go b/layer.go index 82a1e6d9..38fdfe39 100644 --- a/layer.go +++ b/layer.go @@ -3,17 +3,28 @@ package registry import ( "net/http" + "github.com/docker/docker-registry/digest" + "github.com/docker/docker-registry/storage" "github.com/gorilla/handlers" + "github.com/gorilla/mux" ) // layerDispatcher uses the request context to build a layerHandler. func layerDispatcher(ctx *Context, r *http.Request) http.Handler { - layerHandler := &layerHandler{ - Context: ctx, - TarSum: ctx.vars["tarsum"], + dgst, err := digest.ParseDigest(ctx.vars["digest"]) + + if err != nil { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx.Errors.Push(ErrorCodeInvalidDigest, err) + }) } - layerHandler.log = layerHandler.log.WithField("tarsum", layerHandler.TarSum) + layerHandler := &layerHandler{ + Context: ctx, + Digest: dgst, + } + + layerHandler.log = layerHandler.log.WithField("digest", dgst) return handlers.MethodHandler{ "GET": http.HandlerFunc(layerHandler.GetLayer), @@ -25,11 +36,44 @@ func layerDispatcher(ctx *Context, r *http.Request) http.Handler { type layerHandler struct { *Context - TarSum string + Digest digest.Digest } // GetLayer fetches the binary data from backend storage returns it in the // response. func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) { + layers := lh.services.Layers() + layer, err := layers.Fetch(lh.Name, lh.Digest) + + if err != nil { + switch err { + case storage.ErrLayerUnknown: + w.WriteHeader(http.StatusNotFound) + lh.Errors.Push(ErrorCodeUnknownLayer, + map[string]interface{}{ + "unknown": FSLayer{BlobSum: lh.Digest}, + }) + return + default: + lh.Errors.Push(ErrorCodeUnknown, err) + return + } + } + defer layer.Close() + + http.ServeContent(w, r, layer.Digest().String(), layer.CreatedAt(), layer) +} + +func buildLayerURL(router *mux.Router, r *http.Request, layer storage.Layer) (string, error) { + route := clonedRoute(router, routeNameBlob) + + layerURL, err := route.Schemes(r.URL.Scheme).Host(r.Host). 
+ URL("name", layer.Name(), + "digest", layer.Digest().String()) + if err != nil { + return "", err + } + + return layerURL.String(), nil } diff --git a/layerupload.go b/layerupload.go index 8916b552..d1ec4206 100644 --- a/layerupload.go +++ b/layerupload.go @@ -1,64 +1,225 @@ package registry import ( + "fmt" + "io" "net/http" + "strconv" + "github.com/Sirupsen/logrus" + "github.com/docker/docker-registry/digest" + "github.com/docker/docker-registry/storage" "github.com/gorilla/handlers" + "github.com/gorilla/mux" ) // layerUploadDispatcher constructs and returns the layer upload handler for // the given request context. func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler { - layerUploadHandler := &layerUploadHandler{ + luh := &layerUploadHandler{ Context: ctx, - TarSum: ctx.vars["tarsum"], UUID: ctx.vars["uuid"], } - layerUploadHandler.log = layerUploadHandler.log.WithField("tarsum", layerUploadHandler.TarSum) + handler := http.Handler(handlers.MethodHandler{ + "POST": http.HandlerFunc(luh.StartLayerUpload), + "GET": http.HandlerFunc(luh.GetUploadStatus), + "HEAD": http.HandlerFunc(luh.GetUploadStatus), + "PUT": http.HandlerFunc(luh.PutLayerChunk), + "DELETE": http.HandlerFunc(luh.CancelLayerUpload), + }) - if layerUploadHandler.UUID != "" { - layerUploadHandler.log = layerUploadHandler.log.WithField("uuid", layerUploadHandler.UUID) + if luh.UUID != "" { + luh.log = luh.log.WithField("uuid", luh.UUID) + + layers := ctx.services.Layers() + upload, err := layers.Resume(luh.UUID) + + if err != nil && err != storage.ErrLayerUploadUnknown { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logrus.Infof("error resolving upload: %v", err) + w.WriteHeader(http.StatusInternalServerError) + luh.Errors.Push(ErrorCodeUnknown, err) + }) + } + + luh.Upload = upload + handler = closeResources(handler, luh.Upload) } - return handlers.MethodHandler{ - "POST": http.HandlerFunc(layerUploadHandler.StartLayerUpload), - "GET": http.HandlerFunc(layerUploadHandler.GetUploadStatus), - "HEAD": http.HandlerFunc(layerUploadHandler.GetUploadStatus), - "PUT": http.HandlerFunc(layerUploadHandler.PutLayerChunk), - "DELETE": http.HandlerFunc(layerUploadHandler.CancelLayerUpload), - } + return handler } // layerUploadHandler handles the http layer upload process. type layerUploadHandler struct { *Context - // TarSum is the unique identifier of the layer being uploaded. - TarSum string - // UUID identifies the upload instance for the current request. UUID string + + Upload storage.LayerUpload } // StartLayerUpload begins the layer upload process and allocates a server- // side upload session. func (luh *layerUploadHandler) StartLayerUpload(w http.ResponseWriter, r *http.Request) { + layers := luh.services.Layers() + upload, err := layers.Upload(luh.Name) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + luh.Upload = upload + defer luh.Upload.Close() + + if err := luh.layerUploadResponse(w, r); err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + w.WriteHeader(http.StatusAccepted) } // GetUploadStatus returns the status of a given upload, identified by uuid. 
func (luh *layerUploadHandler) GetUploadStatus(w http.ResponseWriter, r *http.Request) { + if luh.Upload == nil { + w.WriteHeader(http.StatusNotFound) + luh.Errors.Push(ErrorCodeUnknownLayerUpload) + } + if err := luh.layerUploadResponse(w, r); err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + + w.WriteHeader(http.StatusNoContent) } // PutLayerChunk receives a layer chunk during the layer upload process, // possible completing the upload with a checksum and length. func (luh *layerUploadHandler) PutLayerChunk(w http.ResponseWriter, r *http.Request) { + if luh.Upload == nil { + w.WriteHeader(http.StatusNotFound) + luh.Errors.Push(ErrorCodeUnknownLayerUpload) + } + var finished bool + + // TODO(stevvooe): This is woefully incomplete. Missing stuff: + // + // 1. Extract information from range header, if present. + // 2. Check offset of current layer. + // 3. Emit correct error responses. + + // Read in the chunk + io.Copy(luh.Upload, r.Body) + + if err := luh.maybeCompleteUpload(w, r); err != nil { + if err != errNotReadyToComplete { + w.WriteHeader(http.StatusInternalServerError) + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + } + + if err := luh.layerUploadResponse(w, r); err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + + if finished { + w.WriteHeader(http.StatusCreated) + } else { + w.WriteHeader(http.StatusAccepted) + } } // CancelLayerUpload cancels an in-progress upload of a layer. func (luh *layerUploadHandler) CancelLayerUpload(w http.ResponseWriter, r *http.Request) { + if luh.Upload == nil { + w.WriteHeader(http.StatusNotFound) + luh.Errors.Push(ErrorCodeUnknownLayerUpload) + } } + +// layerUploadResponse provides a standard request for uploading layers and +// chunk responses. This sets the correct headers but the response status is +// left to the caller. +func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error { + uploadURL, err := buildLayerUploadURL(luh.router, r, luh.Upload) + if err != nil { + logrus.Infof("error building upload url: %s", err) + return err + } + + w.Header().Set("Location", uploadURL) + w.Header().Set("Content-Length", "0") + w.Header().Set("Range", fmt.Sprintf("0-%d", luh.Upload.Offset())) + + return nil +} + +var errNotReadyToComplete = fmt.Errorf("not ready to complete upload") + +// maybeCompleteUpload tries to complete the upload if the correct parameters +// are available. Returns errNotReadyToComplete if not ready to complete. +func (luh *layerUploadHandler) maybeCompleteUpload(w http.ResponseWriter, r *http.Request) error { + // If we get a digest and length, we can finish the upload. + dgstStr := r.FormValue("digest") // TODO(stevvooe): Support multiple digest parameters! + sizeStr := r.FormValue("length") + + if dgstStr == "" || sizeStr == "" { + return errNotReadyToComplete + } + + dgst, err := digest.ParseDigest(dgstStr) + if err != nil { + return err + } + + size, err := strconv.ParseInt(sizeStr, 10, 64) + if err != nil { + return err + } + + luh.completeUpload(w, r, size, dgst) + return nil +} + +// completeUpload finishes out the upload with the correct response. 
+func (luh *layerUploadHandler) completeUpload(w http.ResponseWriter, r *http.Request, size int64, dgst digest.Digest) { + layer, err := luh.Upload.Finish(size, dgst) + if err != nil { + luh.Errors.Push(ErrorCodeUnknown, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + layerURL, err := buildLayerURL(luh.router, r, layer) + if err != nil { + luh.Errors.Push(ErrorCodeUnknown, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Header().Set("Location", layerURL) + w.Header().Set("Content-Length", "0") + w.WriteHeader(http.StatusCreated) +} + +func buildLayerUploadURL(router *mux.Router, r *http.Request, upload storage.LayerUpload) (string, error) { + route := clonedRoute(router, routeNameBlobUploadResume) + + uploadURL, err := route.Schemes(r.URL.Scheme).Host(r.Host). + URL("name", upload.Name(), "uuid", upload.UUID()) + if err != nil { + return "", err + } + + return uploadURL.String(), nil +}
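End-to-end, the three patches support the monolithic upload flow that TestLayerAPI walks through. The sketch below paraphrases that flow from the client side (a hedged illustration: uploadAndFetchLayer is a hypothetical helper using plain net/http, and the digest/length query parameters and Location headers are the ones the handlers above emit):

    import (
        "fmt"
        "io"
        "io/ioutil"
        "net/http"
        "net/url"
        "os"

        "github.com/docker/docker-registry/digest"
    )

    // uploadAndFetchLayer starts an upload, completes it with a single PUT
    // carrying digest and length, then fetches the resulting blob back.
    func uploadAndFetchLayer(startUploadURL string, layer io.ReadSeeker, dgst digest.Digest) error {
        // POST to the blob upload route; the Location header names the upload session.
        resp, err := http.Post(startUploadURL, "", nil)
        if err != nil {
            return err
        }
        resp.Body.Close()
        uploadURL := resp.Header.Get("Location")

        size, _ := layer.Seek(0, os.SEEK_END)
        layer.Seek(0, os.SEEK_SET)

        // Complete the upload monolithically: PUT the whole layer with the
        // digest and length query parameters checked by maybeCompleteUpload.
        u, err := url.Parse(uploadURL)
        if err != nil {
            return err
        }
        u.RawQuery = url.Values{
            "digest": []string{dgst.String()},
            "length": []string{fmt.Sprint(size)},
        }.Encode()

        req, err := http.NewRequest("PUT", u.String(), layer)
        if err != nil {
            return err
        }
        resp, err = http.DefaultClient.Do(req)
        if err != nil {
            return err
        }
        resp.Body.Close()
        if resp.StatusCode != http.StatusCreated {
            return fmt.Errorf("unexpected status completing upload: %v", resp.Status)
        }

        // On success the Location header points at the blob itself.
        layerURL := resp.Header.Get("Location")
        resp, err = http.Get(layerURL)
        if err != nil {
            return err
        }
        defer resp.Body.Close()
        _, err = io.Copy(ioutil.Discard, resp.Body)
        return err
    }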