Initial implementation of Layer API

The http API has its first set of endpoints to implement the core aspects of
fetching and uploading layers. Uploads can be started and completed in a single
chunk and the content can be fetched via tarsum. Most proposed error conditions
should be represented but edge cases likely remain.

In this version, note that the layers are still called layers, even though the
routes are pointing to blobs. This will change with backend refactoring over
the next few weeks.

The unit tests are a bit of a shamble but these need to be carefully written
along with the core specification process. As the the client-server interaction
solidifies, we can port this into a verification suite for registry providers.
This commit is contained in:
Stephen J Day 2014-11-20 19:57:01 -08:00
parent 195568017a
commit e158e3cd65
6 changed files with 528 additions and 32 deletions

236
api_test.go Normal file
View file

@ -0,0 +1,236 @@
package registry
import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"os"
"testing"
"github.com/Sirupsen/logrus"
_ "github.com/docker/docker-registry/storagedriver/inmemory"
"github.com/gorilla/handlers"
"github.com/docker/docker-registry/common/testutil"
"github.com/docker/docker-registry/configuration"
"github.com/docker/docker-registry/digest"
)
// TestLayerAPI conducts a full of the of the layer api.
func TestLayerAPI(t *testing.T) {
// TODO(stevvooe): This test code is complete junk but it should cover the
// complete flow. This must be broken down and checked against the
// specification *before* we submit the final to docker core.
config := configuration.Configuration{
Storage: configuration.Storage{
"inmemory": configuration.Parameters{},
},
}
app := NewApp(config)
server := httptest.NewServer(handlers.CombinedLoggingHandler(os.Stderr, app))
router := v2APIRouter()
u, err := url.Parse(server.URL)
if err != nil {
t.Fatalf("error parsing server url: %v", err)
}
imageName := "foo/bar"
// "build" our layer file
layerFile, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("error creating random layer file: %v", err)
}
layerDigest := digest.Digest(tarSumStr)
// -----------------------------------
// Test fetch for non-existent content
r, err := router.GetRoute(routeNameBlob).Host(u.Host).
URL("name", imageName,
"digest", tarSumStr)
resp, err := http.Get(r.String())
if err != nil {
t.Fatalf("unexpected error fetching non-existent layer: %v", err)
}
switch resp.StatusCode {
case http.StatusNotFound:
break // expected
default:
d, err := httputil.DumpResponse(resp, true)
if err != nil {
t.Fatalf("unexpected status fetching non-existent layer: %v, %v", resp.StatusCode, resp.Status)
}
t.Logf("response:\n%s", string(d))
t.Fatalf("unexpected status fetching non-existent layer: %v, %v", resp.StatusCode, resp.Status)
}
// ------------------------------------------
// Test head request for non-existent content
resp, err = http.Head(r.String())
if err != nil {
t.Fatalf("unexpected error checking head on non-existent layer: %v", err)
}
switch resp.StatusCode {
case http.StatusNotFound:
break // expected
default:
d, err := httputil.DumpResponse(resp, true)
if err != nil {
t.Fatalf("unexpected status checking head on non-existent layer: %v, %v", resp.StatusCode, resp.Status)
}
t.Logf("response:\n%s", string(d))
t.Fatalf("unexpected status checking head on non-existent layer: %v, %v", resp.StatusCode, resp.Status)
}
// ------------------------------------------
// Upload a layer
r, err = router.GetRoute(routeNameBlobUpload).Host(u.Host).
URL("name", imageName)
if err != nil {
t.Fatalf("error starting layer upload: %v", err)
}
resp, err = http.Post(r.String(), "", nil)
if err != nil {
t.Fatalf("error starting layer upload: %v", err)
}
if resp.StatusCode != http.StatusAccepted {
d, err := httputil.DumpResponse(resp, true)
if err != nil {
t.Fatalf("unexpected status starting layer upload: %v, %v", resp.StatusCode, resp.Status)
}
t.Logf("response:\n%s", string(d))
t.Fatalf("unexpected status starting layer upload: %v, %v", resp.StatusCode, resp.Status)
}
if resp.Header.Get("Location") == "" { // TODO(stevvooe): Need better check here.
t.Fatalf("unexpected Location: %q != %q", resp.Header.Get("Location"), "foo")
}
if resp.Header.Get("Content-Length") != "0" {
t.Fatalf("unexpected content-length: %q != %q", resp.Header.Get("Content-Length"), "0")
}
layerLength, _ := layerFile.Seek(0, os.SEEK_END)
layerFile.Seek(0, os.SEEK_SET)
uploadURLStr := resp.Header.Get("Location")
// TODO(sday): Cancel the layer upload here and restart.
query := url.Values{
"digest": []string{layerDigest.String()},
"length": []string{fmt.Sprint(layerLength)},
}
uploadURL, err := url.Parse(uploadURLStr)
if err != nil {
t.Fatalf("unexpected error parsing url: %v", err)
}
uploadURL.RawQuery = query.Encode()
// Just do a monolithic upload
req, err := http.NewRequest("PUT", uploadURL.String(), layerFile)
if err != nil {
t.Fatalf("unexpected error creating new request: %v", err)
}
resp, err = http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("unexpected error doing put: %v", err)
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusCreated:
break // expected
default:
d, err := httputil.DumpResponse(resp, true)
if err != nil {
t.Fatalf("unexpected status putting chunk: %v, %v", resp.StatusCode, resp.Status)
}
t.Logf("response:\n%s", string(d))
t.Fatalf("unexpected status putting chunk: %v, %v", resp.StatusCode, resp.Status)
}
if resp.Header.Get("Location") == "" {
t.Fatalf("unexpected Location: %q", resp.Header.Get("Location"))
}
if resp.Header.Get("Content-Length") != "0" {
t.Fatalf("unexpected content-length: %q != %q", resp.Header.Get("Content-Length"), "0")
}
layerURL := resp.Header.Get("Location")
// ------------------------
// Use a head request to see if the layer exists.
resp, err = http.Head(layerURL)
if err != nil {
t.Fatalf("unexpected error checking head on non-existent layer: %v", err)
}
switch resp.StatusCode {
case http.StatusOK:
break // expected
default:
d, err := httputil.DumpResponse(resp, true)
if err != nil {
t.Fatalf("unexpected status checking head on layer: %v, %v", resp.StatusCode, resp.Status)
}
t.Logf("response:\n%s", string(d))
t.Fatalf("unexpected status checking head on layer: %v, %v", resp.StatusCode, resp.Status)
}
logrus.Infof("fetch the layer")
// ----------------
// Fetch the layer!
resp, err = http.Get(layerURL)
if err != nil {
t.Fatalf("unexpected error fetching layer: %v", err)
}
switch resp.StatusCode {
case http.StatusOK:
break // expected
default:
d, err := httputil.DumpResponse(resp, true)
if err != nil {
t.Fatalf("unexpected status fetching layer: %v, %v", resp.StatusCode, resp.Status)
}
t.Logf("response:\n%s", string(d))
t.Fatalf("unexpected status fetching layer: %v, %v", resp.StatusCode, resp.Status)
}
// Verify the body
verifier := digest.NewDigestVerifier(layerDigest)
io.Copy(verifier, resp.Body)
if !verifier.Verified() {
d, err := httputil.DumpResponse(resp, true)
if err != nil {
t.Fatalf("unexpected status checking head on layer ayo!: %v, %v", resp.StatusCode, resp.Status)
}
t.Logf("response:\n%s", string(d))
t.Fatalf("response body did not pass verification")
}
}

49
app.go
View file

@ -3,7 +3,11 @@ package registry
import (
"net/http"
"github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker-registry/storagedriver/factory"
"github.com/docker/docker-registry/configuration"
"github.com/docker/docker-registry/storage"
log "github.com/Sirupsen/logrus"
"github.com/gorilla/mux"
@ -16,6 +20,12 @@ type App struct {
Config configuration.Configuration
router *mux.Router
// driver maintains the app global storage driver instance.
driver storagedriver.StorageDriver
// services contains the main services instance for the application.
services *storage.Services
}
// NewApp takes a configuration and returns a configured app, ready to serve
@ -29,11 +39,23 @@ func NewApp(configuration configuration.Configuration) *App {
// Register the handler dispatchers.
app.register(routeNameImageManifest, imageManifestDispatcher)
app.register(routeNameBlob, layerDispatcher)
app.register(routeNameTags, tagsDispatcher)
app.register(routeNameBlob, layerDispatcher)
app.register(routeNameBlobUpload, layerUploadDispatcher)
app.register(routeNameBlobUploadResume, layerUploadDispatcher)
driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
if err != nil {
// TODO(stevvooe): Move the creation of a service into a protected
// method, where this is created lazily. Its status can be queried via
// a health check.
panic(err)
}
app.driver = driver
app.services = storage.NewServices(app.driver)
return app
}
@ -64,6 +86,22 @@ type dispatchFunc func(ctx *Context, r *http.Request) http.Handler
// TODO(stevvooe): dispatchers should probably have some validation error
// chain with proper error reporting.
// singleStatusResponseWriter only allows the first status to be written to be
// the valid request status. The current use case of this class should be
// factored out.
type singleStatusResponseWriter struct {
http.ResponseWriter
status int
}
func (ssrw *singleStatusResponseWriter) WriteHeader(status int) {
if ssrw.status != 0 {
return
}
ssrw.status = status
ssrw.ResponseWriter.WriteHeader(status)
}
// dispatcher returns a handler that constructs a request specific context and
// handler, using the dispatch factory function.
func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
@ -80,14 +118,17 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
context.log = log.WithField("name", context.Name)
handler := dispatch(context, r)
ssrw := &singleStatusResponseWriter{ResponseWriter: w}
context.log.Infoln("handler", resolveHandlerName(r.Method, handler))
handler.ServeHTTP(w, r)
handler.ServeHTTP(ssrw, r)
// Automated error response handling here. Handlers may return their
// own errors if they need different behavior (such as range errors
// for layer upload).
if len(context.Errors.Errors) > 0 {
w.WriteHeader(http.StatusBadRequest)
if context.Errors.Len() > 0 {
if ssrw.status == 0 {
w.WriteHeader(http.StatusBadRequest)
}
serveJSON(w, context.Errors)
}
})

