From e158e3cd65e4236d758883477fab83fa4fbc7ca9 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 20 Nov 2014 19:57:01 -0800 Subject: [PATCH] Initial implementation of Layer API The http API has its first set of endpoints to implement the core aspects of fetching and uploading layers. Uploads can be started and completed in a single chunk and the content can be fetched via tarsum. Most proposed error conditions should be represented but edge cases likely remain. In this version, note that the layers are still called layers, even though the routes are pointing to blobs. This will change with backend refactoring over the next few weeks. The unit tests are a bit of a shamble but these need to be carefully written along with the core specification process. As the the client-server interaction solidifies, we can port this into a verification suite for registry providers. --- api_test.go | 236 +++++++++++++++++++++++++++++++++++++++++++++++++ app.go | 49 +++++++++- context.go | 9 +- helpers.go | 21 +++++ layer.go | 54 +++++++++-- layerupload.go | 191 +++++++++++++++++++++++++++++++++++---- 6 files changed, 528 insertions(+), 32 deletions(-) create mode 100644 api_test.go diff --git a/api_test.go b/api_test.go new file mode 100644 index 000000000..c850f141c --- /dev/null +++ b/api_test.go @@ -0,0 +1,236 @@ +package registry + +import ( + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/http/httputil" + "net/url" + "os" + "testing" + + "github.com/Sirupsen/logrus" + _ "github.com/docker/docker-registry/storagedriver/inmemory" + + "github.com/gorilla/handlers" + + "github.com/docker/docker-registry/common/testutil" + "github.com/docker/docker-registry/configuration" + "github.com/docker/docker-registry/digest" +) + +// TestLayerAPI conducts a full of the of the layer api. +func TestLayerAPI(t *testing.T) { + // TODO(stevvooe): This test code is complete junk but it should cover the + // complete flow. This must be broken down and checked against the + // specification *before* we submit the final to docker core. + + config := configuration.Configuration{ + Storage: configuration.Storage{ + "inmemory": configuration.Parameters{}, + }, + } + + app := NewApp(config) + server := httptest.NewServer(handlers.CombinedLoggingHandler(os.Stderr, app)) + router := v2APIRouter() + + u, err := url.Parse(server.URL) + if err != nil { + t.Fatalf("error parsing server url: %v", err) + } + + imageName := "foo/bar" + // "build" our layer file + layerFile, tarSumStr, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("error creating random layer file: %v", err) + } + + layerDigest := digest.Digest(tarSumStr) + + // ----------------------------------- + // Test fetch for non-existent content + r, err := router.GetRoute(routeNameBlob).Host(u.Host). + URL("name", imageName, + "digest", tarSumStr) + + resp, err := http.Get(r.String()) + if err != nil { + t.Fatalf("unexpected error fetching non-existent layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusNotFound: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status fetching non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status fetching non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + // ------------------------------------------ + // Test head request for non-existent content + resp, err = http.Head(r.String()) + if err != nil { + t.Fatalf("unexpected error checking head on non-existent layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusNotFound: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status checking head on non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status checking head on non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + // ------------------------------------------ + // Upload a layer + r, err = router.GetRoute(routeNameBlobUpload).Host(u.Host). + URL("name", imageName) + if err != nil { + t.Fatalf("error starting layer upload: %v", err) + } + + resp, err = http.Post(r.String(), "", nil) + if err != nil { + t.Fatalf("error starting layer upload: %v", err) + } + + if resp.StatusCode != http.StatusAccepted { + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status starting layer upload: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status starting layer upload: %v, %v", resp.StatusCode, resp.Status) + } + + if resp.Header.Get("Location") == "" { // TODO(stevvooe): Need better check here. + t.Fatalf("unexpected Location: %q != %q", resp.Header.Get("Location"), "foo") + } + + if resp.Header.Get("Content-Length") != "0" { + t.Fatalf("unexpected content-length: %q != %q", resp.Header.Get("Content-Length"), "0") + } + + layerLength, _ := layerFile.Seek(0, os.SEEK_END) + layerFile.Seek(0, os.SEEK_SET) + + uploadURLStr := resp.Header.Get("Location") + + // TODO(sday): Cancel the layer upload here and restart. + + query := url.Values{ + "digest": []string{layerDigest.String()}, + "length": []string{fmt.Sprint(layerLength)}, + } + + uploadURL, err := url.Parse(uploadURLStr) + if err != nil { + t.Fatalf("unexpected error parsing url: %v", err) + } + + uploadURL.RawQuery = query.Encode() + + // Just do a monolithic upload + req, err := http.NewRequest("PUT", uploadURL.String(), layerFile) + if err != nil { + t.Fatalf("unexpected error creating new request: %v", err) + } + + resp, err = http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("unexpected error doing put: %v", err) + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusCreated: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status putting chunk: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status putting chunk: %v, %v", resp.StatusCode, resp.Status) + } + + if resp.Header.Get("Location") == "" { + t.Fatalf("unexpected Location: %q", resp.Header.Get("Location")) + } + + if resp.Header.Get("Content-Length") != "0" { + t.Fatalf("unexpected content-length: %q != %q", resp.Header.Get("Content-Length"), "0") + } + + layerURL := resp.Header.Get("Location") + + // ------------------------ + // Use a head request to see if the layer exists. + resp, err = http.Head(layerURL) + if err != nil { + t.Fatalf("unexpected error checking head on non-existent layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusOK: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status checking head on layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status checking head on layer: %v, %v", resp.StatusCode, resp.Status) + } + + logrus.Infof("fetch the layer") + // ---------------- + // Fetch the layer! + resp, err = http.Get(layerURL) + if err != nil { + t.Fatalf("unexpected error fetching layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusOK: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status fetching layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status fetching layer: %v, %v", resp.StatusCode, resp.Status) + } + + // Verify the body + verifier := digest.NewDigestVerifier(layerDigest) + io.Copy(verifier, resp.Body) + + if !verifier.Verified() { + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status checking head on layer ayo!: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("response body did not pass verification") + } +} diff --git a/app.go b/app.go index bc7df554e..25bf572da 100644 --- a/app.go +++ b/app.go @@ -3,7 +3,11 @@ package registry import ( "net/http" + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/factory" + "github.com/docker/docker-registry/configuration" + "github.com/docker/docker-registry/storage" log "github.com/Sirupsen/logrus" "github.com/gorilla/mux" @@ -16,6 +20,12 @@ type App struct { Config configuration.Configuration router *mux.Router + + // driver maintains the app global storage driver instance. + driver storagedriver.StorageDriver + + // services contains the main services instance for the application. + services *storage.Services } // NewApp takes a configuration and returns a configured app, ready to serve @@ -29,11 +39,23 @@ func NewApp(configuration configuration.Configuration) *App { // Register the handler dispatchers. app.register(routeNameImageManifest, imageManifestDispatcher) - app.register(routeNameBlob, layerDispatcher) app.register(routeNameTags, tagsDispatcher) + app.register(routeNameBlob, layerDispatcher) app.register(routeNameBlobUpload, layerUploadDispatcher) app.register(routeNameBlobUploadResume, layerUploadDispatcher) + driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) + + if err != nil { + // TODO(stevvooe): Move the creation of a service into a protected + // method, where this is created lazily. Its status can be queried via + // a health check. + panic(err) + } + + app.driver = driver + app.services = storage.NewServices(app.driver) + return app } @@ -64,6 +86,22 @@ type dispatchFunc func(ctx *Context, r *http.Request) http.Handler // TODO(stevvooe): dispatchers should probably have some validation error // chain with proper error reporting. +// singleStatusResponseWriter only allows the first status to be written to be +// the valid request status. The current use case of this class should be +// factored out. +type singleStatusResponseWriter struct { + http.ResponseWriter + status int +} + +func (ssrw *singleStatusResponseWriter) WriteHeader(status int) { + if ssrw.status != 0 { + return + } + ssrw.status = status + ssrw.ResponseWriter.WriteHeader(status) +} + // dispatcher returns a handler that constructs a request specific context and // handler, using the dispatch factory function. func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { @@ -80,14 +118,17 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { context.log = log.WithField("name", context.Name) handler := dispatch(context, r) + ssrw := &singleStatusResponseWriter{ResponseWriter: w} context.log.Infoln("handler", resolveHandlerName(r.Method, handler)) - handler.ServeHTTP(w, r) + handler.ServeHTTP(ssrw, r) // Automated error response handling here. Handlers may return their // own errors if they need different behavior (such as range errors // for layer upload). - if len(context.Errors.Errors) > 0 { - w.WriteHeader(http.StatusBadRequest) + if context.Errors.Len() > 0 { + if ssrw.status == 0 { + w.WriteHeader(http.StatusBadRequest) + } serveJSON(w, context.Errors) } }) diff --git a/context.go b/context.go index a5706b4e1..c246d6ac1 100644 --- a/context.go +++ b/context.go @@ -1,8 +1,6 @@ package registry -import ( - "github.com/Sirupsen/logrus" -) +import "github.com/Sirupsen/logrus" // Context should contain the request specific context for use in across // handlers. Resources that don't need to be shared across handlers should not @@ -20,11 +18,6 @@ type Context struct { // handler *must not* start the response via http.ResponseWriter. Errors Errors - // TODO(stevvooe): Context would be a good place to create a - // representation of the "authorized resource". Perhaps, rather than - // having fields like "name", the context should be a set of parameters - // then we do routing from there. - // vars contains the extracted gorilla/mux variables that can be used for // assignment. vars map[string]string diff --git a/helpers.go b/helpers.go index b3b9d7446..7714d0297 100644 --- a/helpers.go +++ b/helpers.go @@ -2,7 +2,10 @@ package registry import ( "encoding/json" + "io" "net/http" + + "github.com/gorilla/mux" ) // serveJSON marshals v and sets the content-type header to @@ -18,3 +21,21 @@ func serveJSON(w http.ResponseWriter, v interface{}) error { return nil } + +// closeResources closes all the provided resources after running the target +// handler. +func closeResources(handler http.Handler, closers ...io.Closer) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + for _, closer := range closers { + defer closer.Close() + } + handler.ServeHTTP(w, r) + }) +} + +// clondedRoute returns a clone of the named route from the router. +func clonedRoute(router *mux.Router, name string) *mux.Route { + route := new(mux.Route) + *route = *router.GetRoute(name) // clone the route + return route +} diff --git a/layer.go b/layer.go index 82a1e6d97..38fdfe39b 100644 --- a/layer.go +++ b/layer.go @@ -3,17 +3,28 @@ package registry import ( "net/http" + "github.com/docker/docker-registry/digest" + "github.com/docker/docker-registry/storage" "github.com/gorilla/handlers" + "github.com/gorilla/mux" ) // layerDispatcher uses the request context to build a layerHandler. func layerDispatcher(ctx *Context, r *http.Request) http.Handler { - layerHandler := &layerHandler{ - Context: ctx, - TarSum: ctx.vars["tarsum"], + dgst, err := digest.ParseDigest(ctx.vars["digest"]) + + if err != nil { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx.Errors.Push(ErrorCodeInvalidDigest, err) + }) } - layerHandler.log = layerHandler.log.WithField("tarsum", layerHandler.TarSum) + layerHandler := &layerHandler{ + Context: ctx, + Digest: dgst, + } + + layerHandler.log = layerHandler.log.WithField("digest", dgst) return handlers.MethodHandler{ "GET": http.HandlerFunc(layerHandler.GetLayer), @@ -25,11 +36,44 @@ func layerDispatcher(ctx *Context, r *http.Request) http.Handler { type layerHandler struct { *Context - TarSum string + Digest digest.Digest } // GetLayer fetches the binary data from backend storage returns it in the // response. func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) { + layers := lh.services.Layers() + layer, err := layers.Fetch(lh.Name, lh.Digest) + + if err != nil { + switch err { + case storage.ErrLayerUnknown: + w.WriteHeader(http.StatusNotFound) + lh.Errors.Push(ErrorCodeUnknownLayer, + map[string]interface{}{ + "unknown": FSLayer{BlobSum: lh.Digest}, + }) + return + default: + lh.Errors.Push(ErrorCodeUnknown, err) + return + } + } + defer layer.Close() + + http.ServeContent(w, r, layer.Digest().String(), layer.CreatedAt(), layer) +} + +func buildLayerURL(router *mux.Router, r *http.Request, layer storage.Layer) (string, error) { + route := clonedRoute(router, routeNameBlob) + + layerURL, err := route.Schemes(r.URL.Scheme).Host(r.Host). + URL("name", layer.Name(), + "digest", layer.Digest().String()) + if err != nil { + return "", err + } + + return layerURL.String(), nil } diff --git a/layerupload.go b/layerupload.go index 8916b5524..d1ec4206c 100644 --- a/layerupload.go +++ b/layerupload.go @@ -1,64 +1,225 @@ package registry import ( + "fmt" + "io" "net/http" + "strconv" + "github.com/Sirupsen/logrus" + "github.com/docker/docker-registry/digest" + "github.com/docker/docker-registry/storage" "github.com/gorilla/handlers" + "github.com/gorilla/mux" ) // layerUploadDispatcher constructs and returns the layer upload handler for // the given request context. func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler { - layerUploadHandler := &layerUploadHandler{ + luh := &layerUploadHandler{ Context: ctx, - TarSum: ctx.vars["tarsum"], UUID: ctx.vars["uuid"], } - layerUploadHandler.log = layerUploadHandler.log.WithField("tarsum", layerUploadHandler.TarSum) + handler := http.Handler(handlers.MethodHandler{ + "POST": http.HandlerFunc(luh.StartLayerUpload), + "GET": http.HandlerFunc(luh.GetUploadStatus), + "HEAD": http.HandlerFunc(luh.GetUploadStatus), + "PUT": http.HandlerFunc(luh.PutLayerChunk), + "DELETE": http.HandlerFunc(luh.CancelLayerUpload), + }) - if layerUploadHandler.UUID != "" { - layerUploadHandler.log = layerUploadHandler.log.WithField("uuid", layerUploadHandler.UUID) + if luh.UUID != "" { + luh.log = luh.log.WithField("uuid", luh.UUID) + + layers := ctx.services.Layers() + upload, err := layers.Resume(luh.UUID) + + if err != nil && err != storage.ErrLayerUploadUnknown { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logrus.Infof("error resolving upload: %v", err) + w.WriteHeader(http.StatusInternalServerError) + luh.Errors.Push(ErrorCodeUnknown, err) + }) + } + + luh.Upload = upload + handler = closeResources(handler, luh.Upload) } - return handlers.MethodHandler{ - "POST": http.HandlerFunc(layerUploadHandler.StartLayerUpload), - "GET": http.HandlerFunc(layerUploadHandler.GetUploadStatus), - "HEAD": http.HandlerFunc(layerUploadHandler.GetUploadStatus), - "PUT": http.HandlerFunc(layerUploadHandler.PutLayerChunk), - "DELETE": http.HandlerFunc(layerUploadHandler.CancelLayerUpload), - } + return handler } // layerUploadHandler handles the http layer upload process. type layerUploadHandler struct { *Context - // TarSum is the unique identifier of the layer being uploaded. - TarSum string - // UUID identifies the upload instance for the current request. UUID string + + Upload storage.LayerUpload } // StartLayerUpload begins the layer upload process and allocates a server- // side upload session. func (luh *layerUploadHandler) StartLayerUpload(w http.ResponseWriter, r *http.Request) { + layers := luh.services.Layers() + upload, err := layers.Upload(luh.Name) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + luh.Upload = upload + defer luh.Upload.Close() + + if err := luh.layerUploadResponse(w, r); err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + w.WriteHeader(http.StatusAccepted) } // GetUploadStatus returns the status of a given upload, identified by uuid. func (luh *layerUploadHandler) GetUploadStatus(w http.ResponseWriter, r *http.Request) { + if luh.Upload == nil { + w.WriteHeader(http.StatusNotFound) + luh.Errors.Push(ErrorCodeUnknownLayerUpload) + } + if err := luh.layerUploadResponse(w, r); err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + + w.WriteHeader(http.StatusNoContent) } // PutLayerChunk receives a layer chunk during the layer upload process, // possible completing the upload with a checksum and length. func (luh *layerUploadHandler) PutLayerChunk(w http.ResponseWriter, r *http.Request) { + if luh.Upload == nil { + w.WriteHeader(http.StatusNotFound) + luh.Errors.Push(ErrorCodeUnknownLayerUpload) + } + var finished bool + + // TODO(stevvooe): This is woefully incomplete. Missing stuff: + // + // 1. Extract information from range header, if present. + // 2. Check offset of current layer. + // 3. Emit correct error responses. + + // Read in the chunk + io.Copy(luh.Upload, r.Body) + + if err := luh.maybeCompleteUpload(w, r); err != nil { + if err != errNotReadyToComplete { + w.WriteHeader(http.StatusInternalServerError) + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + } + + if err := luh.layerUploadResponse(w, r); err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + + if finished { + w.WriteHeader(http.StatusCreated) + } else { + w.WriteHeader(http.StatusAccepted) + } } // CancelLayerUpload cancels an in-progress upload of a layer. func (luh *layerUploadHandler) CancelLayerUpload(w http.ResponseWriter, r *http.Request) { + if luh.Upload == nil { + w.WriteHeader(http.StatusNotFound) + luh.Errors.Push(ErrorCodeUnknownLayerUpload) + } } + +// layerUploadResponse provides a standard request for uploading layers and +// chunk responses. This sets the correct headers but the response status is +// left to the caller. +func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error { + uploadURL, err := buildLayerUploadURL(luh.router, r, luh.Upload) + if err != nil { + logrus.Infof("error building upload url: %s", err) + return err + } + + w.Header().Set("Location", uploadURL) + w.Header().Set("Content-Length", "0") + w.Header().Set("Range", fmt.Sprintf("0-%d", luh.Upload.Offset())) + + return nil +} + +var errNotReadyToComplete = fmt.Errorf("not ready to complete upload") + +// maybeCompleteUpload tries to complete the upload if the correct parameters +// are available. Returns errNotReadyToComplete if not ready to complete. +func (luh *layerUploadHandler) maybeCompleteUpload(w http.ResponseWriter, r *http.Request) error { + // If we get a digest and length, we can finish the upload. + dgstStr := r.FormValue("digest") // TODO(stevvooe): Support multiple digest parameters! + sizeStr := r.FormValue("length") + + if dgstStr == "" || sizeStr == "" { + return errNotReadyToComplete + } + + dgst, err := digest.ParseDigest(dgstStr) + if err != nil { + return err + } + + size, err := strconv.ParseInt(sizeStr, 10, 64) + if err != nil { + return err + } + + luh.completeUpload(w, r, size, dgst) + return nil +} + +// completeUpload finishes out the upload with the correct response. +func (luh *layerUploadHandler) completeUpload(w http.ResponseWriter, r *http.Request, size int64, dgst digest.Digest) { + layer, err := luh.Upload.Finish(size, dgst) + if err != nil { + luh.Errors.Push(ErrorCodeUnknown, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + layerURL, err := buildLayerURL(luh.router, r, layer) + if err != nil { + luh.Errors.Push(ErrorCodeUnknown, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Header().Set("Location", layerURL) + w.Header().Set("Content-Length", "0") + w.WriteHeader(http.StatusCreated) +} + +func buildLayerUploadURL(router *mux.Router, r *http.Request, upload storage.LayerUpload) (string, error) { + route := clonedRoute(router, routeNameBlobUploadResume) + + uploadURL, err := route.Schemes(r.URL.Scheme).Host(r.Host). + URL("name", upload.Name(), "uuid", upload.UUID()) + if err != nil { + return "", err + } + + return uploadURL.String(), nil +}