Make middleware survive a restart (#142)

Make middleware that sets up an (HTTP) handler survive a graceful
restart. We call the middleware's Shutdown function(s). If the restart
fails, the Start function is called again.

* middleware/health: OK
* middleware/pprof: OK
* middleware/metrics: OK

All restart OK.
This commit is contained in:
Miek Gieben 2016-04-29 07:28:35 +01:00
parent a1478f891d
commit 9e9d72655d
10 changed files with 132 additions and 54 deletions

View file

@ -117,6 +117,9 @@ func Restart(newCorefile Input) error {
} }
wpipe.Close() wpipe.Close()
// Run all shutdown functions for the middleware, if child start fails, restart them all...
executeShutdownCallbacks("SIGUSR1")
// Determine whether child startup succeeded // Determine whether child startup succeeded
answer, readErr := ioutil.ReadAll(sigrpipe) answer, readErr := ioutil.ReadAll(sigrpipe)
if answer == nil || len(answer) == 0 { if answer == nil || len(answer) == 0 {
@ -125,6 +128,9 @@ func Restart(newCorefile Input) error {
if readErr != nil { if readErr != nil {
log.Printf("[ERROR] Restart: additionally, error communicating with child process: %v", readErr) log.Printf("[ERROR] Restart: additionally, error communicating with child process: %v", readErr)
} }
// re-call all startup functions.
// TODO(miek): this needs to be tested, somehow.
executeStartupCallbacks("SIGUSR1")
return errIncompleteRestart return errIncompleteRestart
} }

View file

@ -11,8 +11,9 @@ func Health(c *Controller) (middleware.Middleware, error) {
return nil, err return nil, err
} }
h := health.Health{Addr: addr} h := &health.Health{Addr: addr}
c.Startup = append(c.Startup, h.ListenAndServe) c.Startup = append(c.Startup, h.Start)
c.Shutdown = append(c.Shutdown, h.Shutdown)
return nil, nil return nil, nil
} }

View file

@ -12,18 +12,19 @@ const addr = "localhost:9153"
var metricsOnce sync.Once var metricsOnce sync.Once
func Prometheus(c *Controller) (middleware.Middleware, error) { func Prometheus(c *Controller) (middleware.Middleware, error) {
met, err := parsePrometheus(c) m, err := parsePrometheus(c)
if err != nil { if err != nil {
return nil, err return nil, err
} }
metricsOnce.Do(func() { metricsOnce.Do(func() {
c.Startup = append(c.Startup, met.Start) c.Startup = append(c.Startup, m.Start)
c.Shutdown = append(c.Shutdown, m.Shutdown)
}) })
return func(next middleware.Handler) middleware.Handler { return func(next middleware.Handler) middleware.Handler {
met.Next = next m.Next = next
return met return m
}, nil }, nil
} }

View file