View file

@ -1,8 +1,6 @@
package registry
import (
"github.com/Sirupsen/logrus"
)
import "github.com/Sirupsen/logrus"
// Context should contain the request specific context for use in across
// handlers. Resources that don't need to be shared across handlers should not
@ -20,11 +18,6 @@ type Context struct {
// handler *must not* start the response via http.ResponseWriter.
Errors Errors
// TODO(stevvooe): Context would be a good place to create a
// representation of the "authorized resource". Perhaps, rather than
// having fields like "name", the context should be a set of parameters
// then we do routing from there.
// vars contains the extracted gorilla/mux variables that can be used for
// assignment.
vars map[string]string

View file

@ -2,7 +2,10 @@ package registry
import (
"encoding/json"
"io"
"net/http"
"github.com/gorilla/mux"
)
// serveJSON marshals v and sets the content-type header to
@ -18,3 +21,21 @@ func serveJSON(w http.ResponseWriter, v interface{}) error {
return nil
}
// closeResources closes all the provided resources after running the target
// handler.
func closeResources(handler http.Handler, closers ...io.Closer) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for _, closer := range closers {
defer closer.Close()
}
handler.ServeHTTP(w, r)
})
}
// clondedRoute returns a clone of the named route from the router.
func clonedRoute(router *mux.Router, name string) *mux.Route {
route := new(mux.Route)
*route = *router.GetRoute(name) // clone the route
return route
}

View file

