distribution/registry/handlers/app.go
Richard a72fb20b85 Add configuration for upload purging
Signed-off-by: Richard Scothern <richard.scothern@gmail.com>
2015-05-06 11:04:51 -07:00

660 lines
21 KiB
Go

package handlers
import (
"expvar"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/configuration"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/notifications"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/auth"
registrymiddleware "github.com/docker/distribution/registry/middleware/registry"
repositorymiddleware "github.com/docker/distribution/registry/middleware/repository"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/factory"
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
"github.com/garyburd/redigo/redis"
"github.com/gorilla/mux"
"golang.org/x/net/context"
)
// App is a global registry application object. Shared resources can be placed
// on this object that will be accessible from all requests. Any writable
// fields should be protected.
type App struct {
context.Context
Config configuration.Configuration
router *mux.Router // main application router, configured with dispatchers
driver storagedriver.StorageDriver // driver maintains the app global storage driver instance.
registry distribution.Namespace // registry is the primary registry backend for the app instance.
accessController auth.AccessController // main access controller for application
// events contains notification related configuration.
events struct {
sink notifications.Sink
source notifications.SourceRecord
}
redis *redis.Pool
}
// NewApp takes a configuration and returns a configured app, ready to serve
// requests. The app only implements ServeHTTP and can be wrapped in other
// handlers accordingly.
func NewApp(ctx context.Context, configuration configuration.Configuration) *App {
app := &App{
Config: configuration,
Context: ctx,
router: v2.RouterWithPrefix(configuration.HTTP.Prefix),
}
app.Context = ctxu.WithLogger(app.Context, ctxu.GetLogger(app, "instance.id"))
// Register the handler dispatchers.
app.register(v2.RouteNameBase, func(ctx *Context, r *http.Request) http.Handler {
return http.HandlerFunc(apiBase)
})
app.register(v2.RouteNameManifest, imageManifestDispatcher)
app.register(v2.RouteNameTags, tagsDispatcher)
app.register(v2.RouteNameBlob, layerDispatcher)
app.register(v2.RouteNameBlobUpload, layerUploadDispatcher)
app.register(v2.RouteNameBlobUploadChunk, layerUploadDispatcher)
var err error
app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
if err != nil {
// TODO(stevvooe): Move the creation of a service into a protected
// method, where this is created lazily. Its status can be queried via
// a health check.
panic(err)
}
purgeConfig := uploadPurgeDefaultConfig()
if mc, ok := configuration.Storage["maintenance"]; ok {
for k, v := range mc {
switch k {
case "uploadpurging":
purgeConfig = v.(map[interface{}]interface{})
}
}
}
startUploadPurger(app.driver, ctxu.GetLogger(app), purgeConfig)
app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"])
if err != nil {
panic(err)
}
app.configureEvents(&configuration)
app.configureRedis(&configuration)
// configure storage caches
if cc, ok := configuration.Storage["cache"]; ok {
switch cc["layerinfo"] {
case "redis":
if app.redis == nil {
panic("redis configuration required to use for layerinfo cache")
}
app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewRedisLayerInfoCache(app.redis))
ctxu.GetLogger(app).Infof("using redis layerinfo cache")
case "inmemory":
app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewInMemoryLayerInfoCache())
ctxu.GetLogger(app).Infof("using inmemory layerinfo cache")
default:
if cc["layerinfo"] != "" {
ctxu.GetLogger(app).Warnf("unkown cache type %q, caching disabled", configuration.Storage["cache"])
}
}
}
if app.registry == nil {
// configure the registry if no cache section is available.
app.registry = storage.NewRegistryWithDriver(app.driver, nil)
}
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])
if err != nil {
panic(err)
}
authType := configuration.Auth.Type()
if authType != "" {
accessController, err := auth.GetAccessController(configuration.Auth.Type(), configuration.Auth.Parameters())
if err != nil {
panic(fmt.Sprintf("unable to configure authorization (%s): %v", authType, err))
}
app.accessController = accessController
}
return app
}
// register a handler with the application, by route name. The handler will be
// passed through the application filters and context will be constructed at
// request time.
func (app *App) register(routeName string, dispatch dispatchFunc) {
// TODO(stevvooe): This odd dispatcher/route registration is by-product of
// some limitations in the gorilla/mux router. We are using it to keep
// routing consistent between the client and server, but we may want to
// replace it with manual routing and structure-based dispatch for better
// control over the request execution.
app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch))
}
// configureEvents prepares the event sink for action.
func (app *App) configureEvents(configuration *configuration.Configuration) {
// Configure all of the endpoint sinks.
var sinks []notifications.Sink
for _, endpoint := range configuration.Notifications.Endpoints {
if endpoint.Disabled {
ctxu.GetLogger(app).Infof("endpoint %s disabled, skipping", endpoint.Name)
continue
}
ctxu.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
Timeout: endpoint.Timeout,
Threshold: endpoint.Threshold,
Backoff: endpoint.Backoff,
Headers: endpoint.Headers,
})
sinks = append(sinks, endpoint)
}
// NOTE(stevvooe): Moving to a new queueing implementation is as easy as
// replacing broadcaster with a rabbitmq implementation. It's recommended
// that the registry instances also act as the workers to keep deployment
// simple.
app.events.sink = notifications.NewBroadcaster(sinks...)
// Populate registry event source
hostname, err := os.Hostname()
if err != nil {
hostname = configuration.HTTP.Addr
} else {
// try to pick the port off the config
_, port, err := net.SplitHostPort(configuration.HTTP.Addr)
if err == nil {
hostname = net.JoinHostPort(hostname, port)
}
}
app.events.source = notifications.SourceRecord{
Addr: hostname,
InstanceID: ctxu.GetStringValue(app, "instance.id"),
}
}
func (app *App) configureRedis(configuration *configuration.Configuration) {
if configuration.Redis.Addr == "" {
ctxu.GetLogger(app).Infof("redis not configured")
return
}
pool := &redis.Pool{
Dial: func() (redis.Conn, error) {
// TODO(stevvooe): Yet another use case for contextual timing.
ctx := context.WithValue(app, "redis.connect.startedat", time.Now())
done := func(err error) {
logger := ctxu.GetLoggerWithField(ctx, "redis.connect.duration",
ctxu.Since(ctx, "redis.connect.startedat"))
if err != nil {
logger.Errorf("redis: error connecting: %v", err)
} else {
logger.Infof("redis: connect %v", configuration.Redis.Addr)
}
}
conn, err := redis.DialTimeout("tcp",
configuration.Redis.Addr,
configuration.Redis.DialTimeout,
configuration.Redis.ReadTimeout,
configuration.Redis.WriteTimeout)
if err != nil {
ctxu.GetLogger(app).Errorf("error connecting to redis instance %s: %v",
configuration.Redis.Addr, err)
done(err)
return nil, err
}
// authorize the connection
if configuration.Redis.Password != "" {
if _, err = conn.Do("AUTH", configuration.Redis.Password); err != nil {
defer conn.Close()
done(err)
return nil, err
}
}
// select the database to use
if configuration.Redis.DB != 0 {
if _, err = conn.Do("SELECT", configuration.Redis.DB); err != nil {
defer conn.Close()
done(err)
return nil, err
}
}
done(nil)
return conn, nil
},
MaxIdle: configuration.Redis.Pool.MaxIdle,
MaxActive: configuration.Redis.Pool.MaxActive,
IdleTimeout: configuration.Redis.Pool.IdleTimeout,
TestOnBorrow: func(c redis.Conn, t time.Time) error {
// TODO(stevvooe): We can probably do something more interesting
// here with the health package.
_, err := c.Do("PING")
return err
},
Wait: false, // if a connection is not avialable, proceed without cache.
}
app.redis = pool
// setup expvar
registry := expvar.Get("registry")
if registry == nil {
registry = expvar.NewMap("registry")
}
registry.(*expvar.Map).Set("redis", expvar.Func(func() interface{} {
return map[string]interface{}{
"Config": configuration.Redis,
"Active": app.redis.ActiveCount(),
}
}))
}
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() // ensure that request body is always closed.
// Instantiate an http context here so we can track the error codes
// returned by the request router.
ctx := defaultContextManager.context(app, w, r)
defer func() {
ctxu.GetResponseLogger(ctx).Infof("response completed")
}()
defer defaultContextManager.release(ctx)
// NOTE(stevvooe): Total hack to get instrumented responsewriter from context.
var err error
w, err = ctxu.GetResponseWriter(ctx)
if err != nil {
ctxu.GetLogger(ctx).Warnf("response writer not found in context")
}
// Set a header with the Docker Distribution API Version for all responses.
w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
app.router.ServeHTTP(w, r)
}
// dispatchFunc takes a context and request and returns a constructed handler
// for the route. The dispatcher will use this to dynamically create request
// specific handlers for each endpoint without creating a new router for each
// request.
type dispatchFunc func(ctx *Context, r *http.Request) http.Handler
// TODO(stevvooe): dispatchers should probably have some validation error
// chain with proper error reporting.
// dispatcher returns a handler that constructs a request specific context and
// handler, using the dispatch factory function.
func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
context := app.context(w, r)
if err := app.authorized(w, r, context); err != nil {
ctxu.GetLogger(context).Errorf("error authorizing context: %v", err)
return
}
// Add username to request logging
context.Context = ctxu.WithLogger(context.Context, ctxu.GetLogger(context.Context, "auth.user.name"))
if app.nameRequired(r) {
repository, err := app.registry.Repository(context, getName(context))
if err != nil {
ctxu.GetLogger(context).Errorf("error resolving repository: %v", err)
switch err := err.(type) {
case distribution.ErrRepositoryUnknown:
context.Errors.Push(v2.ErrorCodeNameUnknown, err)
case distribution.ErrRepositoryNameInvalid:
context.Errors.Push(v2.ErrorCodeNameInvalid, err)
}
w.WriteHeader(http.StatusBadRequest)
serveJSON(w, context.Errors)
return
}
// assign and decorate the authorized repository with an event bridge.
context.Repository = notifications.Listen(
repository,
app.eventBridge(context, r))
context.Repository, err = applyRepoMiddleware(context.Repository, app.Config.Middleware["repository"])
if err != nil {
ctxu.GetLogger(context).Errorf("error initializing repository middleware: %v", err)
context.Errors.Push(v2.ErrorCodeUnknown, err)
w.WriteHeader(http.StatusInternalServerError)
serveJSON(w, context.Errors)
return
}
}
dispatch(context, r).ServeHTTP(w, r)
// Automated error response handling here. Handlers may return their
// own errors if they need different behavior (such as range errors
// for layer upload).
if context.Errors.Len() > 0 {
if context.Value("http.response.status") == 0 {
// TODO(stevvooe): Getting this value from the context is a
// bit of a hack. We can further address with some of our
// future refactoring.
w.WriteHeader(http.StatusBadRequest)
}
app.logError(context, context.Errors)
serveJSON(w, context.Errors)
}
})
}
func (app *App) logError(context context.Context, errors v2.Errors) {
for _, e := range errors.Errors {
c := ctxu.WithValue(context, "err.code", e.Code)
c = ctxu.WithValue(c, "err.message", e.Message)
c = ctxu.WithValue(c, "err.detail", e.Detail)
c = ctxu.WithLogger(c, ctxu.GetLogger(c,
"err.code",
"err.message",
"err.detail"))
ctxu.GetLogger(c).Errorf("An error occured")
}
}
// context constructs the context object for the application. This only be
// called once per request.
func (app *App) context(w http.ResponseWriter, r *http.Request) *Context {
ctx := defaultContextManager.context(app, w, r)
ctx = ctxu.WithVars(ctx, r)
ctx = ctxu.WithLogger(ctx, ctxu.GetLogger(ctx,
"vars.name",
"vars.reference",
"vars.digest",
"vars.uuid"))
context := &Context{
App: app,
Context: ctx,
urlBuilder: v2.NewURLBuilderFromRequest(r),
}
return context
}
// authorized checks if the request can proceed with access to the requested
// repository. If it succeeds, the context may access the requested
// repository. An error will be returned if access is not available.
func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Context) error {
ctxu.GetLogger(context).Debug("authorizing request")
repo := getName(context)
if app.accessController == nil {
return nil // access controller is not enabled.
}
var accessRecords []auth.Access
if repo != "" {
accessRecords = appendAccessRecords(accessRecords, r.Method, repo)
} else {
// Only allow the name not to be set on the base route.
if app.nameRequired(r) {
// For this to be properly secured, repo must always be set for a
// resource that may make a modification. The only condition under
// which name is not set and we still allow access is when the
// base route is accessed. This section prevents us from making
// that mistake elsewhere in the code, allowing any operation to
// proceed.
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusForbidden)
var errs v2.Errors
errs.Push(v2.ErrorCodeUnauthorized)
serveJSON(w, errs)
return fmt.Errorf("forbidden: no repository name")
}
}
ctx, err := app.accessController.Authorized(context.Context, accessRecords...)
if err != nil {
switch err := err.(type) {
case auth.Challenge:
w.Header().Set("Content-Type", "application/json; charset=utf-8")
err.ServeHTTP(w, r)
var errs v2.Errors
errs.Push(v2.ErrorCodeUnauthorized, accessRecords)
serveJSON(w, errs)
default:
// This condition is a potential security problem either in
// the configuration or whatever is backing the access
// controller. Just return a bad request with no information
// to avoid exposure. The request should not proceed.
ctxu.GetLogger(context).Errorf("error checking authorization: %v", err)
w.WriteHeader(http.StatusBadRequest)
}
return err
}
// TODO(stevvooe): This pattern needs to be cleaned up a bit. One context
// should be replaced by another, rather than replacing the context on a
// mutable object.
context.Context = ctx
return nil
}
// eventBridge returns a bridge for the current request, configured with the
// correct actor and source.
func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener {
actor := notifications.ActorRecord{
Name: getUserName(ctx, r),
}
request := notifications.NewRequestRecord(ctxu.GetRequestID(ctx), r)
return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, request, app.events.sink)
}
// nameRequired returns true if the route requires a name.
func (app *App) nameRequired(r *http.Request) bool {
route := mux.CurrentRoute(r)
return route == nil || route.GetName() != v2.RouteNameBase
}
// apiBase implements a simple yes-man for doing overall checks against the
// api. This can support auth roundtrips to support docker login.
func apiBase(w http.ResponseWriter, r *http.Request) {
const emptyJSON = "{}"
// Provide a simple /v2/ 200 OK response with empty json response.
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Set("Content-Length", fmt.Sprint(len(emptyJSON)))
fmt.Fprint(w, emptyJSON)
}
// appendAccessRecords checks the method and adds the appropriate Access records to the records list.
func appendAccessRecords(records []auth.Access, method string, repo string) []auth.Access {
resource := auth.Resource{
Type: "repository",
Name: repo,
}
switch method {
case "GET", "HEAD":
records = append(records,
auth.Access{
Resource: resource,
Action: "pull",
})
case "POST", "PUT", "PATCH":
records = append(records,
auth.Access{
Resource: resource,
Action: "pull",
},
auth.Access{
Resource: resource,
Action: "push",
})
case "DELETE":
// DELETE access requires full admin rights, which is represented
// as "*". This may not be ideal.
records = append(records,
auth.Access{
Resource: resource,
Action: "*",
})
}
return records
}
// applyRegistryMiddleware wraps a registry instance with the configured middlewares
func applyRegistryMiddleware(registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) {
for _, mw := range middlewares {
rmw, err := registrymiddleware.Get(mw.Name, mw.Options, registry)
if err != nil {
return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err)
}
registry = rmw
}
return registry, nil
}
// applyRepoMiddleware wraps a repository with the configured middlewares
func applyRepoMiddleware(repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) {
for _, mw := range middlewares {
rmw, err := repositorymiddleware.Get(mw.Name, mw.Options, repository)
if err != nil {
return nil, err
}
repository = rmw
}
return repository, nil
}
// applyStorageMiddleware wraps a storage driver with the configured middlewares
func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []configuration.Middleware) (storagedriver.StorageDriver, error) {
for _, mw := range middlewares {
smw, err := storagemiddleware.Get(mw.Name, mw.Options, driver)
if err != nil {
return nil, fmt.Errorf("unable to configure storage middleware (%s): %v", mw.Name, err)
}
driver = smw
}
return driver, nil
}
// uploadPurgeDefaultConfig provides a default configuration for upload
// purging to be used in the absence of configuration in the
// confifuration file
func uploadPurgeDefaultConfig() map[interface{}]interface{} {
config := map[interface{}]interface{}{}
config["enabled"] = true
config["age"] = "168h"
config["interval"] = "24h"
config["dryrun"] = false
return config
}
func badPurgeUploadConfig(reason string) {
panic(fmt.Sprintf("Unable to parse upload purge configuration: %s", reason))
}
// startUploadPurger schedules a goroutine which will periodically
// check upload directories for old files and delete them
func startUploadPurger(storageDriver storagedriver.StorageDriver, log ctxu.Logger, config map[interface{}]interface{}) {
if config["enabled"] == false {
return
}
var purgeAgeDuration time.Duration
var err error
purgeAge, ok := config["age"]
if ok {
ageStr, ok := purgeAge.(string)
if !ok {
badPurgeUploadConfig("age is not a string")
}
purgeAgeDuration, err = time.ParseDuration(ageStr)
if err != nil {
badPurgeUploadConfig(fmt.Sprintf("Cannot parse duration: %s", err.Error()))
}
} else {
badPurgeUploadConfig("age missing")
}
var intervalDuration time.Duration
interval, ok := config["interval"]
if ok {
intervalStr, ok := interval.(string)
if !ok {
badPurgeUploadConfig("interval is not a string")
}
intervalDuration, err = time.ParseDuration(intervalStr)
if err != nil {
badPurgeUploadConfig(fmt.Sprintf("Cannot parse interval: %s", err.Error()))
}
} else {
badPurgeUploadConfig("interval missing")
}
var dryRunBool bool
dryRun, ok := config["dryrun"]
if ok {
dryRunBool, ok = dryRun.(bool)
if !ok {
badPurgeUploadConfig("cannot parse dryrun")
}
} else {
badPurgeUploadConfig("dryrun missing")
}
go func() {
rand.Seed(time.Now().Unix())
jitter := time.Duration(rand.Int()%60) * time.Minute
log.Infof("Starting upload purge in %s", jitter)
time.Sleep(jitter)
for {
storage.PurgeUploads(storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool)
log.Infof("Starting upload purge in %s", intervalDuration)
time.Sleep(intervalDuration)
}
}()
}