middleware/proxy: make it scale (#287)

* middleware/proxy

Use connection pooling for communicating with an upstream, instead of
opening a new socket every time.

This makes the proxy more efficient and allowed for some cleanups.

* Some cleanups

* Some fixes

* more

* Kill pool

* Add nil check

* remove pool
This commit is contained in:
Miek Gieben 2016-10-08 14:46:22 +01:00 committed by GitHub
parent f29f622ec7
commit a05901f62a
9 changed files with 159 additions and 388 deletions

View file

@ -1,3 +1,4 @@
// Package cache implements a cache.
package cache package cache
import ( import (

View file

@ -0,0 +1,97 @@
package proxy
import (
"net"
"time"
"github.com/miekg/coredns/middleware/pkg/singleflight"
"github.com/miekg/coredns/request"
"github.com/miekg/dns"
)
type Client struct {
Timeout time.Duration
group *singleflight.Group
}
func NewClient() *Client {
return &Client{Timeout: defaultTimeout, group: new(singleflight.Group)}
}
// ServeDNS does not satisfy middleware.Handler, instead it interacts with the upstream
// and returns the respons or an error.
func (c *Client) ServeDNS(w dns.ResponseWriter, r *dns.Msg, u *UpstreamHost) (*dns.Msg, error) {
co, err := net.DialTimeout(request.Proto(w), u.Name, c.Timeout)
if err != nil {
return nil, err
}
reply, _, err := c.Exchange(r, co)
co.Close()
if reply != nil && reply.Truncated {
// Suppress proxy error for truncated responses
err = nil
}
if err != nil {
return nil, err
}
reply.Compress = true
reply.Id = r.Id
return reply, nil
}
func (c *Client) Exchange(m *dns.Msg, co net.Conn) (*dns.Msg, time.Duration, error) {
t := "nop"
if t1, ok := dns.TypeToString[m.Question[0].Qtype]; ok {
t = t1
}
cl := "nop"
if cl1, ok := dns.ClassToString[m.Question[0].Qclass]; ok {
cl = cl1
}
start := time.Now()
// Name needs to be normalized! Bug in go dns.
r, err := c.group.Do(m.Question[0].Name+t+cl, func() (interface{}, error) {
ret, e := c.exchange(m, co)
return ret, e
})
rtt := time.Since(start)
if err != nil {
return &dns.Msg{}, rtt, err
}
r1 := r.(dns.Msg)
return &r1, rtt, nil
}
// exchange does *not* return a pointer to dns.Msg because that leads to buffer reuse when
// group.Do is used in Exchange.
func (c *Client) exchange(m *dns.Msg, co net.Conn) (dns.Msg, error) {
opt := m.IsEdns0()
udpsize := uint16(dns.MinMsgSize)
// If EDNS0 is used use that for size.
if opt != nil && opt.UDPSize() >= dns.MinMsgSize {
udpsize = opt.UDPSize()
}
dnsco := &dns.Conn{Conn: co, UDPSize: udpsize}
dnsco.WriteMsg(m)
r, err := dnsco.ReadMsg()
dnsco.Close()
if r == nil {
return dns.Msg{}, err
}
return *r, err
}

View file

@ -1,7 +1,6 @@
package proxy package proxy
// functions OTHER middleware might want to use to do lookup in the same // functions other middleware might want to use to do lookup in the same style as the proxy.
// style as the proxy.
import ( import (
"sync/atomic" "sync/atomic"
@ -14,7 +13,7 @@ import (
// New create a new proxy with the hosts in host and a Random policy. // New create a new proxy with the hosts in host and a Random policy.
func New(hosts []string) Proxy { func New(hosts []string) Proxy {
p := Proxy{Next: nil, Client: Clients()} p := Proxy{Next: nil, Client: NewClient()}
upstream := &staticUpstream{ upstream := &staticUpstream{
from: "", from: "",
@ -31,7 +30,8 @@ func New(hosts []string) Proxy {
Conns: 0, Conns: 0,
Fails: 0, Fails: 0,
FailTimeout: upstream.FailTimeout, FailTimeout: upstream.FailTimeout,
Unhealthy: false,
Unhealthy: false,
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
return func(uh *UpstreamHost) bool { return func(uh *UpstreamHost) bool {
if uh.Unhealthy { if uh.Unhealthy {
@ -54,7 +54,6 @@ func New(hosts []string) Proxy {
// Lookup will use name and type to forge a new message and will send that upstream. It will // Lookup will use name and type to forge a new message and will send that upstream. It will
// set any EDNS0 options correctly so that downstream will be able to process the reply. // set any EDNS0 options correctly so that downstream will be able to process the reply.
// Lookup is not suitable for forwarding request. Ssee for that.
func (p Proxy) Lookup(state request.Request, name string, tpe uint16) (*dns.Msg, error) { func (p Proxy) Lookup(state request.Request, name string, tpe uint16) (*dns.Msg, error) {
req := new(dns.Msg) req := new(dns.Msg)
req.SetQuestion(name, tpe) req.SetQuestion(name, tpe)
@ -63,18 +62,13 @@ func (p Proxy) Lookup(state request.Request, name string, tpe uint16) (*dns.Msg,
return p.lookup(state, req) return p.lookup(state, req)
} }
// Forward will forward the request to upstream // Forward forward the request in state as-is. Unlike Lookup that adds EDNS0 suffix to the message.
func (p Proxy) Forward(state request.Request) (*dns.Msg, error) { func (p Proxy) Forward(state request.Request) (*dns.Msg, error) {
return p.lookup(state, state.Req) return p.lookup(state, state.Req)
} }
func (p Proxy) lookup(state request.Request, r *dns.Msg) (*dns.Msg, error) { func (p Proxy) lookup(state request.Request, r *dns.Msg) (*dns.Msg, error) {
var (
reply *dns.Msg
err error
)
for _, upstream := range p.Upstreams { for _, upstream := range p.Upstreams {
// allowed bla bla bla TODO(miek): fix full proxy spec from caddy?
start := time.Now() start := time.Now()
// Since Select() should give us "up" hosts, keep retrying // Since Select() should give us "up" hosts, keep retrying
@ -85,15 +79,16 @@ func (p Proxy) lookup(state request.Request, r *dns.Msg) (*dns.Msg, error) {
return nil, errUnreachable return nil, errUnreachable
} }
// duplicated from proxy.go, but with a twist, we don't write the
// reply back to the client, we return it.
atomic.AddInt64(&host.Conns, 1) atomic.AddInt64(&host.Conns, 1)
if state.Proto() == "tcp" {
reply, _, err = p.Client.TCP.Exchange(r, host.Name) reply, backendErr := p.Client.ServeDNS(state.W, r, host)
} else {
reply, _, err = p.Client.UDP.Exchange(r, host.Name)
}
atomic.AddInt64(&host.Conns, -1) atomic.AddInt64(&host.Conns, -1)
if err == nil { if backendErr == nil {
return reply, nil return reply, nil
} }
timeout := host.FailTimeout timeout := host.FailTimeout

View file

@ -14,19 +14,13 @@ import (
var errUnreachable = errors.New("unreachable backend") var errUnreachable = errors.New("unreachable backend")
// Proxy represents a middleware instance that can proxy requests. // Proxy represents a middleware instance that can proxy requests to another DNS server.
type Proxy struct { type Proxy struct {
Next middleware.Handler Next middleware.Handler
Client Client Client *Client
Upstreams []Upstream Upstreams []Upstream
} }
// Client represents client information that the proxy uses.
type Client struct {
UDP *dns.Client
TCP *dns.Client
}
// Upstream manages a pool of proxy upstream hosts. Select should return a // Upstream manages a pool of proxy upstream hosts. Select should return a
// suitable upstream host, or nil if no such hosts are available. // suitable upstream host, or nil if no such hosts are available.
type Upstream interface { type Upstream interface {
@ -82,12 +76,15 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
if host == nil { if host == nil {
return dns.RcodeServerFailure, errUnreachable return dns.RcodeServerFailure, errUnreachable
} }
reverseproxy := ReverseProxy{Host: host.Name, Client: p.Client, Options: upstream.Options()}
atomic.AddInt64(&host.Conns, 1) atomic.AddInt64(&host.Conns, 1)
backendErr := reverseproxy.ServeDNS(w, r, nil)
reply, backendErr := p.Client.ServeDNS(w, r, host)
atomic.AddInt64(&host.Conns, -1) atomic.AddInt64(&host.Conns, -1)
if backendErr == nil { if backendErr == nil {
w.WriteMsg(reply)
return 0, nil return 0, nil
} }
timeout := host.FailTimeout timeout := host.FailTimeout
@ -105,19 +102,5 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
return p.Next.ServeDNS(ctx, w, r) return p.Next.ServeDNS(ctx, w, r)
} }
// Clients returns the new client for proxy requests. // defaultTimeout is the default networking timeout for DNS requests.
func Clients() Client {
udp := newClient("udp", defaultTimeout)
tcp := newClient("tcp", defaultTimeout)
return Client{UDP: udp, TCP: tcp}
}
// newClient returns a new client for proxy requests.
func newClient(net string, timeout time.Duration) *dns.Client {
if timeout == 0 {
timeout = defaultTimeout
}
return &dns.Client{Net: net, ReadTimeout: timeout, WriteTimeout: timeout, SingleInflight: true}
}
const defaultTimeout = 5 * time.Second const defaultTimeout = 5 * time.Second

View file

@ -1,303 +1,3 @@
package proxy package proxy
// Also test these inputs: /* TODO */
//.:1053 {
//proxy . ::1 2001:4860:4860::8844 8.8.8.8:54 [2001:4860:4860::8845]:53
//}
/*
func init() {
tryDuration = 50 * time.Millisecond // prevent tests from hanging
}
func TestReverseProxy(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stderr)
var requestReceived bool
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestReceived = true
w.Write([]byte("Hello, client"))
}))
defer backend.Close()
// set up proxy
p := &Proxy{
Upstreams: []Upstream{newFakeUpstream(backend.URL, false)},
}
// create request and response recorder
r, err := http.NewRequest("GET", "/", nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
w := httptest.NewRecorder()
p.ServeHTTP(w, r)
if !requestReceived {
t.Error("Expected backend to receive request, but it didn't")
}
}
func TestReverseProxyInsecureSkipVerify(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stderr)
var requestReceived bool
backend := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestReceived = true
w.Write([]byte("Hello, client"))
}))
defer backend.Close()
// set up proxy
p := &Proxy{
Upstreams: []Upstream{newFakeUpstream(backend.URL, true)},
}
// create request and response recorder
r, err := http.NewRequest("GET", "/", nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
w := httptest.NewRecorder()
p.ServeHTTP(w, r)
if !requestReceived {
t.Error("Even with insecure HTTPS, expected backend to receive request, but it didn't")
}
}
func TestWebSocketReverseProxyServeHTTPHandler(t *testing.T) {
// No-op websocket backend simply allows the WS connection to be
// accepted then it will be immediately closed. Perfect for testing.
wsNop := httptest.NewServer(websocket.Handler(func(ws *websocket.Conn) {}))
defer wsNop.Close()
// Get proxy to use for the test
p := newWebSocketTestProxy(wsNop.URL)
// Create client request
r, err := http.NewRequest("GET", "/", nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
r.Header = http.Header{
"Connection": {"Upgrade"},
"Upgrade": {"websocket"},
"Origin": {wsNop.URL},
"Sec-WebSocket-Key": {"x3JJHMbDL1EzLkh9GBhXDw=="},
"Sec-WebSocket-Version": {"13"},
}
// Capture the request
w := &recorderHijacker{httptest.NewRecorder(), new(fakeConn)}
// Booya! Do the test.
p.ServeHTTP(w, r)
// Make sure the backend accepted the WS connection.
// Mostly interested in the Upgrade and Connection response headers
// and the 101 status code.
expected := []byte("HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=\r\n\r\n")
actual := w.fakeConn.writeBuf.Bytes()
if !bytes.Equal(actual, expected) {
t.Errorf("Expected backend to accept response:\n'%s'\nActually got:\n'%s'", expected, actual)
}
}
func TestWebSocketReverseProxyFromWSClient(t *testing.T) {
// Echo server allows us to test that socket bytes are properly
// being proxied.
wsEcho := httptest.NewServer(websocket.Handler(func(ws *websocket.Conn) {
io.Copy(ws, ws)
}))
defer wsEcho.Close()
// Get proxy to use for the test
p := newWebSocketTestProxy(wsEcho.URL)
// This is a full end-end test, so the proxy handler
// has to be part of a server listening on a port. Our
// WS client will connect to this test server, not
// the echo client directly.
echoProxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p.ServeHTTP(w, r)
}))
defer echoProxy.Close()
// Set up WebSocket client
url := strings.Replace(echoProxy.URL, "http://", "ws://", 1)
ws, err := websocket.Dial(url, "", echoProxy.URL)
if err != nil {
t.Fatal(err)
}
defer ws.Close()
// Send test message
trialMsg := "Is it working?"
websocket.Message.Send(ws, trialMsg)
// It should be echoed back to us
var actualMsg string
websocket.Message.Receive(ws, &actualMsg)
if actualMsg != trialMsg {
t.Errorf("Expected '%s' but got '%s' instead", trialMsg, actualMsg)
}
}
func TestUnixSocketProxy(t *testing.T) {
if runtime.GOOS == "windows" {
return
}
trialMsg := "Is it working?"
var proxySuccess bool
// This is our fake "application" we want to proxy to
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Request was proxied when this is called
proxySuccess = true
fmt.Fprint(w, trialMsg)
}))
// Get absolute path for unix: socket
socketPath, err := filepath.Abs("./test_socket")
if err != nil {
t.Fatalf("Unable to get absolute path: %v", err)
}
// Change httptest.Server listener to listen to unix: socket
ln, err := net.Listen("unix", socketPath)
if err != nil {
t.Fatalf("Unable to listen: %v", err)
}
ts.Listener = ln
ts.Start()
defer ts.Close()
url := strings.Replace(ts.URL, "http://", "unix:", 1)
p := newWebSocketTestProxy(url)
echoProxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p.ServeHTTP(w, r)
}))
defer echoProxy.Close()
res, err := http.Get(echoProxy.URL)
if err != nil {
t.Fatalf("Unable to GET: %v", err)
}
greeting, err := ioutil.ReadAll(res.Body)
res.Body.Close()
if err != nil {
t.Fatalf("Unable to GET: %v", err)
}
actualMsg := fmt.Sprintf("%s", greeting)
if !proxySuccess {
t.Errorf("Expected request to be proxied, but it wasn't")
}
if actualMsg != trialMsg {
t.Errorf("Expected '%s' but got '%s' instead", trialMsg, actualMsg)
}
}
func newFakeUpstream(name string, insecure bool) *fakeUpstream {
uri, _ := url.Parse(name)
u := &fakeUpstream{
name: name,
host: &UpstreamHost{
Name: name,
ReverseProxy: NewSingleHostReverseProxy(uri, ""),
},
}
if insecure {
u.host.ReverseProxy.Transport = InsecureTransport
}
return u
}
type fakeUpstream struct {
name string
host *UpstreamHost
}
func (u *fakeUpstream) From() string {
return "/"
}
func (u *fakeUpstream) Select() *UpstreamHost {
return u.host
}
func (u *fakeUpstream) IsAllowedPath(requestPath string) bool {
return true
}
// newWebSocketTestProxy returns a test proxy that will
// redirect to the specified backendAddr. The function
// also sets up the rules/environment for testing WebSocket
// proxy.
func newWebSocketTestProxy(backendAddr string) *Proxy {
return &Proxy{
Upstreams: []Upstream{&fakeWsUpstream{name: backendAddr}},
}
}
type fakeWsUpstream struct {
name string
}
func (u *fakeWsUpstream) From() string {
return "/"
}
func (u *fakeWsUpstream) Select() *UpstreamHost {
uri, _ := url.Parse(u.name)
return &UpstreamHost{
Name: u.name,
ReverseProxy: NewSingleHostReverseProxy(uri, ""),
ExtraHeaders: http.Header{
"Connection": {"{>Connection}"},
"Upgrade": {"{>Upgrade}"}},
}
}
func (u *fakeWsUpstream) IsAllowedPath(requestPath string) bool {
return true
}
// recorderHijacker is a ResponseRecorder that can
// be hijacked.
type recorderHijacker struct {
*httptest.ResponseRecorder
fakeConn *fakeConn
}
func (rh *recorderHijacker) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return rh.fakeConn, nil, nil
}
type fakeConn struct {
readBuf bytes.Buffer
writeBuf bytes.Buffer
}
func (c *fakeConn) LocalAddr() net.Addr { return nil }
func (c *fakeConn) RemoteAddr() net.Addr { return nil }
func (c *fakeConn) SetDeadline(t time.Time) error { return nil }
func (c *fakeConn) SetReadDeadline(t time.Time) error { return nil }
func (c *fakeConn) SetWriteDeadline(t time.Time) error { return nil }
func (c *fakeConn) Close() error { return nil }
func (c *fakeConn) Read(b []byte) (int, error) { return c.readBuf.Read(b) }
func (c *fakeConn) Write(b []byte) (int, error) { return c.writeBuf.Write(b) }
*/

View file

@ -1,44 +0,0 @@
// Package proxy is middleware that proxies requests.
package proxy
import (
"github.com/miekg/coredns/request"
"github.com/miekg/dns"
)
// ReverseProxy is a basic reverse proxy
type ReverseProxy struct {
Host string
Client Client
Options Options
}
// ServeDNS implements the middleware.Handler interface.
func (p ReverseProxy) ServeDNS(w dns.ResponseWriter, r *dns.Msg, extra []dns.RR) error {
var (
reply *dns.Msg
err error
)
switch {
case request.Proto(w) == "tcp": // TODO(miek): keep this in request
reply, _, err = p.Client.TCP.Exchange(r, p.Host)
default:
reply, _, err = p.Client.UDP.Exchange(r, p.Host)
}
if reply != nil && reply.Truncated {
// Suppress proxy error for truncated responses
err = nil
}
if err != nil {
return err
}
reply.Compress = true
reply.Id = r.Id
w.WriteMsg(reply)
return nil
}

View file

@ -20,7 +20,7 @@ func setup(c *caddy.Controller) error {
return middleware.Error("proxy", err) return middleware.Error("proxy", err)
} }
dnsserver.GetConfig(c).AddMiddleware(func(next middleware.Handler) middleware.Handler { dnsserver.GetConfig(c).AddMiddleware(func(next middleware.Handler) middleware.Handler {
return Proxy{Next: next, Client: Clients(), Upstreams: upstreams} return Proxy{Next: next, Client: NewClient(), Upstreams: upstreams}
}) })
return nil return nil

View file

@ -89,6 +89,7 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
Fails: 0, Fails: 0,
FailTimeout: upstream.FailTimeout, FailTimeout: upstream.FailTimeout,
Unhealthy: false, Unhealthy: false,
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
return func(uh *UpstreamHost) bool { return func(uh *UpstreamHost) bool {
if uh.Unhealthy { if uh.Unhealthy {

View file

@ -62,3 +62,41 @@ func TestLookupProxy(t *testing.T) {
t.Errorf("Expected 127.0.0.1, got: %s", resp.Answer[0].(*dns.A).A.String()) t.Errorf("Expected 127.0.0.1, got: %s", resp.Answer[0].(*dns.A).A.String())
} }
} }
func BenchmarkLookupProxy(b *testing.B) {
t := new(testing.T)
name, rm, err := test.TempFile(t, ".", exampleOrg)
if err != nil {
t.Fatalf("failed to created zone: %s", err)
}
defer rm()
corefile := `example.org:0 {
file ` + name + `
}
`
i, err := CoreDNSServer(corefile)
if err != nil {
t.Fatalf("could not get CoreDNS serving instance: %s", err)
}
udp, _ := CoreDNSServerPorts(i, 0)
if udp == "" {
t.Fatalf("could not get udp listening port")
}
defer i.Stop()
log.SetOutput(ioutil.Discard)
p := proxy.New([]string{udp})
state := request.Request{W: &test.ResponseWriter{}, Req: new(dns.Msg)}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := p.Lookup(state, "example.org.", dns.TypeA)
if err != nil {
b.Fatal("Expected to receive reply, but didn't")
}
}
}