@ -3,17 +3,28 @@ package registry
import (
"net/http"
"github.com/docker/docker-registry/digest"
"github.com/docker/docker-registry/storage"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
)
// layerDispatcher uses the request context to build a layerHandler.
func layerDispatcher(ctx *Context, r *http.Request) http.Handler {
layerHandler := &layerHandler{
Context: ctx,
TarSum: ctx.vars["tarsum"],
dgst, err := digest.ParseDigest(ctx.vars["digest"])
if err != nil {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx.Errors.Push(ErrorCodeInvalidDigest, err)
})
}
layerHandler.log = layerHandler.log.WithField("tarsum", layerHandler.TarSum)
layerHandler := &layerHandler{
Context: ctx,
Digest: dgst,
}
layerHandler.log = layerHandler.log.WithField("digest", dgst)
return handlers.MethodHandler{
"GET": http.HandlerFunc(layerHandler.GetLayer),
@ -25,11 +36,44 @@ func layerDispatcher(ctx *Context, r *http.Request) http.Handler {
type layerHandler struct {
*Context
TarSum string
Digest digest.Digest
}
// GetLayer fetches the binary data from backend storage returns it in the
// response.
func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) {
layers := lh.services.Layers()
layer, err := layers.Fetch(lh.Name, lh.Digest)
if err != nil {
switch err {
case storage.ErrLayerUnknown:
w.WriteHeader(http.StatusNotFound)
lh.Errors.Push(ErrorCodeUnknownLayer,
map[string]interface{}{
"unknown": FSLayer{BlobSum: lh.Digest},
})
return
default:
lh.Errors.Push(ErrorCodeUnknown, err)
return
}
}
defer layer.Close()
http.ServeContent(w, r, layer.Digest().String(), layer.CreatedAt(), layer)
}
func buildLayerURL(router *mux.Router, r *http.Request, layer storage.Layer) (string, error) {
route := clonedRoute(router, routeNameBlob)
layerURL, err := route.Schemes(r.URL.Scheme).Host(r.Host).
URL("name", layer.Name(),
"digest", layer.Digest().String())
if err != nil {
return "", err
}
return layerURL.String(), nil
}

View file

@ -1,64 +1,225 @@
package registry
import (
"fmt"
"io"
"net/http"
"strconv"
"github.com/Sirupsen/logrus"
"github.com/docker/docker-registry/digest"
"github.com/docker/docker-registry/storage"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
)
// layerUploadDispatcher constructs and returns the layer upload handler for
// the given request context.
func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler {
layerUploadHandler := &layerUploadHandler{
luh := &layerUploadHandler{
Context: ctx,
TarSum: ctx.vars["tarsum"],
UUID: ctx.vars["uuid"],
}
layerUploadHandler.log = layerUploadHandler.log.WithField("tarsum", layerUploadHandler.TarSum)
handler := http.Handler(handlers.MethodHandler{
"POST": http.HandlerFunc(luh.StartLayerUpload),
"GET": http.HandlerFunc(luh.GetUploadStatus),
"HEAD": http.HandlerFunc(luh.GetUploadStatus),
"PUT": http.HandlerFunc(luh.PutLayerChunk),
"DELETE": http.HandlerFunc(luh.CancelLayerUpload),
})
if layerUploadHandler.UUID != "" {
layerUploadHandler.log = layerUploadHandler.log.WithField("uuid", layerUploadHandler.UUID)
if luh.UUID != "" {
luh.log = luh.log.WithField("uuid", luh.UUID)
layers := ctx.services.Layers()
upload, err := layers.Resume(luh.UUID)
if err != nil && err != storage.ErrLayerUploadUnknown {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logrus.Infof("error resolving upload: %v", err)
w.WriteHeader(http.StatusInternalServerError)
luh.Errors.Push(ErrorCodeUnknown, err)
})
}
luh.Upload = upload
handler = closeResources(handler, luh.Upload)
}
return handlers.MethodHandler{
"POST": http.HandlerFunc(layerUploadHandler.StartLayerUpload),
"GET": http.HandlerFunc(layerUploadHandler.GetUploadStatus),
"HEAD": http.HandlerFunc(layerUploadHandler.GetUploadStatus),
"PUT": http.HandlerFunc(layerUploadHandler.PutLayerChunk),
"DELETE": http.HandlerFunc(layerUploadHandler.CancelLayerUpload),
}
return handler
}
// layerUploadHandler handles the http layer upload process.
type layerUploadHandler struct {
*Context
// TarSum is the unique identifier of the layer being uploaded.
TarSum string
// UUID identifies the upload instance for the current request.
UUID string
Upload storage.LayerUpload
}
// StartLayerUpload begins the layer upload process and allocates a server-
// side upload session.
func (luh *layerUploadHandler) StartLayerUpload(w http.ResponseWriter, r *http.Request) {
layers := luh.services.Layers()
upload, err := layers.Upload(luh.Name)
if err != nil {
w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
luh.Errors.Push(ErrorCodeUnknown, err)
return
}
luh.Upload = upload
defer luh.Upload.Close()
if err := luh.layerUploadResponse(w, r); err != nil {
w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
luh.Errors.Push(ErrorCodeUnknown, err)
return
}
w.WriteHeader(http.StatusAccepted)
}
// GetUploadStatus returns the status of a given upload, identified by uuid.
func (luh *layerUploadHandler) GetUploadStatus(w http.ResponseWriter, r *http.Request) {
if luh.Upload == nil {
w.WriteHeader(http.StatusNotFound)
luh.Errors.Push(ErrorCodeUnknownLayerUpload)
}
if err := luh.layerUploadResponse(w, r); err != nil {
w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
luh.Errors.Push(ErrorCodeUnknown, err)
return
}
w.WriteHeader(http.StatusNoContent)
}
// PutLayerChunk receives a layer chunk during the layer upload process,
// possible completing the upload with a checksum and length.
func (luh *layerUploadHandler) PutLayerChunk(w http.ResponseWriter, r *http.Request) {
if luh.Upload == nil {
w.WriteHeader(http.StatusNotFound)
luh.Errors.Push(ErrorCodeUnknownLayerUpload)
}
var finished bool
// TODO(stevvooe): This is woefully incomplete. Missing stuff:
//
// 1. Extract information from range header, if present.
// 2. Check offset of current layer.
// 3. Emit correct error responses.
// Read in the chunk
io.Copy(luh.Upload, r.Body)
if err := luh.maybeCompleteUpload(w, r); err != nil {
if err != errNotReadyToComplete {
w.WriteHeader(http.StatusInternalServerError)
luh.Errors.Push(ErrorCodeUnknown, err)
return
}
}
if err := luh.layerUploadResponse(w, r); err != nil {
w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
luh.Errors.Push(ErrorCodeUnknown, err)
return
}
if finished {
w.WriteHeader(http.StatusCreated)
} else {
w.WriteHeader(http.StatusAccepted)
}
}
// CancelLayerUpload cancels an in-progress upload of a layer.
func (luh *layerUploadHandler) CancelLayerUpload(w http.ResponseWriter, r *http.Request) {
if luh.Upload == nil {
w.WriteHeader(http.StatusNotFound)
luh.Errors.Push(ErrorCodeUnknownLayerUpload)
}
}
// layerUploadResponse provides a standard request for uploading layers and
// chunk responses. This sets the correct headers but the response status is
// left to the caller.
func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error {
uploadURL, err := buildLayerUploadURL(luh.router, r, luh.Upload)
if err != nil {
logrus.Infof("error building upload url: %s", err)
return err
}
w.Header().Set("Location", uploadURL)
w.Header().Set("Content-Length", "0")
w.Header().Set("Range", fmt.Sprintf("0-%d", luh.Upload.Offset()))
return nil
}
var errNotReadyToComplete = fmt.Errorf("not ready to complete upload")
// maybeCompleteUpload tries to complete the upload if the correct parameters
// are available. Returns errNotReadyToComplete if not ready to complete.
func (luh *layerUploadHandler) maybeCompleteUpload(w http.ResponseWriter, r *http.Request) error {
// If we get a digest and length, we can finish the upload.
dgstStr := r.FormValue("digest") // TODO(stevvooe): Support multiple digest parameters!
sizeStr := r.FormValue("length")
if dgstStr == "" || sizeStr == "" {
return errNotReadyToComplete
}
dgst, err := digest.ParseDigest(dgstStr)
if err != nil {
return err
}
size, err := strconv.ParseInt(sizeStr, 10, 64)
if err != nil {
return err
}
luh.completeUpload(w, r, size, dgst)
return nil
}
// completeUpload finishes out the upload with the correct response.
func (luh *layerUploadHandler) completeUpload(w http.ResponseWriter, r *http.Request, size int64, dgst digest.Digest) {
layer, err := luh.Upload.Finish(size, dgst)
if err != nil {
luh.Errors.Push(ErrorCodeUnknown, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
layerURL, err := buildLayerURL(luh.router, r, layer)
if err != nil {
luh.Errors.Push(ErrorCodeUnknown, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("Location", layerURL)
w.Header().Set("Content-Length", "0")
w.WriteHeader(http.StatusCreated)
}
func buildLayerUploadURL(router *mux.Router, r *http.Request, upload storage.LayerUpload) (string, error) {
route := clonedRoute(router, routeNameBlobUploadResume)
uploadURL, err := route.Schemes(r.URL.Scheme).Host(r.Host).
URL("name", upload.Name(), "uuid", upload.UUID())
if err != nil {
return "", err
}
return uploadURL.String(), nil
}