Webhook notification support in registry webapp
Endpoints are now created at applications startup time, using notification configuration. The instances are then added to a Broadcaster instance, which becomes the main event sink for the application. At request time, an event bridge is configured to listen to repository method calls. The actor and source of the eventBridge are created from the requeest context and application, respectively. The result is notifications are dispatched with calls to the context's Repository instance and are queued to each endpoint via the broadcaster. This commit also adds the concept of a RequestID and App.InstanceID. The request id uniquely identifies each request and the InstanceID uniquely identifies a run of the registry. These identifiers can be used in the future to correlate log messages with generated events to support rich debugging. The fields of the app were slightly reorganized for clarity and a few horrid util functions have been removed. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
b6270d9c14
commit
2aed7c2d0c
3 changed files with 102 additions and 49 deletions
121
docs/app.go
121
docs/app.go
|
@ -2,16 +2,19 @@ package registry
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"code.google.com/p/go-uuid/uuid"
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/docker/distribution/api/v2"
|
||||
"github.com/docker/distribution/auth"
|
||||
"github.com/docker/distribution/configuration"
|
||||
"github.com/docker/distribution/storage"
|
||||
"github.com/docker/distribution/storage/notifications"
|
||||
"github.com/docker/distribution/storagedriver"
|
||||
"github.com/docker/distribution/storagedriver/factory"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
|
@ -21,17 +24,22 @@ import (
|
|||
type App struct {
|
||||
Config configuration.Configuration
|
||||
|
||||
router *mux.Router
|
||||
// InstanceID is a unique id assigned to the application on each creation.
|
||||
// Provides information in the logs and context to identify restarts.
|
||||
InstanceID string
|
||||
|
||||
// driver maintains the app global storage driver instance.
|
||||
driver storagedriver.StorageDriver
|
||||
router *mux.Router // main application router, configured with dispatchers
|
||||
driver storagedriver.StorageDriver // driver maintains the app global storage driver instance.
|
||||
registry storage.Registry // registry is the primary registry backend for the app instance.
|
||||
accessController auth.AccessController // main access controller for application
|
||||
|
||||
// registry is the primary registry backend for the app instance.
|
||||
registry storage.Registry
|
||||
// events contains notification related configuration.
|
||||
events struct {
|
||||
sink notifications.Sink
|
||||
source notifications.SourceRecord
|
||||
}
|
||||
|
||||
layerHandler storage.LayerHandler
|
||||
|
||||
accessController auth.AccessController
|
||||
layerHandler storage.LayerHandler // allows dispatch of layer serving to external provider
|
||||
}
|
||||
|
||||
// NewApp takes a configuration and returns a configured app, ready to serve
|
||||
|
@ -39,8 +47,9 @@ type App struct {
|
|||
// handlers accordingly.
|
||||
func NewApp(configuration configuration.Configuration) *App {
|
||||
app := &App{
|
||||
Config: configuration,
|
||||
router: v2.Router(),
|
||||
Config: configuration,
|
||||
InstanceID: uuid.New(),
|
||||
router: v2.Router(),
|
||||
}
|
||||
|
||||
// Register the handler dispatchers.
|
||||
|
@ -53,7 +62,8 @@ func NewApp(configuration configuration.Configuration) *App {
|
|||
app.register(v2.RouteNameBlobUpload, layerUploadDispatcher)
|
||||
app.register(v2.RouteNameBlobUploadChunk, layerUploadDispatcher)
|
||||
|
||||
driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
|
||||
var err error
|
||||
app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
|
||||
|
||||
if err != nil {
|
||||
// TODO(stevvooe): Move the creation of a service into a protected
|
||||
|
@ -62,7 +72,7 @@ func NewApp(configuration configuration.Configuration) *App {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
app.driver = driver
|
||||
app.configureEvents(&configuration)
|
||||
app.registry = storage.NewRegistryWithDriver(app.driver)
|
||||
authType := configuration.Auth.Type()
|
||||
|
||||
|
@ -77,7 +87,7 @@ func NewApp(configuration configuration.Configuration) *App {
|
|||
layerHandlerType := configuration.LayerHandler.Type()
|
||||
|
||||
if layerHandlerType != "" {
|
||||
lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), driver)
|
||||
lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), app.driver)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err))
|
||||
}
|
||||
|
@ -87,12 +97,6 @@ func NewApp(configuration configuration.Configuration) *App {
|
|||
return app
|
||||
}
|
||||
|
||||
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// Set a header with the Docker Distribution API Version for all responses.
|
||||
w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
|
||||
app.router.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
// register a handler with the application, by route name. The handler will be
|
||||
// passed through the application filters and context will be constructed at
|
||||
// request time.
|
||||
|
@ -107,6 +111,59 @@ func (app *App) register(routeName string, dispatch dispatchFunc) {
|
|||
app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch))
|
||||
}
|
||||
|
||||
// configureEvents prepares the event sink for action.
|
||||
func (app *App) configureEvents(configuration *configuration.Configuration) {
|
||||
// Configure all of the endpoint sinks.
|
||||
var sinks []notifications.Sink
|
||||
for _, endpoint := range configuration.Notifications.Endpoints {
|
||||
if endpoint.Disabled {
|
||||
log.Infof("endpoint %s disabled, skipping", endpoint.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
|
||||
endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
|
||||
Timeout: endpoint.Timeout,
|
||||
Threshold: endpoint.Threshold,
|
||||
Backoff: endpoint.Backoff,
|
||||
Headers: endpoint.Headers,
|
||||
})
|
||||
|
||||
sinks = append(sinks, endpoint)
|
||||
}
|
||||
|
||||
// NOTE(stevvooe): Moving to a new queueing implementation is as easy as
|
||||
// replacing broadcaster with a rabbitmq implementation. It's recommended
|
||||
// that the registry instances also act as the workers to keep deployment
|
||||
// simple.
|
||||
app.events.sink = notifications.NewBroadcaster(sinks...)
|
||||
|
||||
// Populate registry event source
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
hostname = configuration.HTTP.Addr
|
||||
} else {
|
||||
// try to pick the port off the config
|
||||
_, port, err := net.SplitHostPort(configuration.HTTP.Addr)
|
||||
if err == nil {
|
||||
hostname = net.JoinHostPort(hostname, port)
|
||||
}
|
||||
}
|
||||
|
||||
app.events.source = notifications.SourceRecord{
|
||||
Addr: hostname,
|
||||
InstanceID: app.InstanceID,
|
||||
}
|
||||
}
|
||||
|
||||
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close() // ensure that request body is always closed.
|
||||
|
||||
// Set a header with the Docker Distribution API Version for all responses.
|
||||
w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
|
||||
app.router.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
// dispatchFunc takes a context and request and returns a constructed handler
|
||||
// for the route. The dispatcher will use this to dynamically create request
|
||||
// specific handlers for each endpoint without creating a new router for each
|
||||
|
@ -142,11 +199,14 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
|
|||
return
|
||||
}
|
||||
|
||||
// decorate the authorized repository with an event bridge.
|
||||
context.Repository = notifications.Listen(
|
||||
context.Repository, app.eventBridge(context, r))
|
||||
|
||||
context.log = log.WithField("name", context.Repository.Name())
|
||||
handler := dispatch(context, r)
|
||||
|
||||
ssrw := &singleStatusResponseWriter{ResponseWriter: w}
|
||||
context.log.Infoln("handler", resolveHandlerName(r.Method, handler))
|
||||
handler.ServeHTTP(ssrw, r)
|
||||
|
||||
// Automated error response handling here. Handlers may return their
|
||||
|
@ -167,6 +227,7 @@ func (app *App) context(r *http.Request) *Context {
|
|||
vars := mux.Vars(r)
|
||||
context := &Context{
|
||||
App: app,
|
||||
RequestID: uuid.New(),
|
||||
urlBuilder: v2.NewURLBuilderFromRequest(r),
|
||||
}
|
||||
|
||||
|
@ -268,6 +329,22 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont
|
|||
return nil
|
||||
}
|
||||
|
||||
// eventBridge returns a bridge for the current request, configured with the
|
||||
// correct actor and source.
|
||||
func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener {
|
||||
// TODO(stevvooe): Need to extract user data from request context using
|
||||
// auth system. Would prefer to do this during logging refactor and
|
||||
// addition of user and google context type.
|
||||
actor := notifications.ActorRecord{
|
||||
Name: "--todo--",
|
||||
Addr: r.RemoteAddr,
|
||||
Host: r.Host,
|
||||
RequestID: ctx.RequestID,
|
||||
}
|
||||
|
||||
return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, app.events.sink)
|
||||
}
|
||||
|
||||
// apiBase implements a simple yes-man for doing overall checks against the
|
||||
// api. This can support auth roundtrips to support docker login.
|
||||
func apiBase(w http.ResponseWriter, r *http.Request) {
|
||||
|
|
|
@ -13,6 +13,9 @@ type Context struct {
|
|||
// App points to the application structure that created this context.
|
||||
*App
|
||||
|
||||
// RequestID is the unique id of the request.
|
||||
RequestID string
|
||||
|
||||
// Repository is the repository for the current request. All requests
|
||||
// should be scoped to a single repository. This field may be nil.
|
||||
Repository storage.Repository
|
||||
|
|
27
docs/util.go
27
docs/util.go
|
@ -1,27 +0,0 @@
|
|||
package registry
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"reflect"
|
||||
"runtime"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
)
|
||||
|
||||
// functionName returns the name of the function fn.
|
||||
func functionName(fn interface{}) string {
|
||||
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
|
||||
}
|
||||
|
||||
// resolveHandlerName attempts to resolve a nice, pretty name for the passed
|
||||
// in handler.
|
||||
func resolveHandlerName(method string, handler http.Handler) string {
|
||||
switch v := handler.(type) {
|
||||
case handlers.MethodHandler:
|
||||
return functionName(v[method])
|
||||
case http.HandlerFunc:
|
||||
return functionName(v)
|
||||
default:
|
||||
return functionName(handler.ServeHTTP)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue