593bbccdb5
This PR refactors the blob service API to be oriented around blob descriptors. Identified by digests, blobs become an abstract entity that can be read and written using a descriptor as a handle. This allows blobs to take many forms, such as a ReadSeekCloser or a simple byte buffer, allowing blob oriented operations to better integrate with blob agnostic APIs (such as the `io` package). The error definitions are now better organized to reflect conditions that can only be seen when interacting with the blob API. The main benefit of this is to separate the much smaller metadata from large file storage. Many benefits also follow from this. Reading and writing has been separated into discrete services. Backend implementation is also simplified, by reducing the amount of metadata that needs to be picked up to simply serve a read. This also improves cacheability. "Opening" a blob simply consists of an access check (Stat) and a path calculation. Caching is greatly simplified and we've made the mapping of provisional to canonical hashes a first-class concept. BlobDescriptorService and BlobProvider can be combined in different ways to achieve varying effects. Recommend Review Approach ------------------------- This is a very large patch. While apologies are in order, we are getting a considerable amount of refactoring. Most changes follow from the changes to the root package (distribution), so start there. From there, the main changes are in storage. Looking at (*repository).Blobs will help to understand the how the linkedBlobStore is wired. One can explore the internals within and also branch out into understanding the changes to the caching layer. Following the descriptions below will also help to guide you. To reduce the chances for regressions, it was critical that major changes to unit tests were avoided. Where possible, they are left untouched and where not, the spirit is hopefully captured. Pay particular attention to where behavior may have changed. Storage ------- The primary changes to the `storage` package, other than the interface updates, were to merge the layerstore and blobstore. Blob access is now layered even further. The first layer, blobStore, exposes a global `BlobStatter` and `BlobProvider`. Operations here provide a fast path for most read operations that don't take access control into account. The `linkedBlobStore` layers on top of the `blobStore`, providing repository- scoped blob link management in the backend. The `linkedBlobStore` implements the full `BlobStore` suite, providing access-controlled, repository-local blob writers. The abstraction between the two is slightly broken in that `linkedBlobStore` is the only channel under which one can write into the global blob store. The `linkedBlobStore` also provides flexibility in that it can act over different link sets depending on configuration. This allows us to use the same code for signature links, manifest links and blob links. Eventually, we will fully consolidate this storage. The improved cache flow comes from the `linkedBlobStatter` component of `linkedBlobStore`. Using a `cachedBlobStatter`, these combine together to provide a simple cache hierarchy that should streamline access checks on read and write operations, or at least provide a single path to optimize. The metrics have been changed in a slightly incompatible way since the former operations, Fetch and Exists, are no longer relevant. The fileWriter and fileReader have been slightly modified to support the rest of the changes. The most interesting is the removal of the `Stat` call from `newFileReader`. This was the source of unnecessary round trips that were only present to look up the size of the resulting reader. Now, one must simply pass in the size, requiring the caller to decide whether or not the `Stat` call is appropriate. In several cases, it turned out the caller already had the size already. The `WriterAt` implementation has been removed from `fileWriter`, since it is no longer required for `BlobWriter`, reducing the number of paths which writes may take. Cache ----- Unfortunately, the `cache` package required a near full rewrite. It was pretty mechanical in that the cache is oriented around the `BlobDescriptorService` slightly modified to include the ability to set the values for individual digests. While the implementation is oriented towards caching, it can act as a primary store. Provisions are in place to have repository local metadata, in addition to global metadata. Fallback is implemented as a part of the storage package to maintain this flexibility. One unfortunate side-effect is that caching is now repository-scoped, rather than global. This should have little effect on performance but may increase memory usage. Handlers -------- The `handlers` package has been updated to leverage the new API. For the most part, the changes are superficial or mechanical based on the API changes. This did expose a bug in the handling of provisional vs canonical digests that was fixed in the unit tests. Configuration ------------- One user-facing change has been made to the configuration and is updated in the associated documentation. The `layerinfo` cache parameter has been deprecated by the `blobdescriptor` cache parameter. Both are equivalent and configuration files should be backward compatible. Notifications ------------- Changes the `notification` package are simply to support the interface changes. Context ------- A small change has been made to the tracing log-level. Traces have been moved from "info" to "debug" level to reduce output when not needed. Signed-off-by: Stephen J Day <stephen.day@docker.com>
664 lines
21 KiB
Go
664 lines
21 KiB
Go
package handlers
|
|
|
|
import (
|
|
"expvar"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/docker/distribution"
|
|
"github.com/docker/distribution/configuration"
|
|
ctxu "github.com/docker/distribution/context"
|
|
"github.com/docker/distribution/notifications"
|
|
"github.com/docker/distribution/registry/api/v2"
|
|
"github.com/docker/distribution/registry/auth"
|
|
registrymiddleware "github.com/docker/distribution/registry/middleware/registry"
|
|
repositorymiddleware "github.com/docker/distribution/registry/middleware/repository"
|
|
"github.com/docker/distribution/registry/storage"
|
|
"github.com/docker/distribution/registry/storage/cache"
|
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
|
"github.com/docker/distribution/registry/storage/driver/factory"
|
|
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
|
|
"github.com/garyburd/redigo/redis"
|
|
"github.com/gorilla/mux"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
// App is a global registry application object. Shared resources can be placed
|
|
// on this object that will be accessible from all requests. Any writable
|
|
// fields should be protected.
|
|
type App struct {
|
|
context.Context
|
|
|
|
Config configuration.Configuration
|
|
|
|
router *mux.Router // main application router, configured with dispatchers
|
|
driver storagedriver.StorageDriver // driver maintains the app global storage driver instance.
|
|
registry distribution.Namespace // registry is the primary registry backend for the app instance.
|
|
accessController auth.AccessController // main access controller for application
|
|
|
|
// events contains notification related configuration.
|
|
events struct {
|
|
sink notifications.Sink
|
|
source notifications.SourceRecord
|
|
}
|
|
|
|
redis *redis.Pool
|
|
}
|
|
|
|
// NewApp takes a configuration and returns a configured app, ready to serve
|
|
// requests. The app only implements ServeHTTP and can be wrapped in other
|
|
// handlers accordingly.
|
|
func NewApp(ctx context.Context, configuration configuration.Configuration) *App {
|
|
app := &App{
|
|
Config: configuration,
|
|
Context: ctx,
|
|
router: v2.RouterWithPrefix(configuration.HTTP.Prefix),
|
|
}
|
|
|
|
app.Context = ctxu.WithLogger(app.Context, ctxu.GetLogger(app, "instance.id"))
|
|
|
|
// Register the handler dispatchers.
|
|
app.register(v2.RouteNameBase, func(ctx *Context, r *http.Request) http.Handler {
|
|
return http.HandlerFunc(apiBase)
|
|
})
|
|
app.register(v2.RouteNameManifest, imageManifestDispatcher)
|
|
app.register(v2.RouteNameTags, tagsDispatcher)
|
|
app.register(v2.RouteNameBlob, blobDispatcher)
|
|
app.register(v2.RouteNameBlobUpload, blobUploadDispatcher)
|
|
app.register(v2.RouteNameBlobUploadChunk, blobUploadDispatcher)
|
|
|
|
var err error
|
|
app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
|
|
if err != nil {
|
|
// TODO(stevvooe): Move the creation of a service into a protected
|
|
// method, where this is created lazily. Its status can be queried via
|
|
// a health check.
|
|
panic(err)
|
|
}
|
|
|
|
purgeConfig := uploadPurgeDefaultConfig()
|
|
if mc, ok := configuration.Storage["maintenance"]; ok {
|
|
for k, v := range mc {
|
|
switch k {
|
|
case "uploadpurging":
|
|
purgeConfig = v.(map[interface{}]interface{})
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
startUploadPurger(app, app.driver, ctxu.GetLogger(app), purgeConfig)
|
|
|
|
app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"])
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
app.configureEvents(&configuration)
|
|
app.configureRedis(&configuration)
|
|
|
|
// configure storage caches
|
|
if cc, ok := configuration.Storage["cache"]; ok {
|
|
v, ok := cc["blobdescriptor"]
|
|
if !ok {
|
|
// Backwards compatible: "layerinfo" == "blobdescriptor"
|
|
v = cc["layerinfo"]
|
|
}
|
|
|
|
switch v {
|
|
case "redis":
|
|
if app.redis == nil {
|
|
panic("redis configuration required to use for layerinfo cache")
|
|
}
|
|
app.registry = storage.NewRegistryWithDriver(app, app.driver, cache.NewRedisBlobDescriptorCacheProvider(app.redis))
|
|
ctxu.GetLogger(app).Infof("using redis blob descriptor cache")
|
|
case "inmemory":
|
|
app.registry = storage.NewRegistryWithDriver(app, app.driver, cache.NewInMemoryBlobDescriptorCacheProvider())
|
|
ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache")
|
|
default:
|
|
if v != "" {
|
|
ctxu.GetLogger(app).Warnf("unkown cache type %q, caching disabled", configuration.Storage["cache"])
|
|
}
|
|
}
|
|
}
|
|
|
|
if app.registry == nil {
|
|
// configure the registry if no cache section is available.
|
|
app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil)
|
|
}
|
|
|
|
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
authType := configuration.Auth.Type()
|
|
|
|
if authType != "" {
|
|
accessController, err := auth.GetAccessController(configuration.Auth.Type(), configuration.Auth.Parameters())
|
|
if err != nil {
|
|
panic(fmt.Sprintf("unable to configure authorization (%s): %v", authType, err))
|
|
}
|
|
app.accessController = accessController
|
|
}
|
|
|
|
return app
|
|
}
|
|
|
|
// register a handler with the application, by route name. The handler will be
|
|
// passed through the application filters and context will be constructed at
|
|
// request time.
|
|
func (app *App) register(routeName string, dispatch dispatchFunc) {
|
|
|
|
// TODO(stevvooe): This odd dispatcher/route registration is by-product of
|
|
// some limitations in the gorilla/mux router. We are using it to keep
|
|
// routing consistent between the client and server, but we may want to
|
|
// replace it with manual routing and structure-based dispatch for better
|
|
// control over the request execution.
|
|
|
|
app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch))
|
|
}
|
|
|
|
// configureEvents prepares the event sink for action.
|
|
func (app *App) configureEvents(configuration *configuration.Configuration) {
|
|
// Configure all of the endpoint sinks.
|
|
var sinks []notifications.Sink
|
|
for _, endpoint := range configuration.Notifications.Endpoints {
|
|
if endpoint.Disabled {
|
|
ctxu.GetLogger(app).Infof("endpoint %s disabled, skipping", endpoint.Name)
|
|
continue
|
|
}
|
|
|
|
ctxu.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
|
|
endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
|
|
Timeout: endpoint.Timeout,
|
|
Threshold: endpoint.Threshold,
|
|
Backoff: endpoint.Backoff,
|
|
Headers: endpoint.Headers,
|
|
})
|
|
|
|
sinks = append(sinks, endpoint)
|
|
}
|
|
|
|
// NOTE(stevvooe): Moving to a new queueing implementation is as easy as
|
|
// replacing broadcaster with a rabbitmq implementation. It's recommended
|
|
// that the registry instances also act as the workers to keep deployment
|
|
// simple.
|
|
app.events.sink = notifications.NewBroadcaster(sinks...)
|
|
|
|
// Populate registry event source
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
hostname = configuration.HTTP.Addr
|
|
} else {
|
|
// try to pick the port off the config
|
|
_, port, err := net.SplitHostPort(configuration.HTTP.Addr)
|
|
if err == nil {
|
|
hostname = net.JoinHostPort(hostname, port)
|
|
}
|
|
}
|
|
|
|
app.events.source = notifications.SourceRecord{
|
|
Addr: hostname,
|
|
InstanceID: ctxu.GetStringValue(app, "instance.id"),
|
|
}
|
|
}
|
|
|
|
func (app *App) configureRedis(configuration *configuration.Configuration) {
|
|
if configuration.Redis.Addr == "" {
|
|
ctxu.GetLogger(app).Infof("redis not configured")
|
|
return
|
|
}
|
|
|
|
pool := &redis.Pool{
|
|
Dial: func() (redis.Conn, error) {
|
|
// TODO(stevvooe): Yet another use case for contextual timing.
|
|
ctx := context.WithValue(app, "redis.connect.startedat", time.Now())
|
|
|
|
done := func(err error) {
|
|
logger := ctxu.GetLoggerWithField(ctx, "redis.connect.duration",
|
|
ctxu.Since(ctx, "redis.connect.startedat"))
|
|
if err != nil {
|
|
logger.Errorf("redis: error connecting: %v", err)
|
|
} else {
|
|
logger.Infof("redis: connect %v", configuration.Redis.Addr)
|
|
}
|
|
}
|
|
|
|
conn, err := redis.DialTimeout("tcp",
|
|
configuration.Redis.Addr,
|
|
configuration.Redis.DialTimeout,
|
|
configuration.Redis.ReadTimeout,
|
|
configuration.Redis.WriteTimeout)
|
|
if err != nil {
|
|
ctxu.GetLogger(app).Errorf("error connecting to redis instance %s: %v",
|
|
configuration.Redis.Addr, err)
|
|
done(err)
|
|
return nil, err
|
|
}
|
|
|
|
// authorize the connection
|
|
if configuration.Redis.Password != "" {
|
|
if _, err = conn.Do("AUTH", configuration.Redis.Password); err != nil {
|
|
defer conn.Close()
|
|
done(err)
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// select the database to use
|
|
if configuration.Redis.DB != 0 {
|
|
if _, err = conn.Do("SELECT", configuration.Redis.DB); err != nil {
|
|
defer conn.Close()
|
|
done(err)
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
done(nil)
|
|
return conn, nil
|
|
},
|
|
MaxIdle: configuration.Redis.Pool.MaxIdle,
|
|
MaxActive: configuration.Redis.Pool.MaxActive,
|
|
IdleTimeout: configuration.Redis.Pool.IdleTimeout,
|
|
TestOnBorrow: func(c redis.Conn, t time.Time) error {
|
|
// TODO(stevvooe): We can probably do something more interesting
|
|
// here with the health package.
|
|
_, err := c.Do("PING")
|
|
return err
|
|
},
|
|
Wait: false, // if a connection is not avialable, proceed without cache.
|
|
}
|
|
|
|
app.redis = pool
|
|
|
|
// setup expvar
|
|
registry := expvar.Get("registry")
|
|
if registry == nil {
|
|
registry = expvar.NewMap("registry")
|
|
}
|
|
|
|
registry.(*expvar.Map).Set("redis", expvar.Func(func() interface{} {
|
|
return map[string]interface{}{
|
|
"Config": configuration.Redis,
|
|
"Active": app.redis.ActiveCount(),
|
|
}
|
|
}))
|
|
}
|
|
|
|
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close() // ensure that request body is always closed.
|
|
|
|
// Instantiate an http context here so we can track the error codes
|
|
// returned by the request router.
|
|
ctx := defaultContextManager.context(app, w, r)
|
|
defer func() {
|
|
ctxu.GetResponseLogger(ctx).Infof("response completed")
|
|
}()
|
|
defer defaultContextManager.release(ctx)
|
|
|
|
// NOTE(stevvooe): Total hack to get instrumented responsewriter from context.
|
|
var err error
|
|
w, err = ctxu.GetResponseWriter(ctx)
|
|
if err != nil {
|
|
ctxu.GetLogger(ctx).Warnf("response writer not found in context")
|
|
}
|
|
|
|
// Set a header with the Docker Distribution API Version for all responses.
|
|
w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
|
|
app.router.ServeHTTP(w, r)
|
|
}
|
|
|
|
// dispatchFunc takes a context and request and returns a constructed handler
|
|
// for the route. The dispatcher will use this to dynamically create request
|
|
// specific handlers for each endpoint without creating a new router for each
|
|
// request.
|
|
type dispatchFunc func(ctx *Context, r *http.Request) http.Handler
|
|
|
|
// TODO(stevvooe): dispatchers should probably have some validation error
|
|
// chain with proper error reporting.
|
|
|
|
// dispatcher returns a handler that constructs a request specific context and
|
|
// handler, using the dispatch factory function.
|
|
func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
context := app.context(w, r)
|
|
|
|
if err := app.authorized(w, r, context); err != nil {
|
|
ctxu.GetLogger(context).Errorf("error authorizing context: %v", err)
|
|
return
|
|
}
|
|
|
|
// Add username to request logging
|
|
context.Context = ctxu.WithLogger(context.Context, ctxu.GetLogger(context.Context, "auth.user.name"))
|
|
|
|
if app.nameRequired(r) {
|
|
repository, err := app.registry.Repository(context, getName(context))
|
|
|
|
if err != nil {
|
|
ctxu.GetLogger(context).Errorf("error resolving repository: %v", err)
|
|
|
|
switch err := err.(type) {
|
|
case distribution.ErrRepositoryUnknown:
|
|
context.Errors.Push(v2.ErrorCodeNameUnknown, err)
|
|
case distribution.ErrRepositoryNameInvalid:
|
|
context.Errors.Push(v2.ErrorCodeNameInvalid, err)
|
|
}
|
|
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
serveJSON(w, context.Errors)
|
|
return
|
|
}
|
|
|
|
// assign and decorate the authorized repository with an event bridge.
|
|
context.Repository = notifications.Listen(
|
|
repository,
|
|
app.eventBridge(context, r))
|
|
|
|
context.Repository, err = applyRepoMiddleware(context.Repository, app.Config.Middleware["repository"])
|
|
if err != nil {
|
|
ctxu.GetLogger(context).Errorf("error initializing repository middleware: %v", err)
|
|
context.Errors.Push(v2.ErrorCodeUnknown, err)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
serveJSON(w, context.Errors)
|
|
return
|
|
}
|
|
}
|
|
|
|
dispatch(context, r).ServeHTTP(w, r)
|
|
// Automated error response handling here. Handlers may return their
|
|
// own errors if they need different behavior (such as range errors
|
|
// for layer upload).
|
|
if context.Errors.Len() > 0 {
|
|
if context.Value("http.response.status") == 0 {
|
|
// TODO(stevvooe): Getting this value from the context is a
|
|
// bit of a hack. We can further address with some of our
|
|
// future refactoring.
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
}
|
|
app.logError(context, context.Errors)
|
|
serveJSON(w, context.Errors)
|
|
}
|
|
})
|
|
}
|
|
|
|
func (app *App) logError(context context.Context, errors v2.Errors) {
|
|
for _, e := range errors.Errors {
|
|
c := ctxu.WithValue(context, "err.code", e.Code)
|
|
c = ctxu.WithValue(c, "err.message", e.Message)
|
|
c = ctxu.WithValue(c, "err.detail", e.Detail)
|
|
c = ctxu.WithLogger(c, ctxu.GetLogger(c,
|
|
"err.code",
|
|
"err.message",
|
|
"err.detail"))
|
|
ctxu.GetLogger(c).Errorf("An error occured")
|
|
}
|
|
}
|
|
|
|
// context constructs the context object for the application. This only be
|
|
// called once per request.
|
|
func (app *App) context(w http.ResponseWriter, r *http.Request) *Context {
|
|
ctx := defaultContextManager.context(app, w, r)
|
|
ctx = ctxu.WithVars(ctx, r)
|
|
ctx = ctxu.WithLogger(ctx, ctxu.GetLogger(ctx,
|
|
"vars.name",
|
|
"vars.reference",
|
|
"vars.digest",
|
|
"vars.uuid"))
|
|
|
|
context := &Context{
|
|
App: app,
|
|
Context: ctx,
|
|
urlBuilder: v2.NewURLBuilderFromRequest(r),
|
|
}
|
|
|
|
return context
|
|
}
|
|
|
|
// authorized checks if the request can proceed with access to the requested
|
|
// repository. If it succeeds, the context may access the requested
|
|
// repository. An error will be returned if access is not available.
|
|
func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Context) error {
|
|
ctxu.GetLogger(context).Debug("authorizing request")
|
|
repo := getName(context)
|
|
|
|
if app.accessController == nil {
|
|
return nil // access controller is not enabled.
|
|
}
|
|
|
|
var accessRecords []auth.Access
|
|
|
|
if repo != "" {
|
|
accessRecords = appendAccessRecords(accessRecords, r.Method, repo)
|
|
} else {
|
|
// Only allow the name not to be set on the base route.
|
|
if app.nameRequired(r) {
|
|
// For this to be properly secured, repo must always be set for a
|
|
// resource that may make a modification. The only condition under
|
|
// which name is not set and we still allow access is when the
|
|
// base route is accessed. This section prevents us from making
|
|
// that mistake elsewhere in the code, allowing any operation to
|
|
// proceed.
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
w.WriteHeader(http.StatusForbidden)
|
|
|
|
var errs v2.Errors
|
|
errs.Push(v2.ErrorCodeUnauthorized)
|
|
serveJSON(w, errs)
|
|
return fmt.Errorf("forbidden: no repository name")
|
|
}
|
|
}
|
|
|
|
ctx, err := app.accessController.Authorized(context.Context, accessRecords...)
|
|
if err != nil {
|
|
switch err := err.(type) {
|
|
case auth.Challenge:
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
err.ServeHTTP(w, r)
|
|
|
|
var errs v2.Errors
|
|
errs.Push(v2.ErrorCodeUnauthorized, accessRecords)
|
|
serveJSON(w, errs)
|
|
default:
|
|
// This condition is a potential security problem either in
|
|
// the configuration or whatever is backing the access
|
|
// controller. Just return a bad request with no information
|
|
// to avoid exposure. The request should not proceed.
|
|
ctxu.GetLogger(context).Errorf("error checking authorization: %v", err)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// TODO(stevvooe): This pattern needs to be cleaned up a bit. One context
|
|
// should be replaced by another, rather than replacing the context on a
|
|
// mutable object.
|
|
context.Context = ctx
|
|
return nil
|
|
}
|
|
|
|
// eventBridge returns a bridge for the current request, configured with the
|
|
// correct actor and source.
|
|
func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener {
|
|
actor := notifications.ActorRecord{
|
|
Name: getUserName(ctx, r),
|
|
}
|
|
request := notifications.NewRequestRecord(ctxu.GetRequestID(ctx), r)
|
|
|
|
return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, request, app.events.sink)
|
|
}
|
|
|
|
// nameRequired returns true if the route requires a name.
|
|
func (app *App) nameRequired(r *http.Request) bool {
|
|
route := mux.CurrentRoute(r)
|
|
return route == nil || route.GetName() != v2.RouteNameBase
|
|
}
|
|
|
|
// apiBase implements a simple yes-man for doing overall checks against the
|
|
// api. This can support auth roundtrips to support docker login.
|
|
func apiBase(w http.ResponseWriter, r *http.Request) {
|
|
const emptyJSON = "{}"
|
|
// Provide a simple /v2/ 200 OK response with empty json response.
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
w.Header().Set("Content-Length", fmt.Sprint(len(emptyJSON)))
|
|
|
|
fmt.Fprint(w, emptyJSON)
|
|
}
|
|
|
|
// appendAccessRecords checks the method and adds the appropriate Access records to the records list.
|
|
func appendAccessRecords(records []auth.Access, method string, repo string) []auth.Access {
|
|
resource := auth.Resource{
|
|
Type: "repository",
|
|
Name: repo,
|
|
}
|
|
|
|
switch method {
|
|
case "GET", "HEAD":
|
|
records = append(records,
|
|
auth.Access{
|
|
Resource: resource,
|
|
Action: "pull",
|
|
})
|
|
case "POST", "PUT", "PATCH":
|
|
records = append(records,
|
|
auth.Access{
|
|
Resource: resource,
|
|
Action: "pull",
|
|
},
|
|
auth.Access{
|
|
Resource: resource,
|
|
Action: "push",
|
|
})
|
|
case "DELETE":
|
|
// DELETE access requires full admin rights, which is represented
|
|
// as "*". This may not be ideal.
|
|
records = append(records,
|
|
auth.Access{
|
|
Resource: resource,
|
|
Action: "*",
|
|
})
|
|
}
|
|
return records
|
|
}
|
|
|
|
// applyRegistryMiddleware wraps a registry instance with the configured middlewares
|
|
func applyRegistryMiddleware(registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) {
|
|
for _, mw := range middlewares {
|
|
rmw, err := registrymiddleware.Get(mw.Name, mw.Options, registry)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err)
|
|
}
|
|
registry = rmw
|
|
}
|
|
return registry, nil
|
|
|
|
}
|
|
|
|
// applyRepoMiddleware wraps a repository with the configured middlewares
|
|
func applyRepoMiddleware(repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) {
|
|
for _, mw := range middlewares {
|
|
rmw, err := repositorymiddleware.Get(mw.Name, mw.Options, repository)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
repository = rmw
|
|
}
|
|
return repository, nil
|
|
}
|
|
|
|
// applyStorageMiddleware wraps a storage driver with the configured middlewares
|
|
func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []configuration.Middleware) (storagedriver.StorageDriver, error) {
|
|
for _, mw := range middlewares {
|
|
smw, err := storagemiddleware.Get(mw.Name, mw.Options, driver)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to configure storage middleware (%s): %v", mw.Name, err)
|
|
}
|
|
driver = smw
|
|
}
|
|
return driver, nil
|
|
}
|
|
|
|
// uploadPurgeDefaultConfig provides a default configuration for upload
|
|
// purging to be used in the absence of configuration in the
|
|
// confifuration file
|
|
func uploadPurgeDefaultConfig() map[interface{}]interface{} {
|
|
config := map[interface{}]interface{}{}
|
|
config["enabled"] = true
|
|
config["age"] = "168h"
|
|
config["interval"] = "24h"
|
|
config["dryrun"] = false
|
|
return config
|
|
}
|
|
|
|
func badPurgeUploadConfig(reason string) {
|
|
panic(fmt.Sprintf("Unable to parse upload purge configuration: %s", reason))
|
|
}
|
|
|
|
// startUploadPurger schedules a goroutine which will periodically
|
|
// check upload directories for old files and delete them
|
|
func startUploadPurger(ctx context.Context, storageDriver storagedriver.StorageDriver, log ctxu.Logger, config map[interface{}]interface{}) {
|
|
if config["enabled"] == false {
|
|
return
|
|
}
|
|
|
|
var purgeAgeDuration time.Duration
|
|
var err error
|
|
purgeAge, ok := config["age"]
|
|
if ok {
|
|
ageStr, ok := purgeAge.(string)
|
|
if !ok {
|
|
badPurgeUploadConfig("age is not a string")
|
|
}
|
|
purgeAgeDuration, err = time.ParseDuration(ageStr)
|
|
if err != nil {
|
|
badPurgeUploadConfig(fmt.Sprintf("Cannot parse duration: %s", err.Error()))
|
|
}
|
|
} else {
|
|
badPurgeUploadConfig("age missing")
|
|
}
|
|
|
|
var intervalDuration time.Duration
|
|
interval, ok := config["interval"]
|
|
if ok {
|
|
intervalStr, ok := interval.(string)
|
|
if !ok {
|
|
badPurgeUploadConfig("interval is not a string")
|
|
}
|
|
|
|
intervalDuration, err = time.ParseDuration(intervalStr)
|
|
if err != nil {
|
|
badPurgeUploadConfig(fmt.Sprintf("Cannot parse interval: %s", err.Error()))
|
|
}
|
|
} else {
|
|
badPurgeUploadConfig("interval missing")
|
|
}
|
|
|
|
var dryRunBool bool
|
|
dryRun, ok := config["dryrun"]
|
|
if ok {
|
|
dryRunBool, ok = dryRun.(bool)
|
|
if !ok {
|
|
badPurgeUploadConfig("cannot parse dryrun")
|
|
}
|
|
} else {
|
|
badPurgeUploadConfig("dryrun missing")
|
|
}
|
|
|
|
go func() {
|
|
rand.Seed(time.Now().Unix())
|
|
jitter := time.Duration(rand.Int()%60) * time.Minute
|
|
log.Infof("Starting upload purge in %s", jitter)
|
|
time.Sleep(jitter)
|
|
|
|
for {
|
|
storage.PurgeUploads(ctx, storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool)
|
|
log.Infof("Starting upload purge in %s", intervalDuration)
|
|
time.Sleep(intervalDuration)
|
|
}
|
|
}()
|
|
}
|