@ -27,6 +27,7 @@ func PProf(c *Controller) (middleware.Middleware, error) {
handler := &pprof.Handler{} handler := &pprof.Handler{}
pprofOnce.Do(func() { pprofOnce.Do(func() {
c.Startup = append(c.Startup, handler.Start) c.Startup = append(c.Startup, handler.Start)
c.Shutdown = append(c.Shutdown, handler.Shutdown)
}) })
return func(next middleware.Handler) middleware.Handler { return func(next middleware.Handler) middleware.Handler {

View file

@ -68,4 +68,26 @@ func executeShutdownCallbacks(signame string) (exitCode int) {
return return
} }
var shutdownCallbacksOnce sync.Once // executeStartupCallbacks executes the startup callbacks as initiated
// by signame. This is used on restart, when the child failed to start and
// all middleware have already executed their shutdown functions.
func executeStartupCallbacks(signame string) (exitCode int) {
	// startupCallbacksOnce limits this to a single run per process: later
	// calls skip the closure and report exit code 0.
	startupCallbacksOnce.Do(func() {
		// Hold the lock only for the duration of the callback run.
		serversMu.Lock()
		errs := server.StartupCallbacks(servers)
		serversMu.Unlock()

		for _, err := range errs {
			// These are startup failures, so label them as such in the log.
			log.Printf("[ERROR] %s startup: %v", signame, err)
		}
		if len(errs) > 0 {
			exitCode = 1
		}
	})
	return exitCode
}
var (
// shutdownCallbacksOnce presumably guards the shutdown callbacks in
// executeShutdownCallbacks; its use is not visible here - TODO confirm.
shutdownCallbacksOnce sync.Once
// startupCallbacksOnce is the guard under which executeStartupCallbacks
// runs the startup callbacks.
startupCallbacksOnce sync.Once
)

View file

@ -3,6 +3,7 @@ package health
import ( import (
"io" "io"
"log" "log"
"net"
"net/http" "net/http"
"sync" "sync"
) )
@ -11,28 +12,45 @@ var once sync.Once
type Health struct { type Health struct {
Addr string Addr string
ln net.Listener
mux *http.ServeMux
} }
func health(w http.ResponseWriter, r *http.Request) { func health(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, ok) io.WriteString(w, ok)
} }
func (h Health) ListenAndServe() error { func (h *Health) Start() error {
if h.Addr == "" { if h.Addr == "" {
h.Addr = defAddr h.Addr = defAddr
} }
once.Do(func() { once.Do(func() {
http.HandleFunc("/health", health) if ln, err := net.Listen("tcp", h.Addr); err != nil {
log.Printf("[ERROR] Failed to start health handler: %s", err)
return
} else {
h.ln = ln
}
h.mux = http.NewServeMux()
h.mux.HandleFunc(path, health)
go func() { go func() {
if err := http.ListenAndServe(h.Addr, nil); err != nil { http.Serve(h.ln, h.mux)
log.Printf("[ERROR] Failed to start health handler: %s", err)
}
}() }()
}) })
return nil return nil
} }
// Shutdown closes the health listener when one has been set and returns
// the result of Close. With no listener it returns nil.
func (h *Health) Shutdown() error {
	if h.ln == nil {
		return nil
	}
	return h.ln.Close()
}
const ( const (
ok = "OK" ok = "OK"
defAddr = ":8080" defAddr = ":8080"
path = "/health"
) )

View file

@ -2,6 +2,7 @@ package metrics
import ( import (
"log" "log"
"net"
"net/http" "net/http"
"sync" "sync"
@ -17,12 +18,12 @@ var (
responseRcode *prometheus.CounterVec responseRcode *prometheus.CounterVec
) )
const path = "/metrics"
// Metrics holds the prometheus configuration. The metrics' path is fixed to be /metrics // Metrics holds the prometheus configuration. The metrics' path is fixed to be /metrics
type Metrics struct { type Metrics struct {
Next middleware.Handler Next middleware.Handler
Addr string // where to we listen Addr string
ln net.Listener
mux *http.ServeMux
Once sync.Once Once sync.Once
ZoneNames []string ZoneNames []string
} }
@ -31,21 +32,35 @@ func (m *Metrics) Start() error {
m.Once.Do(func() { m.Once.Do(func() {
define() define()
if ln, err := net.Listen("tcp", m.Addr); err != nil {
log.Printf("[ERROR] Failed to start metrics handler: %s", err)
return
} else {
m.ln = ln
}
m.mux = http.NewServeMux()
prometheus.MustRegister(requestCount) prometheus.MustRegister(requestCount)
prometheus.MustRegister(requestDuration) prometheus.MustRegister(requestDuration)
prometheus.MustRegister(responseSize) prometheus.MustRegister(responseSize)
prometheus.MustRegister(responseRcode) prometheus.MustRegister(responseRcode)
http.Handle(path, prometheus.Handler()) m.mux.Handle(path, prometheus.Handler())
go func() { go func() {
if err := http.ListenAndServe(m.Addr, nil); err != nil { http.Serve(m.ln, m.mux)
log.Printf("[ERROR] Failed to start prometheus handler: %s", err)
}
}() }()
}) })
return nil return nil
} }
// Shutdown closes the metrics listener when one has been set and returns
// the result of Close. With no listener it returns nil.
func (m *Metrics) Shutdown() error {
	if m.ln == nil {
		return nil
	}
	return m.ln.Close()
}
func define() { func define() {
requestCount = prometheus.NewCounterVec(prometheus.CounterOpts{ requestCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: middleware.Namespace, Namespace: middleware.Namespace,
@ -80,7 +95,7 @@ func define() {
const ( const (
// Dropped indicates we dropped the query before any handling. It has no closing dot, so it can not be a valid zone. // Dropped indicates we dropped the query before any handling. It has no closing dot, so it can not be a valid zone.
Dropped = "dropped" Dropped = "dropped"
subsystem = "dns" subsystem = "dns"
path = "/metrics"
) )

View file

@ -1,7 +1,7 @@
# pprof # pprof
pprof publishes runtime profiling data at endpoints under /debug/pprof. You can visit /debug/pprof pprof publishes runtime profiling data at endpoints under /debug/pprof. You can visit /debug/pprof
on your site for an index of the available endpoints. By default it will listen on localhost:8053. on your site for an index of the available endpoints. By default it will listen on localhost:6053.
> This is a debugging tool. Certain requests (such as collecting execution traces) can be slow. If > This is a debugging tool. Certain requests (such as collecting execution traces) can be slow. If
> you use pprof on a live site, consider restricting access or enabling it only temporarily. > you use pprof on a live site, consider restricting access or enabling it only temporarily.

View file

@ -2,8 +2,9 @@ package pprof
import ( import (
"log" "log"
"net"
"net/http" "net/http"
_ "net/http/pprof" pp "net/http/pprof"
"github.com/miekg/coredns/middleware" "github.com/miekg/coredns/middleware"
@ -11,10 +12,10 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
const addr = "localhost:8053"
type Handler struct { type Handler struct {
Next middleware.Handler Next middleware.Handler
ln net.Listener
mux *http.ServeMux
} }
// ServeDNS passes all other requests up the chain. // ServeDNS passes all other requests up the chain.
@ -23,10 +24,34 @@ func (h *Handler) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg
} }
func (h *Handler) Start() error { func (h *Handler) Start() error {
if ln, err := net.Listen("tcp", addr); err != nil {
log.Printf("[ERROR] Failed to start pprof handler: %s", err)
return err
} else {
h.ln = ln
}
h.mux = http.NewServeMux()
h.mux.HandleFunc(path+"/", pp.Index)
h.mux.HandleFunc(path+"/cmdline", pp.Cmdline)
h.mux.HandleFunc(path+"/profile", pp.Profile)
h.mux.HandleFunc(path+"/symbol", pp.Symbol)
h.mux.HandleFunc(path+"/trace", pp.Trace)
go func() { go func() {
if err := http.ListenAndServe(addr, nil); err != nil { http.Serve(h.ln, h.mux)
log.Printf("[ERROR] Failed to start pprof handler: %s", err)
}
}() }()
return nil return nil
} }
// Shutdown closes the pprof listener when one has been set and returns
// the result of Close. With no listener it returns nil.
func (h *Handler) Shutdown() error {
	if h.ln == nil {
		return nil
	}
	return h.ln.Close()
}
const (
// addr is the TCP address Start listens on.
addr = "localhost:6053"
// path is the URL prefix under which Start registers the pprof handlers.
path = "/debug/pprof"
)

View file

@ -445,32 +445,6 @@ func (s *Server) RunFirstStartupFuncs() error {
return nil return nil
} }
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
// connections. It's used by ListenAndServe and ListenAndServeTLS so
// dead TCP connections (e.g. closing laptop mid-download) eventually
// go away.
//
// Borrowed from the Go standard library.
type tcpKeepAliveListener struct {
*net.TCPListener
}
// Accept accepts the connection with a keep-alive enabled.
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
tc, err := ln.AcceptTCP()
if err != nil {
return
}
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(3 * time.Minute)
return tc, nil
}
// File implements ListenerFile; returns the underlying file of the listener.
func (ln tcpKeepAliveListener) File() (*os.File, error) {
return ln.TCPListener.File()
}
// ShutdownCallbacks executes all the shutdown callbacks // ShutdownCallbacks executes all the shutdown callbacks
// for all the virtualhosts in servers, and returns all the // for all the virtualhosts in servers, and returns all the
// errors generated during their execution. In other words, // errors generated during their execution. In other words,
@ -493,6 +467,21 @@ func ShutdownCallbacks(servers []*Server) []error {
return errs return errs
} }
// StartupCallbacks runs every startup function configured for every zone
// of the given servers and returns the non-nil errors they produced, in
// call order. A failing function does not stop the remaining ones from
// running.
func StartupCallbacks(servers []*Server) []error {
	var failures []error
	for _, srv := range servers {
		for _, z := range srv.zones {
			for _, start := range z.config.Startup {
				if err := start(); err != nil {
					failures = append(failures, err)
				}
			}
		}
	}
	return failures
}
func RcodeNoClientWrite(rcode int) bool { func RcodeNoClientWrite(rcode int) bool {
switch rcode { switch rcode {
case dns.RcodeServerFailure: case dns.RcodeServerFailure: