diff --git a/cmd/registry/config.yml b/cmd/registry/config.yml
index bf79ca8f..bb3ade11 100644
--- a/cmd/registry/config.yml
+++ b/cmd/registry/config.yml
@@ -5,3 +5,22 @@ storage:
         rootdirectory: /tmp/registry-dev
 http:
     addr: :5000
+    secret: asecretforlocaldevelopment
+    debug:
+        addr: localhost:5001
+notifications:
+    endpoints:
+        - name: local-8082
+          url: http://localhost:5003/callback
+          headers:
+             Authorization: [Bearer ]
+          timeout: 1s
+          threshold: 10
+          backoff: 1s
+          disabled: true
+        - name: local-8083
+          url: http://localhost:8083/callback
+          timeout: 1s
+          threshold: 10
+          backoff: 1s
+          disabled: true
diff --git a/cmd/registry/main.go b/cmd/registry/main.go
index 73c85ae1..98f3d5bd 100644
--- a/cmd/registry/main.go
+++ b/cmd/registry/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	_ "expvar"
 	"flag"
 	"fmt"
 	"net/http"
@@ -47,6 +48,10 @@ func main() {
 	handler = handlers.CombinedLoggingHandler(os.Stdout, handler)
 	log.SetLevel(logLevel(config.Loglevel))
 
+	if config.HTTP.Debug.Addr != "" {
+		go debugServer(config.HTTP.Debug.Addr)
+	}
+
 	if config.HTTP.TLS.Certificate == "" {
 		log.Infof("listening on %v", config.HTTP.Addr)
 		if err := http.ListenAndServe(config.HTTP.Addr, handler); err != nil {
@@ -142,3 +147,13 @@ func configureReporting(app *registry.App) http.Handler {
 
 	return handler
 }
+
+// debugServer starts the debug server with pprof and expvar, among other
+// endpoints. The addr should not be exposed externally. For most of these to
+// work, tls cannot be enabled on the endpoint, so it is generally separate.
+func debugServer(addr string) {
+	log.Infof("debug server listening on %v", addr)
+	if err := http.ListenAndServe(addr, nil); err != nil {
+		log.Fatalf("error listening on debug interface: %v", err)
+	}
+}
diff --git a/configuration/configuration.go b/configuration/configuration.go
index 8dd71e78..ed086ba5 100644
--- a/configuration/configuration.go
+++ b/configuration/configuration.go
@@ -4,8 +4,10 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"net/http"
 	"reflect"
 	"strings"
+	"time"
 )
 
 // Configuration is a versioned registry configuration, intended to be provided by a yaml file, and
@@ -54,7 +56,19 @@ type Configuration struct {
 			// Certificate.
 			Key string `yaml:"key"`
 		} `yaml:"tls"`
+
+		// Debug configures the http debug interface, if specified. This can
+		// include services such as pprof, expvar and other data that should
+		// not be exposed externally. Left disabled by default.
+		Debug struct {
+			// Addr specifies the bind address for the debug server.
+			Addr string `yaml:"addr"`
+		} `yaml:"debug"`
 	} `yaml:"http"`
+
+	// Notifications specifies configuration about various endpoints to which
+	// registry events are dispatched.
+	Notifications Notifications `yaml:"notifications"`
 }
 
 // v0_1Configuration is a Version 0.1 Configuration struct
@@ -232,6 +246,26 @@ func (auth Auth) MarshalYAML() (interface{}, error) {
 	return map[string]Parameters(auth), nil
 }
 
+// Notifications configures multiple http endpoints.
+type Notifications struct {
+	// Endpoints is a list of http configurations for endpoints that
+	// respond to webhook notifications. In the future, we may allow other
+	// kinds of endpoints, such as external queues.
+	Endpoints []Endpoint `yaml:"endpoints"`
+}
+
+// Endpoint describes the configuration of an http webhook notification
+// endpoint.
+type Endpoint struct {
+	Name      string        `yaml:"name"`      // identifies the endpoint in the registry instance.
+	Disabled  bool          `yaml:"disabled"`  // disables the endpoint
+	URL       string        `yaml:"url"`       // post url for the endpoint.
+	Headers   http.Header   `yaml:"headers"`   // static headers that should be added to all requests
+	Timeout   time.Duration `yaml:"timeout"`   // HTTP timeout
+	Threshold int           `yaml:"threshold"` // circuit breaker threshold before backing off on failure
+	Backoff   time.Duration `yaml:"backoff"`   // backoff duration
+}
+
 // Reporting defines error reporting methods.
 type Reporting struct {
 	// Bugsnag configures error reporting for Bugsnag (bugsnag.com).
diff --git a/configuration/configuration_test.go b/configuration/configuration_test.go
index ad845a41..5a6abf90 100644
--- a/configuration/configuration_test.go
+++ b/configuration/configuration_test.go
@@ -2,6 +2,7 @@ package configuration
 
 import (
 	"bytes"
+	"net/http"
 	"os"
 	"testing"
 
@@ -40,6 +41,17 @@ var configStruct = Configuration{
 			APIKey: "BugsnagApiKey",
 		},
 	},
+	Notifications: Notifications{
+		Endpoints: []Endpoint{
+			{
+				Name: "endpoint-1",
+				URL:  "http://example.com",
+				Headers: http.Header{
+					"Authorization": []string{"Bearer "},
+				},
+			},
+		},
+	},
 }
 
 // configYamlV0_1 is a Version 0.1 yaml document representing configStruct
@@ -61,6 +73,12 @@ auth:
   silly:
     realm: silly
     service: silly
+notifications:
+  endpoints:
+    - name: endpoint-1
+      url: http://example.com
+      headers:
+        Authorization: [Bearer ]
 reporting:
   bugsnag:
     apikey: BugsnagApiKey
@@ -76,6 +94,12 @@ auth:
   silly:
     realm: silly
     service: silly
+notifications:
+  endpoints:
+    - name: endpoint-1
+      url: http://example.com
+      headers:
+        Authorization: [Bearer ]
 `
 
 type ConfigSuite struct {
@@ -129,6 +153,7 @@ func (suite *ConfigSuite) TestParseIncomplete(c *C) {
 	suite.expectedConfig.Storage = Storage{"filesystem": Parameters{"rootdirectory": "/tmp/testroot"}}
 	suite.expectedConfig.Auth = Auth{"silly": Parameters{"realm": "silly"}}
 	suite.expectedConfig.Reporting = Reporting{}
+	suite.expectedConfig.Notifications = Notifications{}
 
 	os.Setenv("REGISTRY_STORAGE", "filesystem")
 	os.Setenv("REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY", "/tmp/testroot")
@@ -292,5 +317,10 @@ func copyConfig(config Configuration) *Configuration {
 		configCopy.Auth.setParameter(k, v)
 	}
 
+	configCopy.Notifications = Notifications{Endpoints: []Endpoint{}}
+	for _, v := range config.Notifications.Endpoints {
+		configCopy.Notifications.Endpoints = append(configCopy.Notifications.Endpoints, v)
+	}
+
 	return configCopy
 }
diff --git a/manifest/manifest.go b/manifest/manifest.go
index ad3ecd00..8ea4d14d 100644
--- a/manifest/manifest.go
+++ b/manifest/manifest.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 
 	"github.com/docker/distribution/digest"
+	"github.com/docker/libtrust"
 )
 
 // Versioned provides a struct with just the manifest schemaVersion. Incoming
@@ -62,6 +63,31 @@ func (sm *SignedManifest) UnmarshalJSON(b []byte) error {
 	return nil
 }
 
+// Payload returns the raw, signed content of the signed manifest. The
+// contents can be used to calculate the content identifier.
+func (sm *SignedManifest) Payload() ([]byte, error) {
+	jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures")
+	if err != nil {
+		return nil, err
+	}
+
+	// Resolve the payload in the manifest.
+	return jsig.Payload()
+}
+
+// Signatures returns the signatures as provided by
+// (*libtrust.JSONSignature).Signatures. The byte slices are opaque jws
+// signatures.
+func (sm *SignedManifest) Signatures() ([][]byte, error) {
+	jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures")
+	if err != nil {
+		return nil, err
+	}
+
+	// Resolve the signatures in the manifest.
+	return jsig.Signatures()
+}
+
 // MarshalJSON returns the contents of raw. If Raw is nil, marshals the inner
 // contents. Applications requiring a marshaled signed manifest should simply
 // use Raw directly, since the content produced by json.Marshal will be
diff --git a/registry/app.go b/registry/app.go
index b5cb6776..53759a1e 100644
--- a/registry/app.go
+++ b/registry/app.go
@@ -2,16 +2,19 @@ package registry
 
 import (
 	"fmt"
+	"net"
 	"net/http"
+	"os"
 
+	"code.google.com/p/go-uuid/uuid"
+	log "github.com/Sirupsen/logrus"
 	"github.com/docker/distribution/api/v2"
 	"github.com/docker/distribution/auth"
 	"github.com/docker/distribution/configuration"
 	"github.com/docker/distribution/storage"
+	"github.com/docker/distribution/storage/notifications"
 	"github.com/docker/distribution/storagedriver"
 	"github.com/docker/distribution/storagedriver/factory"
-
-	log "github.com/Sirupsen/logrus"
 	"github.com/gorilla/mux"
 )
 
@@ -21,17 +24,22 @@ import (
 type App struct {
 	Config configuration.Configuration
 
-	router *mux.Router
+	// InstanceID is a unique id assigned to the application on each creation.
+	// Provides information in the logs and context to identify restarts.
+	InstanceID string
 
-	// driver maintains the app global storage driver instance.
-	driver storagedriver.StorageDriver
+	router           *mux.Router                 // main application router, configured with dispatchers
+	driver           storagedriver.StorageDriver // driver maintains the app global storage driver instance.
+	registry         storage.Registry            // registry is the primary registry backend for the app instance.
+	accessController auth.AccessController       // main access controller for application
 
-	// registry is the primary registry backend for the app instance.
-	registry storage.Registry
+	// events contains notification related configuration.
+	events struct {
+		sink   notifications.Sink
+		source notifications.SourceRecord
+	}
 
-	layerHandler storage.LayerHandler
-
-	accessController auth.AccessController
+	layerHandler storage.LayerHandler // allows dispatch of layer serving to external provider
 }
 
 // NewApp takes a configuration and returns a configured app, ready to serve
@@ -39,8 +47,9 @@ type App struct {
 // handlers accordingly.
 func NewApp(configuration configuration.Configuration) *App {
 	app := &App{
-		Config: configuration,
-		router: v2.Router(),
+		Config:     configuration,
+		InstanceID: uuid.New(),
+		router:     v2.Router(),
 	}
 
 	// Register the handler dispatchers.
@@ -53,7 +62,8 @@ func NewApp(configuration configuration.Configuration) *App {
 	app.register(v2.RouteNameBlobUpload, layerUploadDispatcher)
 	app.register(v2.RouteNameBlobUploadChunk, layerUploadDispatcher)
 
-	driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
+	var err error
+	app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
 
 	if err != nil {
 		// TODO(stevvooe): Move the creation of a service into a protected
@@ -62,7 +72,7 @@ func NewApp(configuration configuration.Configuration) *App {
 		panic(err)
 	}
 
-	app.driver = driver
+	app.configureEvents(&configuration)
 	app.registry = storage.NewRegistryWithDriver(app.driver)
 
 	authType := configuration.Auth.Type()
@@ -77,7 +87,7 @@ func NewApp(configuration configuration.Configuration) *App {
 
 	layerHandlerType := configuration.LayerHandler.Type()
 	if layerHandlerType != "" {
-		lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), driver)
+		lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), app.driver)
 		if err != nil {
 			panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err))
 		}
@@ -87,12 +97,6 @@ func NewApp(configuration configuration.Configuration) *App {
 	return app
 }
 
-func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	// Set a header with the Docker Distribution API Version for all responses.
-	w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
-	app.router.ServeHTTP(w, r)
-}
-
 // register a handler with the application, by route name. The handler will be
 // passed through the application filters and context will be constructed at
 // request time.
@@ -107,6 +111,59 @@ func (app *App) register(routeName string, dispatch dispatchFunc) {
 	app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch))
 }
 
+// configureEvents prepares the event sink for action.
+func (app *App) configureEvents(configuration *configuration.Configuration) {
+	// Configure all of the endpoint sinks.
+	var sinks []notifications.Sink
+	for _, endpoint := range configuration.Notifications.Endpoints {
+		if endpoint.Disabled {
+			log.Infof("endpoint %s disabled, skipping", endpoint.Name)
+			continue
+		}
+
+		log.Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
+		endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
+			Timeout:   endpoint.Timeout,
+			Threshold: endpoint.Threshold,
+			Backoff:   endpoint.Backoff,
+			Headers:   endpoint.Headers,
+		})
+
+		sinks = append(sinks, endpoint)
+	}
+
+	// NOTE(stevvooe): Moving to a new queueing implementation is as easy as
+	// replacing broadcaster with a rabbitmq implementation. It's recommended
+	// that the registry instances also act as the workers to keep deployment
+	// simple.
+	app.events.sink = notifications.NewBroadcaster(sinks...)
+
+	// Populate registry event source
+	hostname, err := os.Hostname()
+	if err != nil {
+		hostname = configuration.HTTP.Addr
+	} else {
+		// try to pick the port off the config
+		_, port, err := net.SplitHostPort(configuration.HTTP.Addr)
+		if err == nil {
+			hostname = net.JoinHostPort(hostname, port)
+		}
+	}
+
+	app.events.source = notifications.SourceRecord{
+		Addr:       hostname,
+		InstanceID: app.InstanceID,
+	}
+}
+
+func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close() // ensure that request body is always closed.
+
+	// Set a header with the Docker Distribution API Version for all responses.
+	w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
+	app.router.ServeHTTP(w, r)
+}
+
 // dispatchFunc takes a context and request and returns a constructed handler
 // for the route. The dispatcher will use this to dynamically create request
 // specific handlers for each endpoint without creating a new router for each
@@ -142,11 +199,14 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
 			return
 		}
 
+		// decorate the authorized repository with an event bridge.
+		context.Repository = notifications.Listen(
+			context.Repository, app.eventBridge(context, r))
+
 		context.log = log.WithField("name", context.Repository.Name())
 		handler := dispatch(context, r)
 
 		ssrw := &singleStatusResponseWriter{ResponseWriter: w}
-		context.log.Infoln("handler", resolveHandlerName(r.Method, handler))
 		handler.ServeHTTP(ssrw, r)
 
 		// Automated error response handling here. Handlers may return their
@@ -167,6 +227,7 @@ func (app *App) context(r *http.Request) *Context {
 	vars := mux.Vars(r)
 	context := &Context{
 		App:        app,
+		RequestID:  uuid.New(),
 		urlBuilder: v2.NewURLBuilderFromRequest(r),
 	}
 
@@ -268,6 +329,20 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont
 	return nil
 }
 
+// eventBridge returns a bridge for the current request, configured with the
+// correct actor and source.
+func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener {
+	// TODO(stevvooe): Need to extract user data from request context using
+	// auth system. Would prefer to do this during logging refactor and
+	// addition of user and google context type.
+	actor := notifications.ActorRecord{
+		Name: "--todo--",
+	}
+	request := notifications.NewRequestRecord(ctx.RequestID, r)
+
+	return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, request, app.events.sink)
+}
+
 // apiBase implements a simple yes-man for doing overall checks against the
 // api. This can support auth roundtrips to support docker login.
 func apiBase(w http.ResponseWriter, r *http.Request) {
diff --git a/registry/context.go b/registry/context.go
index 8e8d0fed..eaa603a8 100644
--- a/registry/context.go
+++ b/registry/context.go
@@ -13,6 +13,9 @@ type Context struct {
 	// App points to the application structure that created this context.
 	*App
 
+	// RequestID is the unique id of the request.
+	RequestID string
+
 	// Repository is the repository for the current request. All requests
 	// should be scoped to a single repository. This field may be nil.
 	Repository storage.Repository
diff --git a/registry/util.go b/registry/util.go
deleted file mode 100644
index 976ddf31..00000000
--- a/registry/util.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package registry
-
-import (
-	"net/http"
-	"reflect"
-	"runtime"
-
-	"github.com/gorilla/handlers"
-)
-
-// functionName returns the name of the function fn.
-func functionName(fn interface{}) string {
-	return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
-}
-
-// resolveHandlerName attempts to resolve a nice, pretty name for the passed
-// in handler.
-func resolveHandlerName(method string, handler http.Handler) string {
-	switch v := handler.(type) {
-	case handlers.MethodHandler:
-		return functionName(v[method])
-	case http.HandlerFunc:
-		return functionName(v)
-	default:
-		return functionName(handler.ServeHTTP)
-	}
-}
diff --git a/storage/decorator/decorator.go b/storage/decorator/decorator.go
deleted file mode 100644
index 2ebcc94c..00000000
--- a/storage/decorator/decorator.go
+++ /dev/null
@@ -1,185 +0,0 @@
-package decorator
-
-import (
-	"github.com/docker/distribution/digest"
-	"github.com/docker/distribution/storage"
-)
-
-// Decorator provides an interface for intercepting object creation within a
-// registry. The single method accepts a registry storage object, such as a
-// Layer, optionally replacing it with an alternative object or a
-// wrapper.
-//
-// For example, if one wants to intercept the instantiation of a layer, an
-// implementation might be as follows:
-//
-//	func (md *DecoratorImplementation) Decorate(v interface{}) interface{} {
-//		switch v := v.(type) {
-//		case Layer:
-//			return wrapLayer(v)
-//		}
-//
-//		// Make sure to return the object or nil if the decorator doesn't require
-//		// replacement.
-//		return v
-//	}
-//
-// Such a decorator can be used to intercept calls to support implementing
-// complex features outside of the storage package.
-type Decorator interface {
-	Decorate(v interface{}) interface{}
-}
-
-// Func provides a shortcut handler for decorators that only need a
-// function. Use is similar to http.HandlerFunc.
-type Func func(v interface{}) interface{}
-
-// Decorate allows DecoratorFunc to implement the Decorator interface.
-func (df Func) Decorate(v interface{}) interface{} {
-	return df(v)
-}
-
-// DecorateRegistry decorates the provided registry with decorator. Registries
-// may be decorated multiple times.
-func DecorateRegistry(registry storage.Registry, decorator Decorator) storage.Registry {
-	return &registryDecorator{
-		Registry:  registry,
-		decorator: decorator,
-	}
-}
-
-// registryDecorator intercepts registry object creation with a decorator.
-type registryDecorator struct {
-	storage.Registry
-	decorator Decorator
-}
-
-// Repository overrides the method of the same name on the Registry, replacing
-// the returned instance with a decorator.
-func (rd *registryDecorator) Repository(name string) storage.Repository {
-	delegate := rd.Registry.Repository(name)
-	decorated := rd.decorator.Decorate(delegate)
-	if decorated != nil {
-		repository, ok := decorated.(storage.Repository)
-
-		if ok {
-			delegate = repository
-		}
-	}
-
-	return &repositoryDecorator{
-		Repository: delegate,
-		decorator:  rd.decorator,
-	}
-}
-
-// repositoryDecorator decorates a repository, intercepting calls to Layers
-// and Manifests with injected variants.
-type repositoryDecorator struct {
-	storage.Repository
-	decorator Decorator
-}
-
-// Layers overrides the Layers method of Repository.
-func (rd *repositoryDecorator) Layers() storage.LayerService {
-	delegate := rd.Repository.Layers()
-	decorated := rd.decorator.Decorate(delegate)
-
-	if decorated != nil {
-		layers, ok := decorated.(storage.LayerService)
-
-		if ok {
-			delegate = layers
-		}
-	}
-
-	return &layerServiceDecorator{
-		LayerService: delegate,
-		decorator:    rd.decorator,
-	}
-}
-
-// Manifests overrides the Manifests method of Repository.
-func (rd *repositoryDecorator) Manifests() storage.ManifestService {
-	delegate := rd.Repository.Manifests()
-	decorated := rd.decorator.Decorate(delegate)
-
-	if decorated != nil {
-		manifests, ok := decorated.(storage.ManifestService)
-
-		if ok {
-			delegate = manifests
-		}
-	}
-
-	// NOTE(stevvooe): We do not have to intercept delegate calls to the
-	// manifest service since it doesn't produce any interfaces for which
-	// interception is supported.
-	return delegate
-}
-
-// layerServiceDecorator intercepts calls that generate Layer and LayerUpload
-// instances, replacing them with instances from the decorator.
-type layerServiceDecorator struct {
-	storage.LayerService
-	decorator Decorator
-}
-
-// Fetch overrides the Fetch method of LayerService.
-func (lsd *layerServiceDecorator) Fetch(digest digest.Digest) (storage.Layer, error) {
-	delegate, err := lsd.LayerService.Fetch(digest)
-	return decorateLayer(lsd.decorator, delegate), err
-}
-
-// Upload overrides the Upload method of LayerService.
-func (lsd *layerServiceDecorator) Upload() (storage.LayerUpload, error) {
-	delegate, err := lsd.LayerService.Upload()
-	return decorateLayerUpload(lsd.decorator, delegate), err
-}
-
-// Resume overrides the Resume method of LayerService.
-func (lsd *layerServiceDecorator) Resume(uuid string) (storage.LayerUpload, error) {
-	delegate, err := lsd.LayerService.Resume(uuid)
-	return decorateLayerUpload(lsd.decorator, delegate), err
-}
-
-// layerUploadDecorator intercepts calls that generate Layer instances,
-// replacing them with instances from the decorator.
-type layerUploadDecorator struct {
-	storage.LayerUpload
-	decorator Decorator
-}
-
-func (lud *layerUploadDecorator) Finish(dgst digest.Digest) (storage.Layer, error) {
-	delegate, err := lud.LayerUpload.Finish(dgst)
-	return decorateLayer(lud.decorator, delegate), err
-}
-
-// decorateLayer guarantees that a layer gets correctly decorated.
-func decorateLayer(decorator Decorator, delegate storage.Layer) storage.Layer {
-	decorated := decorator.Decorate(delegate)
-	if decorated != nil {
-		layer, ok := decorated.(storage.Layer)
-		if ok {
-			delegate = layer
-		}
-	}
-
-	return delegate
-}
-
-// decorateLayerUpload guarantees that an upload gets correctly decorated.
-func decorateLayerUpload(decorator Decorator, delegate storage.LayerUpload) storage.LayerUpload {
-	decorated := decorator.Decorate(delegate)
-	if decorated != nil {
-		layerUpload, ok := decorated.(storage.LayerUpload)
-		if ok {
-			delegate = layerUpload
-		}
-	}
-
-	return &layerUploadDecorator{
-		LayerUpload: delegate,
-		decorator:   decorator,
-	}
-}
diff --git a/storage/manifeststore.go b/storage/manifeststore.go
index bc28f3b8..f84a0208 100644
--- a/storage/manifeststore.go
+++ b/storage/manifeststore.go
@@ -94,6 +94,11 @@ func (ms *manifestStore) Get(tag string) (*manifest.SignedManifest, error) {
 }
 
 func (ms *manifestStore) Put(tag string, manifest *manifest.SignedManifest) error {
+	// TODO(stevvooe): Add check here to see if the revision is already
+	// present in the repository. If it is, we should merge the signatures, do
+	// a shallow verify (or a full one, doesn't matter) and return an error
+	// indicating what happened.
+
 	// Verify the manifest.
 	if err := ms.verifyManifest(tag, manifest); err != nil {
 		return err
diff --git a/storage/notifications/bridge.go b/storage/notifications/bridge.go
new file mode 100644
index 00000000..d6f41ba0
--- /dev/null
+++ b/storage/notifications/bridge.go
@@ -0,0 +1,156 @@
+package notifications
+
+import (
+	"net/http"
+	"time"
+
+	"github.com/docker/distribution/manifest"
+
+	"code.google.com/p/go-uuid/uuid"
+	"github.com/docker/distribution/digest"
+	"github.com/docker/distribution/storage"
+)
+
+type bridge struct {
+	ub      URLBuilder
+	actor   ActorRecord
+	source  SourceRecord
+	request RequestRecord
+	sink    Sink
+}
+
+var _ Listener = &bridge{}
+
+// URLBuilder defines a subset of url builder to be used by the event listener.
+type URLBuilder interface {
+	BuildManifestURL(name, tag string) (string, error)
+	BuildBlobURL(name string, dgst digest.Digest) (string, error)
+}
+
+// NewBridge returns a notification listener that writes records to sink,
+// using the actor and source. Any urls populated in the events created by
+// this bridge will be created using the URLBuilder.
+// TODO(stevvooe): Update this to simply take a context.Context object.
+func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink Sink) Listener {
+	return &bridge{
+		ub:      ub,
+		actor:   actor,
+		source:  source,
+		request: request,
+		sink:    sink,
+	}
+}
+
+// NewRequestRecord builds a RequestRecord for use in NewBridge from an
+// http.Request, associating it with a request id.
+func NewRequestRecord(id string, r *http.Request) RequestRecord {
+	return RequestRecord{
+		ID:        id,
+		Addr:      r.RemoteAddr,
+		Host:      r.Host,
+		Method:    r.Method,
+		UserAgent: r.UserAgent(),
+	}
+}
+
+func (b *bridge) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error {
+	return b.createManifestEventAndWrite(EventActionPush, repo, sm)
+}
+
+func (b *bridge) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error {
+	return b.createManifestEventAndWrite(EventActionPull, repo, sm)
+}
+
+func (b *bridge) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error {
+	return b.createManifestEventAndWrite(EventActionDelete, repo, sm)
+}
+
+func (b *bridge) LayerPushed(repo storage.Repository, layer storage.Layer) error {
+	return b.createLayerEventAndWrite(EventActionPush, repo, layer.Digest())
+}
+
+func (b *bridge) LayerPulled(repo storage.Repository, layer storage.Layer) error {
+	return b.createLayerEventAndWrite(EventActionPull, repo, layer.Digest())
+}
+
+func (b *bridge) LayerDeleted(repo storage.Repository, layer storage.Layer) error {
+	return b.createLayerEventAndWrite(EventActionDelete, repo, layer.Digest())
+}
+
+func (b *bridge) createManifestEventAndWrite(action string, repo storage.Repository, sm *manifest.SignedManifest) error {
+	event, err := b.createManifestEvent(action, repo, sm)
+	if err != nil {
+		return err
+	}
+
+	return b.sink.Write(*event)
+}
+
+func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) {
+	event := b.createEvent(action)
+	event.Target.Type = EventTargetTypeManifest
+	event.Target.Name = repo.Name()
+	event.Target.Tag = sm.Tag
+
+	p, err := sm.Payload()
+	if err != nil {
+		return nil, err
+	}
+
+	event.Target.Digest, err = digest.FromBytes(p)
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO(stevvooe): Currently, this is the "tag" url: once the digest url is
+	// implemented, this should be replaced.
+	event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag)
+	if err != nil {
+		return nil, err
+	}
+
+	return event, nil
+}
+
+func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository, dgst digest.Digest) error {
+	event, err := b.createLayerEvent(action, repo, dgst)
+	if err != nil {
+		return err
+	}
+
+	return b.sink.Write(*event)
+}
+
+func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) {
+	event := b.createEvent(action)
+	event.Target.Type = EventTargetTypeBlob
+	event.Target.Name = repo.Name()
+	event.Target.Digest = dgst
+
+	var err error
+	event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), dgst)
+	if err != nil {
+		return nil, err
+	}
+
+	return event, nil
+}
+
+// createEvent creates an event with actor and source populated.
+func (b *bridge) createEvent(action string) *Event {
+	event := createEvent(action)
+	event.Source = b.source
+	event.Actor = b.actor
+	event.Request = b.request
+
+	return event
+}
+
+// createEvent returns a new event, timestamped, with the specified action.
+func createEvent(action string) *Event {
+	return &Event{
+		ID:        uuid.New(),
+		Timestamp: time.Now(),
+		Action:    action,
+	}
+}
diff --git a/storage/notifications/endpoint.go b/storage/notifications/endpoint.go
new file mode 100644
index 00000000..dfdb111c
--- /dev/null
+++ b/storage/notifications/endpoint.go
@@ -0,0 +1,86 @@
+package notifications
+
+import (
+	"net/http"
+	"time"
+)
+
+// EndpointConfig covers the optional configuration parameters for an active
+// endpoint.
+type EndpointConfig struct {
+	Headers   http.Header
+	Timeout   time.Duration
+	Threshold int
+	Backoff   time.Duration
+}
+
+// defaults set any zero-valued fields to a reasonable default.
+func (ec *EndpointConfig) defaults() {
+	if ec.Timeout <= 0 {
+		ec.Timeout = time.Second
+	}
+
+	if ec.Threshold <= 0 {
+		ec.Threshold = 10
+	}
+
+	if ec.Backoff <= 0 {
+		ec.Backoff = time.Second
+	}
+}
+
+// Endpoint is a reliable, queued, thread-safe sink that notifies external
+// http services when events are written. Writes are non-blocking and always
+// succeed for callers but events may be queued internally.
+type Endpoint struct {
+	Sink
+	url  string
+	name string
+
+	EndpointConfig
+
+	metrics *safeMetrics
+}
+
+// NewEndpoint returns a running endpoint, ready to receive events.
+func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
+	var endpoint Endpoint
+	endpoint.name = name
+	endpoint.url = url
+	endpoint.EndpointConfig = config
+	endpoint.defaults()
+	endpoint.metrics = newSafeMetrics()
+
+	// Configures the inmemory queue, retry, http pipeline.
+	endpoint.Sink = newHTTPSink(
+		endpoint.url, endpoint.Timeout, endpoint.Headers,
+		endpoint.metrics.httpStatusListener())
+	endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
+	endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
+
+	register(&endpoint)
+	return &endpoint
+}
+
+// Name returns the name of the endpoint, generally used for debugging.
+func (e *Endpoint) Name() string {
+	return e.name
+}
+
+// URL returns the url of the endpoint.
+func (e *Endpoint) URL() string {
+	return e.url
+}
+
+// ReadMetrics populates em with metrics from the endpoint.
+func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
+	e.metrics.Lock()
+	defer e.metrics.Unlock()
+
+	*em = e.metrics.EndpointMetrics
+	// Maps still need to be copied in a threadsafe manner.
+	em.Statuses = make(map[string]int)
+	for k, v := range e.metrics.Statuses {
+		em.Statuses[k] = v
+	}
+}
diff --git a/storage/notifications/event.go b/storage/notifications/event.go
new file mode 100644
index 00000000..c23766fa
--- /dev/null
+++ b/storage/notifications/event.go
@@ -0,0 +1,154 @@
+package notifications
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/docker/distribution/digest"
+)
+
+// EventAction constants used in action field of Event.
+const (
+	EventActionPull   = "pull"
+	EventActionPush   = "push"
+	EventActionDelete = "delete"
+)
+
+// EventTargetType constants used in Target section of Event.
+const (
+	EventTargetTypeManifest = "manifest"
+	EventTargetTypeBlob     = "blob"
+)
+
+// EventsMediaType is the mediatype for the json event envelope. If the Event,
+// ActorRecord, SourceRecord or Envelope structs change, the version number
+// should be incremented.
+const EventsMediaType = "application/vnd.docker.distribution.events.v1+json"
+
+// Envelope defines the fields of a json event envelope message that can hold
+// one or more events.
+type Envelope struct {
+	// Events make up the contents of the envelope. Events present in a single
+	// envelope are not necessarily related.
+	Events []Event `json:"events,omitempty"`
+}
+
+// TODO(stevvooe): The event type should be separate from the json format. It
+// should be defined as an interface. Leaving as is for now since we don't
+// need that at this time. If we make this change, the struct below would be
+// called "EventRecord".
+
+// Event provides the fields required to describe a registry event.
+type Event struct {
+	// ID provides a unique identifier for the event.
+	ID string `json:"id,omitempty"`
+
+	// Timestamp is the time at which the event occurred.
+	Timestamp time.Time `json:"timestamp,omitempty"`
+
+	// Action indicates what action encompasses the provided event.
+	Action string `json:"action,omitempty"`
+
+	// Target uniquely describes the target of the event.
+	Target struct {
+		// Type should be "manifest" or "blob"
+		Type string `json:"type,omitempty"`
+
+		// Name identifies the named repository.
+		Name string `json:"name,omitempty"`
+
+		// Digest should identify the object in the repository.
+		Digest digest.Digest `json:"digest,omitempty"`
+
+		// Tag is present if the operation involved a tagged manifest.
+		Tag string `json:"tag,omitempty"`
+
+		// URL provides a link to the content on the relevant repository instance.
+		URL string `json:"url,omitempty"`
+	} `json:"target,omitempty"`
+
+	// Request covers the request that generated the event.
+	Request RequestRecord `json:"request,omitempty"`
+
+	// Actor specifies the agent that initiated the event. For most
+	// situations, this could be from the authorization context of the request.
+	Actor ActorRecord `json:"actor,omitempty"`
+
+	// Source identifies the registry node that generated the event. Put
+	// differently, while the actor "initiates" the event, the source
+	// "generates" it.
+	Source SourceRecord `json:"source,omitempty"`
+}
+
+// ActorRecord specifies the agent that initiated the event. For most
+// situations, this could be from the authorization context of the request.
+// Data in this record can refer to both the initiating client and the
+// generating request.
+type ActorRecord struct {
+	// Name corresponds to the subject or username associated with the
+	// request context that generated the event.
+	Name string `json:"name,omitempty"`
+
+	// TODO(stevvooe): Look into setting a session cookie to get this
+	// without docker daemon.
+	// SessionID
+
+	// TODO(stevvooe): Push the "Docker-Command" header to replace cookie and
+	// get the actual command.
+	// Command
+}
+
+// RequestRecord covers the request that generated the event.
+type RequestRecord struct {
+	// ID uniquely identifies the request that initiated the event.
+	ID string `json:"id"`
+
+	// Addr contains the ip or hostname and possibly port of the client
+	// connection that initiated the event. This is the RemoteAddr from
+	// the standard http request.
+	Addr string `json:"addr,omitempty"`
+
+	// Host is the externally accessible host name of the registry instance,
+	// as specified by the http host header on incoming requests.
+	Host string `json:"host,omitempty"`
+
+	// Method has the request method that generated the event.
+	Method string `json:"method"`
+
+	// UserAgent contains the user agent header of the request.
+	UserAgent string `json:"useragent"`
+}
+
+// SourceRecord identifies the registry node that generated the event. Put
+// differently, while the actor "initiates" the event, the source "generates"
+// it.
+type SourceRecord struct {
+	// Addr contains the ip or hostname and the port of the registry node
+	// that generated the event. Generally, this will be resolved by
+	// os.Hostname() along with the running port.
+	Addr string `json:"addr,omitempty"`
+
+	// InstanceID identifies a running instance of an application. Changes
+	// after each restart.
+	InstanceID string `json:"instanceID,omitempty"`
+}
+
+var (
+	// ErrSinkClosed is returned if a write is issued to a sink that has been
+	// closed. If encountered, the error should be considered terminal and
+	// retries will not be successful.
+	ErrSinkClosed = fmt.Errorf("sink: closed")
+)
+
+// Sink accepts and sends events.
+type Sink interface {
+	// Write writes one or more events to the sink. If no error is returned,
+	// the caller will assume that all events have been committed and will not
+	// try to send them again. If an error is received, the caller may retry
+	// sending the event. The caller should cede the slice of memory to the
+	// sink and not modify it after calling this method.
+	Write(events ...Event) error
+
+	// Close the sink, possibly waiting for pending events to flush.
+	Close() error
+}
diff --git a/storage/notifications/event_test.go b/storage/notifications/event_test.go
new file mode 100644
index 00000000..cc2180ac
--- /dev/null
+++ b/storage/notifications/event_test.go
@@ -0,0 +1,145 @@
+package notifications
+
+import (
+	"encoding/json"
+	"strings"
+	"testing"
+	"time"
+)
+
+// TestEventEnvelopeJSONFormat provides a silly test to detect if the event
+// format or envelope has changed. If this code fails, the revision of the
+// protocol may need to be incremented.
+func TestEventEnvelopeJSONFormat(t *testing.T) {
+	var expected = strings.TrimSpace(`
+{
+   "events": [
+      {
+         "id": "asdf-asdf-asdf-asdf-0",
+         "timestamp": "2006-01-02T15:04:05Z",
+         "action": "push",
+         "target": {
+            "type": "manifest",
+            "name": "library/test",
+            "digest": "sha256:0123456789abcdef0",
+            "tag": "latest",
+            "url": "http://example.com/v2/library/test/manifests/latest"
+         },
+         "request": {
+            "id": "asdfasdf",
+            "addr": "client.local",
+            "host": "registrycluster.local",
+            "method": "PUT",
+            "useragent": "test/0.1"
+         },
+         "actor": {
+            "name": "test-actor"
+         },
+         "source": {
+            "addr": "hostname.local:port"
+         }
+      },
+      {
+         "id": "asdf-asdf-asdf-asdf-1",
+         "timestamp": "2006-01-02T15:04:05Z",
+         "action": "push",
+         "target": {
+            "type": "blob",
+            "name": "library/test",
+            "digest": "tarsum.v2+sha256:0123456789abcdef1",
+            "url": "http://example.com/v2/library/test/manifests/latest"
+         },
+         "request": {
+            "id": "asdfasdf",
+            "addr": "client.local",
+            "host": "registrycluster.local",
+            "method": "PUT",
+            "useragent": "test/0.1"
+         },
+         "actor": {
+            "name": "test-actor"
+         },
+         "source": {
+            "addr": "hostname.local:port"
+         }
+      },
+      {
+         "id": "asdf-asdf-asdf-asdf-2",
+         "timestamp": "2006-01-02T15:04:05Z",
+         "action": "push",
+         "target": {
+            "type": "blob",
+            "name": "library/test",
+            "digest": "tarsum.v2+sha256:0123456789abcdef2",
+            "url": "http://example.com/v2/library/test/manifests/latest"
+         },
+         "request": {
+            "id": "asdfasdf",
+            "addr": "client.local",
+            "host": "registrycluster.local",
+            "method": "PUT",
+            "useragent": "test/0.1"
+         },
+         "actor": {
+            "name": "test-actor"
+         },
+         "source": {
+            "addr": "hostname.local:port"
+         }
+      }
+   ]
+}
+	`)
+
+	tm, err := time.Parse(time.RFC3339, time.RFC3339[:len(time.RFC3339)-5])
+	if err != nil {
+		t.Fatalf("error creating time: %v", err)
+	}
+
+	var prototype Event
+	prototype.Action = "push"
+	prototype.Timestamp = tm
+	prototype.Actor.Name = "test-actor"
+	prototype.Request.ID = "asdfasdf"
+	prototype.Request.Addr = "client.local"
+	prototype.Request.Host = "registrycluster.local"
+	prototype.Request.Method = "PUT"
+	prototype.Request.UserAgent = "test/0.1"
+	prototype.Source.Addr = "hostname.local:port"
+
+	var manifestPush Event
+	manifestPush = prototype
+	manifestPush.ID = "asdf-asdf-asdf-asdf-0"
+	manifestPush.Target.Digest = "sha256:0123456789abcdef0"
+	manifestPush.Target.Type = EventTargetTypeManifest
+	manifestPush.Target.Name = "library/test"
+	manifestPush.Target.Tag = "latest"
+	manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest"
+
+	var layerPush0 Event
+	layerPush0 = prototype
+	layerPush0.ID = "asdf-asdf-asdf-asdf-1"
+	layerPush0.Target.Digest = "tarsum.v2+sha256:0123456789abcdef1"
+	layerPush0.Target.Type = EventTargetTypeBlob
+	layerPush0.Target.Name = "library/test"
+	layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest"
+
+	var layerPush1 Event
+	layerPush1 = prototype
+	layerPush1.ID = "asdf-asdf-asdf-asdf-2"
+	layerPush1.Target.Digest = "tarsum.v2+sha256:0123456789abcdef2"
+	layerPush1.Target.Type = EventTargetTypeBlob
+	layerPush1.Target.Name = "library/test"
+	layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest"
+
+	var envelope Envelope
+	envelope.Events = append(envelope.Events, manifestPush, layerPush0, layerPush1)
+
+	p, err := json.MarshalIndent(envelope, "", "   ")
+	if err != nil {
+		t.Fatalf("unexpected error marshaling envelope: %v", err)
+	}
+	if string(p) != expected {
+		t.Fatalf("format has changed\n%s\n != \n%s", string(p), expected)
+	}
+}
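The JSON pinned by the test above is exactly what an endpoint receives in each POST body. For illustration only (not part of this patch), a minimal webhook receiver might decode it as follows; the media type constant is mirrored locally, and the bind address matches the local-8082 callback url from cmd/registry/config.yml:

package main

import (
	"encoding/json"
	"log"
	"net/http"
	"time"
)

// eventsMediaType mirrors EventsMediaType from event.go.
const eventsMediaType = "application/vnd.docker.distribution.events.v1+json"

// envelope mirrors the wire format of Envelope, keeping only the fields this
// receiver cares about; unknown fields are ignored by the decoder.
type envelope struct {
	Events []struct {
		ID        string    `json:"id"`
		Timestamp time.Time `json:"timestamp"`
		Action    string    `json:"action"`
		Target    struct {
			Type string `json:"type"`
			Name string `json:"name"`
			URL  string `json:"url"`
		} `json:"target"`
	} `json:"events"`
}

func main() {
	http.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()

		// The http sink posts envelopes with the events media type.
		if r.Header.Get("Content-Type") != eventsMediaType {
			w.WriteHeader(http.StatusUnsupportedMediaType)
			return
		}

		var env envelope
		if err := json.NewDecoder(r.Body).Decode(&env); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		for _, event := range env.Events {
			log.Printf("%s %s %s", event.Action, event.Target.Type, event.Target.Name)
		}

		// Any 2xx or 3xx response marks the whole batch as accepted.
		w.WriteHeader(http.StatusOK)
	})

	log.Fatal(http.ListenAndServe("localhost:5003", nil))
}

Note that the batch is accepted or rejected as a unit: returning a non-2xx/3xx status causes the registry's retrying sink (below) to resend the entire envelope.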
diff --git a/storage/notifications/http.go b/storage/notifications/http.go
new file mode 100644
index 00000000..15b3574c
--- /dev/null
+++ b/storage/notifications/http.go
@@ -0,0 +1,145 @@
+package notifications
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"sync"
+	"time"
+)
+
+// httpSink implements a single-flight, http notification endpoint. This is
+// very lightweight in that it only makes an attempt at an http request.
+// Reliability should be provided by the caller.
+type httpSink struct {
+	url string
+
+	mu        sync.Mutex
+	closed    bool
+	client    *http.Client
+	listeners []httpStatusListener
+
+	// TODO(stevvooe): Allow one to configure the media type accepted by this
+	// sink and choose the serialization based on that.
+}
+
+// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
+// sinks for increased reliability.
+func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink {
+	return &httpSink{
+		url:       u,
+		listeners: listeners,
+		client: &http.Client{
+			Transport: &headerRoundTripper{
+				Transport: http.DefaultTransport.(*http.Transport),
+				headers:   headers,
+			},
+			Timeout: timeout,
+		},
+	}
+}
+
+// httpStatusListener is called on various outcomes of sending notifications.
+type httpStatusListener interface {
+	success(status int, events ...Event)
+	failure(status int, events ...Event)
+	err(err error, events ...Event)
+}
+
+// Write makes an attempt to notify the endpoint, returning an error if it
+// fails. It is the caller's responsibility to retry on error. The events are
+// accepted or rejected as a group.
+func (hs *httpSink) Write(events ...Event) error {
+	hs.mu.Lock()
+	defer hs.mu.Unlock()
+
+	if hs.closed {
+		return ErrSinkClosed
+	}
+
+	envelope := Envelope{
+		Events: events,
+	}
+
+	// TODO(stevvooe): It is not ideal to keep re-encoding the request body on
+	// retry but we are going to do it to keep the code simple. It is likely
+	// we could change the event struct to manage its own buffer.
+
+	p, err := json.MarshalIndent(envelope, "", "   ")
+	if err != nil {
+		for _, listener := range hs.listeners {
+			listener.err(err, events...)
+		}
+		return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
+	}
+
+	body := bytes.NewReader(p)
+	resp, err := hs.client.Post(hs.url, EventsMediaType, body)
+	if err != nil {
+		for _, listener := range hs.listeners {
+			listener.err(err, events...)
+		}
+
+		return fmt.Errorf("%v: error posting: %v", hs, err)
+	}
+	defer resp.Body.Close() // ensure the response body is always closed.
+
+	// The notifier will treat any 2xx or 3xx response as accepted by the
+	// endpoint.
+	switch {
+	case resp.StatusCode >= 200 && resp.StatusCode < 400:
+		for _, listener := range hs.listeners {
+			listener.success(resp.StatusCode, events...)
+		}
+
+		// TODO(stevvooe): This is a little too accepting: we may want to
+		// support unsupported media type responses with retries using the
+		// correct media type. There may also be cases that will never work.
+
+		return nil
+	default:
+		for _, listener := range hs.listeners {
+			listener.failure(resp.StatusCode, events...)
+		}
+		return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
+	}
+}
+
+// Close the endpoint
+func (hs *httpSink) Close() error {
+	hs.mu.Lock()
+	defer hs.mu.Unlock()
+
+	if hs.closed {
+		return fmt.Errorf("httpsink: already closed")
+	}
+
+	hs.closed = true
+	return nil
+}
+
+func (hs *httpSink) String() string {
+	return fmt.Sprintf("httpSink{%s}", hs.url)
+}
+
+type headerRoundTripper struct {
+	*http.Transport // must be transport to support CancelRequest
+	headers http.Header
+}
+
+func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+	var nreq http.Request
+	nreq = *req
+	nreq.Header = make(http.Header)
+
+	merge := func(headers http.Header) {
+		for k, v := range headers {
+			nreq.Header[k] = append(nreq.Header[k], v...)
+		}
+	}
+
+	merge(req.Header)
+	merge(hrt.headers)
+
+	return hrt.Transport.RoundTrip(&nreq)
+}
diff --git a/storage/notifications/http_test.go b/storage/notifications/http_test.go
new file mode 100644
index 00000000..c2cfbc02
--- /dev/null
+++ b/storage/notifications/http_test.go
@@ -0,0 +1,155 @@
+package notifications
+
+import (
+	"encoding/json"
+	"fmt"
+	"mime"
+	"net/http"
+	"net/http/httptest"
+	"reflect"
+	"strconv"
+	"testing"
+)
+
+// TestHTTPSink mocks out an http endpoint and notifies it under a couple of
+// conditions, ensuring correct behavior.
+func TestHTTPSink(t *testing.T) {
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		defer r.Body.Close()
+		if r.Method != "POST" {
+			w.WriteHeader(http.StatusMethodNotAllowed)
+			t.Fatalf("unexpected request method: %v", r.Method)
+			return
+		}
+
+		// Extract the content type and make sure it matches
+		contentType := r.Header.Get("Content-Type")
+		mediaType, _, err := mime.ParseMediaType(contentType)
+		if err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+			t.Fatalf("error parsing media type: %v, contenttype=%q", err, contentType)
+			return
+		}
+
+		if mediaType != EventsMediaType {
+			w.WriteHeader(http.StatusUnsupportedMediaType)
+			t.Fatalf("incorrect media type: %q != %q", mediaType, EventsMediaType)
+			return
+		}
+
+		var envelope Envelope
+		dec := json.NewDecoder(r.Body)
+		if err := dec.Decode(&envelope); err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+			t.Fatalf("error decoding request body: %v", err)
+			return
+		}
+
+		// Let caller choose the status
+		status, err := strconv.Atoi(r.FormValue("status"))
+		if err != nil {
+			t.Logf("error parsing status: %v", err)
+
+			// May just be empty, set status to 200
+			status = http.StatusOK
+		}
+
+		w.WriteHeader(status)
+	}))
+
+	metrics := newSafeMetrics()
+	sink := newHTTPSink(server.URL, 0, nil,
+		&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
+
+	var expectedMetrics EndpointMetrics
+	expectedMetrics.Statuses = make(map[string]int)
+
+	for _, tc := range []struct {
+		events     []Event // events to send
+		url        string
+		failure    bool // true if there should be a failure.
+		statusCode int  // if not set, no status code should be incremented.
+	}{
+		{
+			statusCode: http.StatusOK,
+			events: []Event{
+				createTestEvent("push", "library/test", "manifest")},
+		},
+		{
+			statusCode: http.StatusOK,
+			events: []Event{
+				createTestEvent("push", "library/test", "manifest"),
+				createTestEvent("push", "library/test", "layer"),
+				createTestEvent("push", "library/test", "layer"),
+			},
+		},
+		{
+			statusCode: http.StatusTemporaryRedirect,
+		},
+		{
+			statusCode: http.StatusBadRequest,
+			failure:    true,
+		},
+		{
+			// Case where connection never goes through.
+			url:     "http://shoudlntresolve/",
+			failure: true,
+		},
+	} {
+
+		if tc.failure {
+			expectedMetrics.Failures += len(tc.events)
+		} else {
+			expectedMetrics.Successes += len(tc.events)
+		}
+
+		if tc.statusCode > 0 {
+			expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events)
+		}
+
+		url := tc.url
+		if url == "" {
+			url = server.URL + "/"
+		}
+		// setup endpoint to respond with expected status code.
+		url += fmt.Sprintf("?status=%v", tc.statusCode)
+		sink.url = url
+
+		t.Logf("testcase: %v, fail=%v", url, tc.failure)
+		// Try a simple event emission.
+		err := sink.Write(tc.events...)
+
+		if !tc.failure {
+			if err != nil {
+				t.Fatalf("unexpected error send event: %v", err)
+			}
+		} else {
+			if err == nil {
+				t.Fatalf("the endpoint should have rejected the request")
+			}
+		}
+
+		if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) {
+			t.Fatalf("metrics not as expected: %#v != %#v", metrics.EndpointMetrics, expectedMetrics)
+		}
+	}
+
+	if err := sink.Close(); err != nil {
+		t.Fatalf("unexpected error closing http sink: %v", err)
+	}
+
+	// double close returns error
+	if err := sink.Close(); err == nil {
+		t.Fatalf("second close should have returned error: %v", err)
+	}
+
+}
+
+func createTestEvent(action, repo, typ string) Event {
+	event := createEvent(action)
+
+	event.Target.Type = typ
+	event.Target.Name = repo
+
+	return *event
+}
diff --git a/storage/notifications/listener.go b/storage/notifications/listener.go
new file mode 100644
index 00000000..2d7bb112
--- /dev/null
+++ b/storage/notifications/listener.go
@@ -0,0 +1,140 @@
+package notifications
+
+import (
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/distribution/digest"
+	"github.com/docker/distribution/manifest"
+	"github.com/docker/distribution/storage"
+)
+
+// ManifestListener describes a set of methods for listening to events related to manifests.
+type ManifestListener interface {
+	ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error
+	ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error
+
+	// TODO(stevvooe): Please note that delete support is still a little shaky
+	// and we'll need to propagate these in the future.
+
+	ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error
+}
+
+// LayerListener describes a listener that can respond to layer related events.
+type LayerListener interface {
+	LayerPushed(repo storage.Repository, layer storage.Layer) error
+	LayerPulled(repo storage.Repository, layer storage.Layer) error
+
+	// TODO(stevvooe): Please note that delete support is still a little shaky
+	// and we'll need to propagate these in the future.
+
+	LayerDeleted(repo storage.Repository, layer storage.Layer) error
+}
+
+// Listener combines all repository events into a single interface.
+type Listener interface {
+	ManifestListener
+	LayerListener
+}
+
+type repositoryListener struct {
+	storage.Repository
+	listener Listener
+}
+
+// Listen dispatches events on the repository to the listener.
+func Listen(repo storage.Repository, listener Listener) storage.Repository {
+	return &repositoryListener{
+		Repository: repo,
+		listener:   listener,
+	}
+}
+
+func (rl *repositoryListener) Manifests() storage.ManifestService {
+	return &manifestServiceListener{
+		ManifestService: rl.Repository.Manifests(),
+		parent:          rl,
+	}
+}
+
+func (rl *repositoryListener) Layers() storage.LayerService {
+	return &layerServiceListener{
+		LayerService: rl.Repository.Layers(),
+		parent:       rl,
+	}
+}
+
+type manifestServiceListener struct {
+	storage.ManifestService
+	parent *repositoryListener
+}
+
+func (msl *manifestServiceListener) Get(tag string) (*manifest.SignedManifest, error) {
+	sm, err := msl.ManifestService.Get(tag)
+	if err == nil {
+		if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil {
+			logrus.Errorf("error dispatching manifest pull to listener: %v", err)
+		}
+	}
+
+	return sm, err
+}
+
+func (msl *manifestServiceListener) Put(tag string, sm *manifest.SignedManifest) error {
+	err := msl.ManifestService.Put(tag, sm)
+
+	if err == nil {
+		if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil {
+			logrus.Errorf("error dispatching manifest push to listener: %v", err)
+		}
+	}
+
+	return err
+}
+
+type layerServiceListener struct {
+	storage.LayerService
+	parent *repositoryListener
+}
+
+func (lsl *layerServiceListener) Fetch(dgst digest.Digest) (storage.Layer, error) {
+	layer, err := lsl.LayerService.Fetch(dgst)
+	if err == nil {
+		if err := lsl.parent.listener.LayerPulled(lsl.parent.Repository, layer); err != nil {
+			logrus.Errorf("error dispatching layer pull to listener: %v", err)
+		}
+	}
+
+	return layer, err
+}
+
+func (lsl *layerServiceListener) Upload() (storage.LayerUpload, error) {
+	lu, err := lsl.LayerService.Upload()
+	return lsl.decorateUpload(lu), err
+}
+
+func (lsl *layerServiceListener) Resume(uuid string) (storage.LayerUpload, error) {
+	lu, err := lsl.LayerService.Resume(uuid)
+	return lsl.decorateUpload(lu), err
+}
+
+func (lsl *layerServiceListener) decorateUpload(lu storage.LayerUpload) storage.LayerUpload {
+	return &layerUploadListener{
+		LayerUpload: lu,
+		parent:      lsl,
+	}
+}
+
+type layerUploadListener struct {
+	storage.LayerUpload
+	parent *layerServiceListener
+}
+
+func (lul *layerUploadListener) Finish(dgst digest.Digest) (storage.Layer, error) {
+	layer, err := lul.LayerUpload.Finish(dgst)
+	if err == nil {
+		if err := lul.parent.parent.listener.LayerPushed(lul.parent.parent.Repository, layer); err != nil {
+			logrus.Errorf("error dispatching layer push to listener: %v", err)
+		}
+	}
+
+	return layer, err
+}
diff --git a/storage/decorator/decorator_test.go b/storage/notifications/listener_test.go
similarity index 52%
rename from storage/decorator/decorator_test.go
rename to storage/notifications/listener_test.go
index 213cf755..17af8b15 100644
--- a/storage/decorator/decorator_test.go
+++ b/storage/notifications/listener_test.go
@@ -1,79 +1,92 @@
-package decorator
+package notifications
 
 import (
 	"io"
+	"reflect"
 	"testing"
 
-	"github.com/docker/libtrust"
-
 	"github.com/docker/distribution/digest"
 	"github.com/docker/distribution/manifest"
 	"github.com/docker/distribution/storage"
 	"github.com/docker/distribution/storagedriver/inmemory"
 	"github.com/docker/distribution/testutil"
+	"github.com/docker/libtrust"
 )
 
-func TestRegistryDecorator(t *testing.T) {
-	// Initialize the expected decorations. Call counting is a horrible way to
-	// test this but should keep this code from being atrocious.
-	expected := map[string]int{
-		"repository":      1,
-		"manifestservice": 1,
-		"layerservice":    1,
-		"layer":           4,
-		"layerupload":     4,
-	}
-	decorated := map[string]int{}
-
-	decorator := Func(func(v interface{}) interface{} {
-		switch v := v.(type) {
-		case storage.Repository:
-			t.Logf("decorate repository: %T", v)
-			decorated["repository"]++
-		case storage.ManifestService:
-			t.Logf("decorate manifestservice: %T", v)
-			decorated["manifestservice"]++
-		case storage.LayerService:
-			t.Logf("decorate layerservice: %T", v)
-			decorated["layerservice"]++
-		case storage.Layer:
-			t.Logf("decorate layer: %T", v)
-			decorated["layer"]++
-		case storage.LayerUpload:
-			t.Logf("decorate layerupload: %T", v)
-			decorated["layerupload"]++
-		default:
-			t.Fatalf("unexpected object decorated: %v", v)
-		}
-
-		return v
-	})
-
+func TestListener(t *testing.T) {
 	registry := storage.NewRegistryWithDriver(inmemory.New())
-	registry = DecorateRegistry(registry, decorator)
+	tl := &testListener{
+		ops: make(map[string]int),
+	}
+	repository := Listen(registry.Repository("foo/bar"), tl)
 
 	// Now take the registry through a number of operations
-	checkExerciseRegistry(t, registry)
+	checkExerciseRepository(t, repository)
 
-	for component, calls := range expected {
-		if decorated[component] != calls {
-			t.Fatalf("%v was not decorated expected number of times: %d != %d", component, decorated[component], calls)
-		}
+	expectedOps := map[string]int{
+		"manifest:push": 1,
+		"manifest:pull": 1,
+		// "manifest:delete": 0, // deletes not supported for now
+		"layer:push": 2,
+		"layer:pull": 2,
+		// "layer:delete": 0, // deletes not supported for now
+	}
+
+	if !reflect.DeepEqual(tl.ops, expectedOps) {
+		t.Fatalf("counts do not match:\n%v\n !=\n%v", tl.ops, expectedOps)
 	}
 }
 
+type testListener struct {
+	ops map[string]int
+}
+
+func (tl *testListener) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error {
+	tl.ops["manifest:push"]++
+
+	return nil
+}
+
+func (tl *testListener) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error {
+	tl.ops["manifest:pull"]++
+	return nil
+}
+
+func (tl *testListener) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error {
+	tl.ops["manifest:delete"]++
+	return nil
+}
+
+func (tl *testListener) LayerPushed(repo storage.Repository, layer storage.Layer) error {
+	tl.ops["layer:push"]++
+	return nil
+}
+
+func (tl *testListener) LayerPulled(repo storage.Repository, layer storage.Layer) error {
+	tl.ops["layer:pull"]++
+	return nil
+}
+
+func (tl *testListener) LayerDeleted(repo storage.Repository, layer storage.Layer) error {
+	tl.ops["layer:delete"]++
+	return nil
+}
+
 // checkExerciseRepository takes the repository through all of its operations,
 // carrying out generic checks.
-func checkExerciseRegistry(t *testing.T, registry storage.Registry) {
-	name := "foo/bar"
+func checkExerciseRepository(t *testing.T, repository storage.Repository) {
+	// TODO(stevvooe): This would be a nice testutil function. Basically, it
+	// takes the registry through a common set of operations. This could be
+	// used to make cross-cutting updates by changing internals that affect
+	// update counts. Basically, it would make writing tests a lot easier.
+
 	tag := "thetag"
-	repository := registry.Repository(name)
 	m := manifest.Manifest{
 		Versioned: manifest.Versioned{
 			SchemaVersion: 1,
 		},
-		Name: name,
+		Name: repository.Name(),
 		Tag:  tag,
 	}
diff --git a/storage/notifications/metrics.go b/storage/notifications/metrics.go
new file mode 100644
index 00000000..2a8ffcbd
--- /dev/null
+++ b/storage/notifications/metrics.go
@@ -0,0 +1,152 @@
+package notifications
+
+import (
+	"expvar"
+	"fmt"
+	"net/http"
+	"sync"
+)
+
+// EndpointMetrics track various actions taken by the endpoint, typically by
+// number of events. The goal of this is to export it via expvar but we may
+// find some other future solution to be better.
+type EndpointMetrics struct {
+	Pending   int            // events pending in queue
+	Events    int            // total events incoming
+	Successes int            // total events written successfully
+	Failures  int            // total events failed
+	Errors    int            // total events errored
+	Statuses  map[string]int // status code histogram, per call event
+}
+
+// safeMetrics guards the metrics implementation with a lock and provides a
+// safe update function.
+type safeMetrics struct {
+	EndpointMetrics
+	sync.Mutex // protects statuses map
+}
+
+// newSafeMetrics returns safeMetrics with map allocated.
+func newSafeMetrics() *safeMetrics {
+	var sm safeMetrics
+	sm.Statuses = make(map[string]int)
+	return &sm
+}
+
+// httpStatusListener returns the listener for the http sink that updates the
+// relevant counters.
+func (sm *safeMetrics) httpStatusListener() httpStatusListener {
+	return &endpointMetricsHTTPStatusListener{
+		safeMetrics: sm,
+	}
+}
+
+// eventQueueListener returns a listener that maintains queue related counters.
+func (sm *safeMetrics) eventQueueListener() eventQueueListener {
+	return &endpointMetricsEventQueueListener{
+		safeMetrics: sm,
+	}
+}
+
+// endpointMetricsHTTPStatusListener increments counters related to http sinks
+// for the relevant events.
+type endpointMetricsHTTPStatusListener struct {
+	*safeMetrics
+}
+
+var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
+
+func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
+	emsl.safeMetrics.Lock()
+	defer emsl.safeMetrics.Unlock()
+	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
+	emsl.Successes += len(events)
+}
+
+func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
+	emsl.safeMetrics.Lock()
+	defer emsl.safeMetrics.Unlock()
+	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
+	emsl.Failures += len(events)
+}
+
+func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
+	emsl.safeMetrics.Lock()
+	defer emsl.safeMetrics.Unlock()
+	emsl.Errors += len(events)
+}
+
+// endpointMetricsEventQueueListener maintains the incoming events counter and
+// the queue's pending count.
+type endpointMetricsEventQueueListener struct {
+	*safeMetrics
+}
+
+func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
+	eqc.Lock()
+	defer eqc.Unlock()
+	eqc.Events += len(events)
+	eqc.Pending += len(events)
+}
+
+func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
+	eqc.Lock()
+	defer eqc.Unlock()
+	eqc.Pending -= len(events)
+}
+
+// endpoints is a global registry of endpoints used to report metrics to expvar
+var endpoints struct {
+	registered []*Endpoint
+	mu         sync.Mutex
+}
+
+// register places the endpoint into expvar so that stats are tracked.
diff --git a/storage/notifications/metrics.go b/storage/notifications/metrics.go
new file mode 100644
index 00000000..2a8ffcbd
--- /dev/null
+++ b/storage/notifications/metrics.go
@@ -0,0 +1,152 @@
+package notifications
+
+import (
+	"expvar"
+	"fmt"
+	"net/http"
+	"sync"
+)
+
+// EndpointMetrics track various actions taken by the endpoint, typically by
+// number of events. The goal of this is to export it via expvar, but we may
+// find some other solution to be better in the future.
+type EndpointMetrics struct {
+	Pending   int            // events pending in queue
+	Events    int            // total events incoming
+	Successes int            // total events written successfully
+	Failures  int            // total events failed
+	Errors    int            // total events errored
+	Statuses  map[string]int // status code histogram, per call event
+}
+
+// safeMetrics guards the metrics implementation with a lock and provides a
+// safe update function.
+type safeMetrics struct {
+	EndpointMetrics
+	sync.Mutex // protects statuses map
+}
+
+// newSafeMetrics returns safeMetrics with the status map allocated.
+func newSafeMetrics() *safeMetrics {
+	var sm safeMetrics
+	sm.Statuses = make(map[string]int)
+	return &sm
+}
+
+// httpStatusListener returns the listener for the http sink that updates the
+// relevant counters.
+func (sm *safeMetrics) httpStatusListener() httpStatusListener {
+	return &endpointMetricsHTTPStatusListener{
+		safeMetrics: sm,
+	}
+}
+
+// eventQueueListener returns a listener that maintains queue related counters.
+func (sm *safeMetrics) eventQueueListener() eventQueueListener {
+	return &endpointMetricsEventQueueListener{
+		safeMetrics: sm,
+	}
+}
+
+// endpointMetricsHTTPStatusListener increments counters related to http sinks
+// for the relevant events.
+type endpointMetricsHTTPStatusListener struct {
+	*safeMetrics
+}
+
+var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
+
+func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
+	emsl.safeMetrics.Lock()
+	defer emsl.safeMetrics.Unlock()
+	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
+	emsl.Successes += len(events)
+}
+
+func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
+	emsl.safeMetrics.Lock()
+	defer emsl.safeMetrics.Unlock()
+	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
+	emsl.Failures += len(events)
+}
+
+func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
+	emsl.safeMetrics.Lock()
+	defer emsl.safeMetrics.Unlock()
+	emsl.Errors += len(events)
+}
+
+// endpointMetricsEventQueueListener maintains the incoming events counter and
+// the queue's pending count.
+type endpointMetricsEventQueueListener struct {
+	*safeMetrics
+}
+
+func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
+	eqc.Lock()
+	defer eqc.Unlock()
+	eqc.Events += len(events)
+	eqc.Pending += len(events)
+}
+
+func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
+	eqc.Lock()
+	defer eqc.Unlock()
+	eqc.Pending -= len(events)
+}
+
+// endpoints is a global registry of endpoints used to report metrics to
+// expvar.
+var endpoints struct {
+	registered []*Endpoint
+	mu         sync.Mutex
+}
+
+// register places the endpoint into expvar so that stats are tracked.
+func register(e *Endpoint) {
+	endpoints.mu.Lock()
+	defer endpoints.mu.Unlock()
+
+	endpoints.registered = append(endpoints.registered, e)
+}
+
+func init() {
+	// NOTE(stevvooe): Setup registry metrics structure to report to expvar.
+	// Ideally, we do more metrics through logging but we need some nice
+	// realtime metrics for queue state for now.
+
+	registry := expvar.Get("registry")
+
+	if registry == nil {
+		registry = expvar.NewMap("registry")
+	}
+
+	var notifications expvar.Map
+	notifications.Init()
+	notifications.Set("endpoints", expvar.Func(func() interface{} {
+		endpoints.mu.Lock()
+		defer endpoints.mu.Unlock()
+
+		var names []interface{}
+		for _, v := range endpoints.registered {
+			var epjson struct {
+				Name string `json:"name"`
+				URL  string `json:"url"`
+				EndpointConfig
+
+				Metrics EndpointMetrics
+			}
+
+			epjson.Name = v.Name()
+			epjson.URL = v.URL()
+			epjson.EndpointConfig = v.EndpointConfig
+
+			v.ReadMetrics(&epjson.Metrics)
+
+			names = append(names, epjson)
+		}
+
+		return names
+	}))
+
+	registry.(*expvar.Map).Set("notifications", &notifications)
+}
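Since init publishes everything under the top-level "registry" expvar map, the endpoint metrics can be inspected over the standard /debug/vars route that expvar installs. Below is a small client sketch; the debug address is an assumption and should match whatever http.debug.addr is configured to.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// The address is an assumption; point this at the configured debug server.
	resp, err := http.Get("http://localhost:5001/debug/vars")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Decode only the registry.notifications subtree published by init above.
	var vars struct {
		Registry struct {
			Notifications json.RawMessage `json:"notifications"`
		} `json:"registry"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&vars); err != nil {
		panic(err)
	}

	fmt.Println(string(vars.Registry.Notifications))
}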
diff --git a/storage/notifications/sinks.go b/storage/notifications/sinks.go
new file mode 100644
index 00000000..2bf63e2d
--- /dev/null
+++ b/storage/notifications/sinks.go
@@ -0,0 +1,337 @@
+package notifications
+
+import (
+	"container/list"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+)
+
+// NOTE(stevvooe): This file contains definitions for several utility sinks.
+// Typically, the broadcaster is the only sink that should be required
+// externally, but others are suitable for export if the need arises. That
+// said, the tight integration with endpoint metrics should be removed.
+
+// Broadcaster sends events to multiple, reliable Sinks. The goal of this
+// component is to dispatch events to configured endpoints. Reliability can be
+// provided by wrapping incoming sinks.
+type Broadcaster struct {
+	sinks  []Sink
+	events chan []Event
+	closed chan chan struct{}
+}
+
+// NewBroadcaster returns a broadcaster dispatching to the provided sinks.
+// The broadcaster behavior will be affected by the properties of each sink.
+// Generally, a sink should accept all messages and deal with reliability on
+// its own; wrapping sinks with eventQueue and retryingSink is appropriate
+// here.
+func NewBroadcaster(sinks ...Sink) *Broadcaster {
+	b := Broadcaster{
+		sinks:  sinks,
+		events: make(chan []Event),
+		closed: make(chan chan struct{}),
+	}
+
+	// Start the broadcaster
+	go b.run()
+
+	return &b
+}
+
+// Write accepts a block of events to be dispatched to all sinks. This method
+// will never fail and should never block (hopefully!). The caller cedes the
+// slice memory to the broadcaster and should not modify it after calling
+// write.
+func (b *Broadcaster) Write(events ...Event) error {
+	select {
+	case b.events <- events:
+	case <-b.closed:
+		return ErrSinkClosed
+	}
+	return nil
+}
+
+// Close the broadcaster, ensuring that all messages are flushed to the
+// underlying sink before returning.
+func (b *Broadcaster) Close() error {
+	logrus.Infof("broadcaster: closing")
+	select {
+	case <-b.closed:
+		// already closed
+		return fmt.Errorf("broadcaster: already closed")
+	default:
+		// do a little chan handoff dance to synchronize closing
+		closed := make(chan struct{})
+		b.closed <- closed
+		close(b.closed)
+		<-closed
+		return nil
+	}
+}
+
+// run is the main broadcast loop, started when the broadcaster is created.
+// Under normal conditions, it waits for events on the event channel. After
+// Close is called, this goroutine will exit.
+func (b *Broadcaster) run() {
+	for {
+		select {
+		case block := <-b.events:
+			for _, sink := range b.sinks {
+				if err := sink.Write(block...); err != nil {
+					logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
+				}
+			}
+		case closing := <-b.closed:
+			// close all the underlying sinks
+			for _, sink := range b.sinks {
+				if err := sink.Close(); err != nil {
+					logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
+				}
+			}
+			closing <- struct{}{}
+
+			logrus.Debugf("broadcaster: closed")
+			return
+		}
+	}
+}
+
+// eventQueue accepts all messages into a queue for asynchronous consumption
+// by a sink. It is unbounded and thread safe but the sink must be reliable or
+// events will be dropped.
+type eventQueue struct {
+	sink      Sink
+	events    *list.List
+	listeners []eventQueueListener
+	cond      *sync.Cond
+	mu        sync.Mutex
+	closed    bool
+}
+
+// eventQueueListener is called when various events happen on the queue.
+type eventQueueListener interface {
+	ingress(events ...Event)
+	egress(events ...Event)
+}
+
+// newEventQueue returns a queue to the provided sink. If listeners are
+// provided, they will be called to update pending metrics on ingress and
+// egress.
+func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
+	eq := eventQueue{
+		sink:      sink,
+		events:    list.New(),
+		listeners: listeners,
+	}
+
+	eq.cond = sync.NewCond(&eq.mu)
+	go eq.run()
+	return &eq
+}
+
+// Write accepts the events into the queue, only failing if the queue has
+// been closed.
+func (eq *eventQueue) Write(events ...Event) error {
+	eq.mu.Lock()
+	defer eq.mu.Unlock()
+
+	if eq.closed {
+		return ErrSinkClosed
+	}
+
+	for _, listener := range eq.listeners {
+		listener.ingress(events...)
+	}
+	eq.events.PushBack(events)
+	eq.cond.Signal() // signal waiters
+
+	return nil
+}
+
+// Close shuts down the event queue, flushing any pending events before
+// closing the underlying sink.
+func (eq *eventQueue) Close() error {
+	eq.mu.Lock()
+	defer eq.mu.Unlock()
+
+	if eq.closed {
+		return fmt.Errorf("eventqueue: already closed")
+	}
+
+	// set closed flag
+	eq.closed = true
+	eq.cond.Signal() // signal flushes queue
+	eq.cond.Wait()   // wait for signal from last flush
+
+	return eq.sink.Close()
+}
+
+// run is the main goroutine to flush events to the target sink.
+func (eq *eventQueue) run() {
+	for {
+		block := eq.next()
+
+		if block == nil {
+			return // nil block means event queue is closed.
+		}
+
+		if err := eq.sink.Write(block...); err != nil {
+			logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
+		}
+
+		for _, listener := range eq.listeners {
+			listener.egress(block...)
+		}
+	}
+}
+
+// next encompasses the critical section of the run loop. When the queue is
+// empty, it will block on the condition. If new data arrives, it will wake
+// and return a block. When closed, a nil slice will be returned.
+func (eq *eventQueue) next() []Event {
+	eq.mu.Lock()
+	defer eq.mu.Unlock()
+
+	for eq.events.Len() < 1 {
+		if eq.closed {
+			eq.cond.Broadcast()
+			return nil
+		}
+
+		eq.cond.Wait()
+	}
+
+	front := eq.events.Front()
+	block := front.Value.([]Event)
+	eq.events.Remove(front)
+
+	return block
+}
+
+// retryingSink retries the write until success or an ErrSinkClosed is
+// returned. The underlying sink must have a nonzero probability of
+// succeeding or the sink will block. Internally, a circuit breaker governs
+// retries and reset. Concurrent calls to a retrying sink are serialized
+// through the sink, meaning that if one is in-flight, another will not
+// proceed.
+type retryingSink struct {
+	mu     sync.Mutex
+	sink   Sink
+	closed bool
+
+	// circuit breaker heuristics
+	failures struct {
+		threshold int
+		recent    int
+		last      time.Time
+		backoff   time.Duration // time after which we retry after failure.
+	}
+}
+
+type retryingSinkListener interface {
+	active(events ...Event)
+	retry(events ...Event)
+}
+
+// TODO(stevvooe): We are using circuit breaking here, which actually doesn't
+// make a whole lot of sense for this use case, since we always retry. Move
+// this to use bounded exponential backoff.
+
+// newRetryingSink returns a sink that will retry writes to a sink, backing
+// off on failure. Parameters threshold and backoff adjust the behavior of the
+// circuit breaker.
+func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
+	rs := &retryingSink{
+		sink: sink,
+	}
+	rs.failures.threshold = threshold
+	rs.failures.backoff = backoff
+
+	return rs
+}
+
+// Write attempts to flush the events to the downstream sink until it succeeds
+// or the sink is closed.
+func (rs *retryingSink) Write(events ...Event) error {
+	rs.mu.Lock()
+	defer rs.mu.Unlock()
+
+retry:
+
+	if rs.closed {
+		return ErrSinkClosed
+	}
+
+	if !rs.proceed() {
+		logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
+		rs.wait(rs.failures.backoff)
+		goto retry
+	}
+
+	if err := rs.write(events...); err != nil {
+		if err == ErrSinkClosed {
+			// terminal!
+			return err
+		}
+
+		logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
+		goto retry
+	}
+
+	return nil
+}
+
+// Close closes the sink and the underlying sink.
+func (rs *retryingSink) Close() error {
+	rs.mu.Lock()
+	defer rs.mu.Unlock()
+
+	if rs.closed {
+		return fmt.Errorf("retryingsink: already closed")
+	}
+
+	rs.closed = true
+	return rs.sink.Close()
+}
+
+// write provides a helper that dispatches failure and success properly. Used
+// by Write as the single-flight write call.
+func (rs *retryingSink) write(events ...Event) error {
+	if err := rs.sink.Write(events...); err != nil {
+		rs.failure()
+		return err
+	}
+
+	rs.reset()
+	return nil
+}
+
+// wait sleeps for the backoff period, unlocking so others can proceed. It
+// should only be called by methods that currently hold the mutex.
+func (rs *retryingSink) wait(backoff time.Duration) {
+	rs.mu.Unlock()
+	defer rs.mu.Lock()
+
+	// backoff here
+	time.Sleep(backoff)
+}
+
+// reset marks a successful call.
+func (rs *retryingSink) reset() {
+	rs.failures.recent = 0
+	rs.failures.last = time.Time{}
+}
+
+// failure records a failure.
+func (rs *retryingSink) failure() {
+	rs.failures.recent++
+	rs.failures.last = time.Now().UTC()
+}
+
+// proceed returns true if the call should proceed based on circuit breaker
+// heuristics.
+func (rs *retryingSink) proceed() bool {
+	return rs.failures.recent < rs.failures.threshold ||
+		time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
+}
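Before the tests, it is worth seeing how these pieces are meant to stack. The sketch below stays inside the notifications package, since eventQueue and retryingSink are unexported; the terminal sink is a stand-in for an endpoint's real http sink, and the threshold/backoff values are illustrative only.

package notifications

import "time"

// composeSinks sketches the intended layering: the broadcaster fans out to
// one sink per endpoint, each endpoint buffers through an eventQueue so
// writers never block, and the queue drains into a retryingSink that backs
// off after repeated failures.
func composeSinks(terminal Sink) *Broadcaster {
	metrics := newSafeMetrics()

	// Retry writes to the terminal sink, backing off for a second once
	// three consecutive failures accumulate.
	retrying := newRetryingSink(terminal, 3, time.Second)

	// Queue events asynchronously, keeping the pending/ingress counters
	// in the endpoint metrics up to date.
	queue := newEventQueue(retrying, metrics.eventQueueListener())

	// Additional endpoints would contribute more sinks here.
	return NewBroadcaster(queue)
}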
diff --git a/storage/notifications/sinks_test.go b/storage/notifications/sinks_test.go
new file mode 100644
index 00000000..89756a99
--- /dev/null
+++ b/storage/notifications/sinks_test.go
@@ -0,0 +1,223 @@
+package notifications
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+)
+
+func TestBroadcaster(t *testing.T) {
+	const nEvents = 1000
+	var sinks []Sink
+
+	for i := 0; i < 10; i++ {
+		sinks = append(sinks, &testSink{})
+	}
+
+	b := NewBroadcaster(sinks...)
+
+	var block []Event
+	var wg sync.WaitGroup
+	for i := 1; i <= nEvents; i++ {
+		block = append(block, createTestEvent("push", "library/test", "blob"))
+
+		if i%10 == 0 && i > 0 {
+			wg.Add(1)
+			go func(block ...Event) {
+				// Fatalf must not be called from a non-test goroutine;
+				// record the error and let the test fail at the end.
+				if err := b.Write(block...); err != nil {
+					t.Errorf("error writing block of length %d: %v", len(block), err)
+				}
+				wg.Done()
+			}(block...)
+
+			block = nil
+		}
+	}
+
+	wg.Wait() // Wait until writes complete
+	checkClose(t, b)
+
+	// Iterate through the sinks and check that they all have the expected length.
+	for _, sink := range sinks {
+		ts := sink.(*testSink)
+		ts.mu.Lock()
+		defer ts.mu.Unlock()
+
+		if len(ts.events) != nEvents {
+			t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
+		}
+
+		if !ts.closed {
+			t.Fatalf("sink should have been closed")
+		}
+	}
+}
+
+func TestEventQueue(t *testing.T) {
+	const nevents = 1000
+	var ts testSink
+	metrics := newSafeMetrics()
+	eq := newEventQueue(
+		// delayed sink simulates destination slower than channel comms
+		&delayedSink{
+			Sink:  &ts,
+			delay: time.Millisecond * 1,
+		}, metrics.eventQueueListener())
+
+	var wg sync.WaitGroup
+	var block []Event
+	for i := 1; i <= nevents; i++ {
+		block = append(block, createTestEvent("push", "library/test", "blob"))
+		if i%10 == 0 && i > 0 {
+			wg.Add(1)
+			go func(block ...Event) {
+				if err := eq.Write(block...); err != nil {
+					t.Errorf("error writing event block: %v", err)
+				}
+				wg.Done()
+			}(block...)
+
+			block = nil
+		}
+	}
+
+	wg.Wait()
+	checkClose(t, eq)
+
+	ts.mu.Lock()
+	defer ts.mu.Unlock()
+	metrics.Lock()
+	defer metrics.Unlock()
+
+	if len(ts.events) != nevents {
+		t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), nevents)
+	}
+
+	if !ts.closed {
+		t.Fatalf("sink should have been closed")
+	}
+
+	if metrics.Events != nevents {
+		t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
+	}
+
+	if metrics.Pending != 0 {
+		t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
+	}
+}
+
+func TestRetryingSink(t *testing.T) {
+	// Make a sink that fails most of the time, ensuring that all the events
+	// make it through.
+	var ts testSink
+	flaky := &flakySink{
+		rate: 1.0, // start out always failing.
+		Sink: &ts,
+	}
+	s := newRetryingSink(flaky, 3, 10*time.Millisecond)
+
+	var wg sync.WaitGroup
+	var block []Event
+	for i := 1; i <= 100; i++ {
+		block = append(block, createTestEvent("push", "library/test", "blob"))
+
+		// Above 50, set the failure rate lower
+		if i > 50 {
+			s.mu.Lock()
+			flaky.rate = 0.90
+			s.mu.Unlock()
+		}
+
+		if i%10 == 0 && i > 0 {
+			wg.Add(1)
+			go func(block ...Event) {
+				defer wg.Done()
+				if err := s.Write(block...); err != nil {
+					t.Errorf("error writing event block: %v", err)
+				}
+			}(block...)
+
+			block = nil
+		}
+	}
+
+	wg.Wait()
+	checkClose(t, s)
+
+	ts.mu.Lock()
+	defer ts.mu.Unlock()
+
+	if len(ts.events) != 100 {
+		t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
+	}
+}
+
+type testSink struct {
+	events []Event
+	mu     sync.Mutex
+	closed bool
+}
+
+func (ts *testSink) Write(events ...Event) error {
+	ts.mu.Lock()
+	defer ts.mu.Unlock()
+	ts.events = append(ts.events, events...)
+	return nil
+}
+
+func (ts *testSink) Close() error {
+	ts.mu.Lock()
+	defer ts.mu.Unlock()
+	ts.closed = true
+
+	logrus.Infof("closing testSink")
+	return nil
+}
+
+type delayedSink struct {
+	Sink
+	delay time.Duration
+}
+
+func (ds *delayedSink) Write(events ...Event) error {
+	time.Sleep(ds.delay)
+	return ds.Sink.Write(events...)
+}
+
+type flakySink struct {
+	Sink
+	rate float64
+}
+
+func (fs *flakySink) Write(events ...Event) error {
+	if rand.Float64() < fs.rate {
+		return fmt.Errorf("error writing %d events", len(events))
+	}
+
+	return fs.Sink.Write(events...)
+}
+
+func checkClose(t *testing.T, sink Sink) {
+	if err := sink.Close(); err != nil {
+		t.Fatalf("unexpected error closing: %v", err)
+	}
+
+	// Second close should not crash, but should return an error.
+	if err := sink.Close(); err == nil {
+		t.Fatalf("no error on double close")
+	}
+
+	// Write after closed should be an error
+	if err := sink.Write([]Event{}...); err == nil {
+		t.Fatalf("write after closed did not have an error")
+	} else if err != ErrSinkClosed {
+		t.Fatalf("error should be ErrSinkClosed")
+	}
+}
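The breaker arithmetic in proceed is easiest to check with concrete numbers. The following is a sketch in the style of the tests above, not part of the change; it reuses testSink and assumes threshold 3 with a 10ms backoff.

package notifications

import (
	"testing"
	"time"
)

// TestProceedHeuristics sketches the breaker behavior: the sink proceeds
// freely below three recent failures, refuses immediately after the third,
// and proceeds again once the backoff has elapsed since the last failure.
func TestProceedHeuristics(t *testing.T) {
	rs := newRetryingSink(&testSink{}, 3, 10*time.Millisecond)

	for i := 0; i < 3; i++ {
		if !rs.proceed() {
			t.Fatalf("should proceed with %d recent failures", rs.failures.recent)
		}
		rs.failure()
	}

	if rs.proceed() {
		t.Fatalf("should back off after hitting the threshold")
	}

	time.Sleep(20 * time.Millisecond) // let the backoff window lapse
	if !rs.proceed() {
		t.Fatalf("should proceed after backoff has elapsed")
	}
}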
diff --git a/storage/revisionstore.go b/storage/revisionstore.go
index a88ca8c7..b3ecd711 100644
--- a/storage/revisionstore.go
+++ b/storage/revisionstore.go
@@ -79,13 +79,8 @@ func (rs *revisionStore) get(revision digest.Digest) (*manifest.SignedManifest,
 // put stores the manifest in the repository, if not already present. Any
 // updated signatures will be stored, as well.
 func (rs *revisionStore) put(sm *manifest.SignedManifest) (digest.Digest, error) {
-	jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures")
-	if err != nil {
-		return "", err
-	}
-
 	// Resolve the payload in the manifest.
-	payload, err := jsig.Payload()
+	payload, err := sm.Payload()
 	if err != nil {
 		return "", err
 	}
@@ -103,7 +98,7 @@ func (rs *revisionStore) put(sm *manifest.SignedManifest) (digest.Digest, error)
 	}
 
 	// Grab each json signature and store them.
-	signatures, err := jsig.Signatures()
+	signatures, err := sm.Signatures()
 	if err != nil {
 		return "", err
 	}
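With Payload exposed on SignedManifest, callers outside revisionStore can derive a content identifier without touching libtrust directly. Below is a minimal sketch; the import path is assumed from the repository layout, and crypto/sha256 stands in for the digest package to keep it self-contained.

package main

import (
	"crypto/sha256"
	"fmt"

	"github.com/docker/distribution/manifest"
)

// contentIdentifier derives an identifier for a signed manifest from its
// signed payload, in the familiar "sha256:<hex>" form.
func contentIdentifier(sm *manifest.SignedManifest) (string, error) {
	payload, err := sm.Payload()
	if err != nil {
		return "", err
	}

	return fmt.Sprintf("sha256:%x", sha256.Sum256(payload)), nil
}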