plugin/grpc: New gRPC plugin (#2667)
* plugin/grpc: New gRPC plugin * some changes after the first review: - remove healthcheck. gRPC already has this implicitly implemented - some naming and stetic changes - fix some comments - other minor fixes * plugin/grpc: New gRPC plugin * some changes after the first review: - remove healthcheck. gRPC already has this implicitly implemented - some naming and stetic changes - fix some comments - other minor fixes * add OWNERS file and change plugin order * remove Rcode checker
This commit is contained in:
parent
0d8e1cf8b4
commit
7b6cb76237
15 changed files with 952 additions and 0 deletions
|
@ -47,4 +47,5 @@ var Directives = []string{
|
||||||
"erratic",
|
"erratic",
|
||||||
"whoami",
|
"whoami",
|
||||||
"on",
|
"on",
|
||||||
|
"grpc",
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
_ "github.com/coredns/coredns/plugin/federation"
|
_ "github.com/coredns/coredns/plugin/federation"
|
||||||
_ "github.com/coredns/coredns/plugin/file"
|
_ "github.com/coredns/coredns/plugin/file"
|
||||||
_ "github.com/coredns/coredns/plugin/forward"
|
_ "github.com/coredns/coredns/plugin/forward"
|
||||||
|
_ "github.com/coredns/coredns/plugin/grpc"
|
||||||
_ "github.com/coredns/coredns/plugin/health"
|
_ "github.com/coredns/coredns/plugin/health"
|
||||||
_ "github.com/coredns/coredns/plugin/hosts"
|
_ "github.com/coredns/coredns/plugin/hosts"
|
||||||
_ "github.com/coredns/coredns/plugin/k8s_external"
|
_ "github.com/coredns/coredns/plugin/k8s_external"
|
||||||
|
|
1
go.sum
1
go.sum
|
@ -109,6 +109,7 @@ github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg
|
||||||
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||||
github.com/openzipkin/zipkin-go-opentracing v0.3.4 h1:x/pBv/5VJNWkcHF1G9xqhug8Iw7X1y1zOMzDmyuvP2g=
|
github.com/openzipkin/zipkin-go-opentracing v0.3.4 h1:x/pBv/5VJNWkcHF1G9xqhug8Iw7X1y1zOMzDmyuvP2g=
|
||||||
github.com/openzipkin/zipkin-go-opentracing v0.3.4/go.mod h1:js2AbwmHW0YD9DwIw2JhQWmbfFi/UnWyYwdVhqbCDOE=
|
github.com/openzipkin/zipkin-go-opentracing v0.3.4/go.mod h1:js2AbwmHW0YD9DwIw2JhQWmbfFi/UnWyYwdVhqbCDOE=
|
||||||
|
github.com/petar/GoLLRB v0.0.0-20130427215148-53be0d36a84c/go.mod h1:HUpKUBZnpzkdx0kD/+Yfuft+uD3zHGtXF/XJB14TUr4=
|
||||||
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
|
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
|
||||||
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
|
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
|
||||||
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
|
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
|
||||||
|
|
|
@ -53,6 +53,7 @@ etcd:etcd
|
||||||
loop:loop
|
loop:loop
|
||||||
forward:forward
|
forward:forward
|
||||||
proxy:deprecated
|
proxy:deprecated
|
||||||
|
grpc:grpc
|
||||||
erratic:erratic
|
erratic:erratic
|
||||||
whoami:whoami
|
whoami:whoami
|
||||||
on:github.com/mholt/caddy/onevent
|
on:github.com/mholt/caddy/onevent
|
||||||
|
|
6
plugin/grpc/OWNERS
Normal file
6
plugin/grpc/OWNERS
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
reviewers:
|
||||||
|
- inigohu
|
||||||
|
- miekg
|
||||||
|
approvers:
|
||||||
|
- inigohu
|
||||||
|
- miekg
|
135
plugin/grpc/README.md
Normal file
135
plugin/grpc/README.md
Normal file
|
@ -0,0 +1,135 @@
|
||||||
|
# grpc
|
||||||
|
|
||||||
|
## Name
|
||||||
|
|
||||||
|
*grpc* - facilitates proxying DNS messages to upstream resolvers via gRPC protocol.
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
The *grpc* plugin supports gRPC and TLS.
|
||||||
|
|
||||||
|
This plugin can only be used once per Server Block.
|
||||||
|
|
||||||
|
## Syntax
|
||||||
|
|
||||||
|
In its most basic form:
|
||||||
|
|
||||||
|
~~~
|
||||||
|
grpc FROM TO...
|
||||||
|
~~~
|
||||||
|
|
||||||
|
* **FROM** is the base domain to match for the request to be proxied.
|
||||||
|
* **TO...** are the destination endpoints to proxy to. The number of upstreams is
|
||||||
|
limited to 15.
|
||||||
|
|
||||||
|
Multiple upstreams are randomized (see `policy`) on first use. When a proxy returns an error
|
||||||
|
the next upstream in the list is tried.
|
||||||
|
|
||||||
|
Extra knobs are available with an expanded syntax:
|
||||||
|
|
||||||
|
~~~
|
||||||
|
grpc FROM TO... {
|
||||||
|
except IGNORED_NAMES...
|
||||||
|
tls CERT KEY CA
|
||||||
|
tls_servername NAME
|
||||||
|
policy random|round_robin|sequential
|
||||||
|
}
|
||||||
|
~~~
|
||||||
|
|
||||||
|
* **FROM** and **TO...** as above.
|
||||||
|
* **IGNORED_NAMES** in `except` is a space-separated list of domains to exclude from proxying.
|
||||||
|
Requests that match none of these names will be passed through.
|
||||||
|
* `tls` **CERT** **KEY** **CA** define the TLS properties for TLS connection. From 0 to 3 arguments can be
|
||||||
|
provided with the meaning as described below
|
||||||
|
|
||||||
|
* `tls` - no client authentication is used, and the system CAs are used to verify the server certificate
|
||||||
|
* `tls` **CA** - no client authentication is used, and the file CA is used to verify the server certificate
|
||||||
|
* `tls` **CERT** **KEY** - client authentication is used with the specified cert/key pair.
|
||||||
|
The server certificate is verified with the system CAs
|
||||||
|
* `tls` **CERT** **KEY** **CA** - client authentication is used with the specified cert/key pair.
|
||||||
|
The server certificate is verified using the specified CA file
|
||||||
|
|
||||||
|
* `tls_servername` **NAME** allows you to set a server name in the TLS configuration; for instance 9.9.9.9
|
||||||
|
needs this to be set to `dns.quad9.net`. Multiple upstreams are still allowed in this scenario,
|
||||||
|
but they have to use the same `tls_servername`. E.g. mixing 9.9.9.9 (QuadDNS) with 1.1.1.1
|
||||||
|
(Cloudflare) will not work.
|
||||||
|
* `policy` specifies the policy to use for selecting upstream servers. The default is `random`.
|
||||||
|
|
||||||
|
Also note the TLS config is "global" for the whole grpc proxy if you need a different
|
||||||
|
`tls-name` for different upstreams you're out of luck.
|
||||||
|
|
||||||
|
## Metrics
|
||||||
|
|
||||||
|
If monitoring is enabled (via the *prometheus* directive) then the following metric are exported:
|
||||||
|
|
||||||
|
* `coredns_grpc_request_duration_seconds{to}` - duration per upstream interaction.
|
||||||
|
* `coredns_grpc_request_count_total{to}` - query count per upstream.
|
||||||
|
* `coredns_grpc_response_rcode_total{to, rcode}` - count of RCODEs per upstream.
|
||||||
|
and we are randomly (this always uses the `random` policy) spraying to an upstream.
|
||||||
|
|
||||||
|
## Examples
|
||||||
|
|
||||||
|
Proxy all requests within `example.org.` to a nameserver running on a different port:
|
||||||
|
|
||||||
|
~~~ corefile
|
||||||
|
example.org {
|
||||||
|
grpc . 127.0.0.1:9005
|
||||||
|
}
|
||||||
|
~~~
|
||||||
|
|
||||||
|
Load balance all requests between three resolvers, one of which has a IPv6 address.
|
||||||
|
|
||||||
|
~~~ corefile
|
||||||
|
. {
|
||||||
|
grpc . 10.0.0.10:53 10.0.0.11:1053 [2003::1]:53
|
||||||
|
}
|
||||||
|
~~~
|
||||||
|
|
||||||
|
Forward everything except requests to `example.org`
|
||||||
|
|
||||||
|
~~~ corefile
|
||||||
|
. {
|
||||||
|
grpc . 10.0.0.10:1234 {
|
||||||
|
except example.org
|
||||||
|
}
|
||||||
|
}
|
||||||
|
~~~
|
||||||
|
|
||||||
|
Proxy everything except `example.org` using the host's `resolv.conf`'s nameservers:
|
||||||
|
|
||||||
|
~~~ corefile
|
||||||
|
. {
|
||||||
|
grpc . /etc/resolv.conf {
|
||||||
|
except example.org
|
||||||
|
}
|
||||||
|
}
|
||||||
|
~~~
|
||||||
|
|
||||||
|
Proxy all requests to 9.9.9.9 using the TLS protocol, and cache every answer for up to 30
|
||||||
|
seconds. Note the `tls_servername` is mandatory if you want a working setup, as 9.9.9.9 can't be
|
||||||
|
used in the TLS negotiation.
|
||||||
|
|
||||||
|
~~~ corefile
|
||||||
|
. {
|
||||||
|
grpc . 9.9.9.9 {
|
||||||
|
tls_servername dns.quad9.net
|
||||||
|
}
|
||||||
|
cache 30
|
||||||
|
}
|
||||||
|
~~~
|
||||||
|
|
||||||
|
Or with multiple upstreams from the same provider
|
||||||
|
|
||||||
|
~~~ corefile
|
||||||
|
. {
|
||||||
|
grpc . 1.1.1.1 1.0.0.1 {
|
||||||
|
tls_servername cloudflare-dns.com
|
||||||
|
}
|
||||||
|
cache 30
|
||||||
|
}
|
||||||
|
~~~
|
||||||
|
|
||||||
|
## Bugs
|
||||||
|
|
||||||
|
The TLS config is global for the whole grpc proxy if you need a different `tls_servername` for
|
||||||
|
different upstreams you're out of luck.
|
130
plugin/grpc/grpc.go
Normal file
130
plugin/grpc/grpc.go
Normal file
|
@ -0,0 +1,130 @@
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/coredns/coredns/plugin"
|
||||||
|
"github.com/coredns/coredns/plugin/debug"
|
||||||
|
"github.com/coredns/coredns/request"
|
||||||
|
|
||||||
|
"github.com/miekg/dns"
|
||||||
|
ot "github.com/opentracing/opentracing-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GRPC represents a plugin instance that can proxy requests to another (DNS) server via gRPC protocol.
|
||||||
|
// It has a list of proxies each representing one upstream proxy.
|
||||||
|
type GRPC struct {
|
||||||
|
proxies []*Proxy
|
||||||
|
p Policy
|
||||||
|
|
||||||
|
from string
|
||||||
|
ignored []string
|
||||||
|
|
||||||
|
tlsConfig *tls.Config
|
||||||
|
tlsServerName string
|
||||||
|
|
||||||
|
Next plugin.Handler
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServeDNS implements the plugin.Handler interface.
|
||||||
|
func (g *GRPC) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
|
||||||
|
state := request.Request{W: w, Req: r}
|
||||||
|
if !g.match(state) {
|
||||||
|
return plugin.NextOrFailure(g.Name(), g.Next, ctx, w, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
span, child ot.Span
|
||||||
|
ret *dns.Msg
|
||||||
|
err error
|
||||||
|
i int
|
||||||
|
)
|
||||||
|
span = ot.SpanFromContext(ctx)
|
||||||
|
list := g.list()
|
||||||
|
deadline := time.Now().Add(defaultTimeout)
|
||||||
|
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if i >= len(list) {
|
||||||
|
// reached the end of list without any answer
|
||||||
|
if ret != nil {
|
||||||
|
// write empty response and finish
|
||||||
|
w.WriteMsg(ret)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
proxy := list[i]
|
||||||
|
i++
|
||||||
|
|
||||||
|
if span != nil {
|
||||||
|
child = span.Tracer().StartSpan("query", ot.ChildOf(span.Context()))
|
||||||
|
ctx = ot.ContextWithSpan(ctx, child)
|
||||||
|
}
|
||||||
|
|
||||||
|
ret, err = proxy.query(ctx, r)
|
||||||
|
if err != nil {
|
||||||
|
// Continue with the next proxy
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if child != nil {
|
||||||
|
child.Finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the reply is correct; if not return FormErr.
|
||||||
|
if !state.Match(ret) {
|
||||||
|
debug.Hexdumpf(ret, "Wrong reply for id: %d, %s %d", ret.Id, state.QName(), state.QType())
|
||||||
|
|
||||||
|
formerr := state.ErrorMessage(dns.RcodeFormatError)
|
||||||
|
w.WriteMsg(formerr)
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteMsg(ret)
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewGRPC returns a new GRPC.
|
||||||
|
func newGRPC() *GRPC {
|
||||||
|
g := &GRPC{
|
||||||
|
p: new(random),
|
||||||
|
}
|
||||||
|
return g
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name implements the Handler interface.
|
||||||
|
func (g *GRPC) Name() string { return "grpc" }
|
||||||
|
|
||||||
|
// Len returns the number of configured proxies.
|
||||||
|
func (g *GRPC) len() int { return len(g.proxies) }
|
||||||
|
|
||||||
|
func (g *GRPC) match(state request.Request) bool {
|
||||||
|
if !plugin.Name(g.from).Matches(state.Name()) || !g.isAllowedDomain(state.Name()) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *GRPC) isAllowedDomain(name string) bool {
|
||||||
|
if dns.Name(name) == dns.Name(g.from) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ignore := range g.ignored {
|
||||||
|
if plugin.Name(ignore).Matches(name) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// List returns a set of proxies to be used for this client depending on the policy in p.
|
||||||
|
func (g *GRPC) list() []*Proxy { return g.p.List(g.proxies) }
|
||||||
|
|
||||||
|
const defaultTimeout = 5 * time.Second
|
75
plugin/grpc/grpc_test.go
Normal file
75
plugin/grpc/grpc_test.go
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/coredns/coredns/pb"
|
||||||
|
"github.com/coredns/coredns/plugin/pkg/dnstest"
|
||||||
|
"github.com/coredns/coredns/plugin/test"
|
||||||
|
|
||||||
|
"github.com/miekg/dns"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGRPC(t *testing.T) {
|
||||||
|
m := &dns.Msg{}
|
||||||
|
msg, err := m.Pack()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error packing response: %s", err.Error())
|
||||||
|
}
|
||||||
|
dnsPacket := &pb.DnsPacket{Msg: msg}
|
||||||
|
tests := map[string]struct {
|
||||||
|
proxies []*Proxy
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
"single_proxy_ok": {
|
||||||
|
proxies: []*Proxy{
|
||||||
|
{client: &testServiceClient{dnsPacket: dnsPacket, err: nil}},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
"multiple_proxies_ok": {
|
||||||
|
proxies: []*Proxy{
|
||||||
|
{client: &testServiceClient{dnsPacket: dnsPacket, err: nil}},
|
||||||
|
{client: &testServiceClient{dnsPacket: dnsPacket, err: nil}},
|
||||||
|
{client: &testServiceClient{dnsPacket: dnsPacket, err: nil}},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
"single_proxy_ko": {
|
||||||
|
proxies: []*Proxy{
|
||||||
|
{client: &testServiceClient{dnsPacket: nil, err: errors.New("")}},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
"multiple_proxies_one_ko": {
|
||||||
|
proxies: []*Proxy{
|
||||||
|
{client: &testServiceClient{dnsPacket: dnsPacket, err: nil}},
|
||||||
|
{client: &testServiceClient{dnsPacket: nil, err: errors.New("")}},
|
||||||
|
{client: &testServiceClient{dnsPacket: dnsPacket, err: nil}},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
"multiple_proxies_ko": {
|
||||||
|
proxies: []*Proxy{
|
||||||
|
{client: &testServiceClient{dnsPacket: nil, err: errors.New("")}},
|
||||||
|
{client: &testServiceClient{dnsPacket: nil, err: errors.New("")}},
|
||||||
|
{client: &testServiceClient{dnsPacket: nil, err: errors.New("")}},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tt := range tests {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
g := newGRPC()
|
||||||
|
g.from = "."
|
||||||
|
g.proxies = tt.proxies
|
||||||
|
rec := dnstest.NewRecorder(&test.ResponseWriter{})
|
||||||
|
if _, err := g.ServeDNS(context.TODO(), rec, m); err != nil && !tt.wantErr {
|
||||||
|
t.Fatal("Expected to receive reply, but didn't")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
30
plugin/grpc/metrics.go
Normal file
30
plugin/grpc/metrics.go
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/coredns/coredns/plugin"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Variables declared for monitoring.
|
||||||
|
var (
|
||||||
|
RequestCount = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: plugin.Namespace,
|
||||||
|
Subsystem: "grpc",
|
||||||
|
Name: "request_count_total",
|
||||||
|
Help: "Counter of requests made per upstream.",
|
||||||
|
}, []string{"to"})
|
||||||
|
RcodeCount = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: plugin.Namespace,
|
||||||
|
Subsystem: "grpc",
|
||||||
|
Name: "response_rcode_count_total",
|
||||||
|
Help: "Counter of requests made per upstream.",
|
||||||
|
}, []string{"rcode", "to"})
|
||||||
|
RequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||||
|
Namespace: plugin.Namespace,
|
||||||
|
Subsystem: "grpc",
|
||||||
|
Name: "request_duration_seconds",
|
||||||
|
Buckets: plugin.TimeBuckets,
|
||||||
|
Help: "Histogram of the time each request took.",
|
||||||
|
}, []string{"to"})
|
||||||
|
)
|
64
plugin/grpc/policy.go
Normal file
64
plugin/grpc/policy.go
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Policy defines a policy we use for selecting upstreams.
|
||||||
|
type Policy interface {
|
||||||
|
List([]*Proxy) []*Proxy
|
||||||
|
String() string
|
||||||
|
}
|
||||||
|
|
||||||
|
// random is a policy that implements random upstream selection.
|
||||||
|
type random struct{}
|
||||||
|
|
||||||
|
func (r *random) String() string { return "random" }
|
||||||
|
|
||||||
|
func (r *random) List(p []*Proxy) []*Proxy {
|
||||||
|
switch len(p) {
|
||||||
|
case 1:
|
||||||
|
return p
|
||||||
|
case 2:
|
||||||
|
if rand.Int()%2 == 0 {
|
||||||
|
return []*Proxy{p[1], p[0]} // swap
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
perms := rand.Perm(len(p))
|
||||||
|
rnd := make([]*Proxy, len(p))
|
||||||
|
|
||||||
|
for i, p1 := range perms {
|
||||||
|
rnd[i] = p[p1]
|
||||||
|
}
|
||||||
|
return rnd
|
||||||
|
}
|
||||||
|
|
||||||
|
// roundRobin is a policy that selects hosts based on round robin ordering.
|
||||||
|
type roundRobin struct {
|
||||||
|
robin uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *roundRobin) String() string { return "round_robin" }
|
||||||
|
|
||||||
|
func (r *roundRobin) List(p []*Proxy) []*Proxy {
|
||||||
|
poolLen := uint32(len(p))
|
||||||
|
i := atomic.AddUint32(&r.robin, 1) % poolLen
|
||||||
|
|
||||||
|
robin := []*Proxy{p[i]}
|
||||||
|
robin = append(robin, p[:i]...)
|
||||||
|
robin = append(robin, p[i+1:]...)
|
||||||
|
|
||||||
|
return robin
|
||||||
|
}
|
||||||
|
|
||||||
|
// sequential is a policy that selects hosts based on sequential ordering.
|
||||||
|
type sequential struct{}
|
||||||
|
|
||||||
|
func (r *sequential) String() string { return "sequential" }
|
||||||
|
|
||||||
|
func (r *sequential) List(p []*Proxy) []*Proxy {
|
||||||
|
return p
|
||||||
|
}
|
81
plugin/grpc/proxy.go
Normal file
81
plugin/grpc/proxy.go
Normal file
|
@ -0,0 +1,81 @@
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/coredns/coredns/pb"
|
||||||
|
|
||||||
|
"github.com/miekg/dns"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/credentials"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Proxy defines an upstream host.
|
||||||
|
type Proxy struct {
|
||||||
|
addr string
|
||||||
|
|
||||||
|
// connection
|
||||||
|
client pb.DnsServiceClient
|
||||||
|
dialOpts []grpc.DialOption
|
||||||
|
}
|
||||||
|
|
||||||
|
// newProxy returns a new proxy.
|
||||||
|
func newProxy(addr string, tlsConfig *tls.Config) (*Proxy, error) {
|
||||||
|
p := &Proxy{
|
||||||
|
addr: addr,
|
||||||
|
}
|
||||||
|
|
||||||
|
if tlsConfig != nil {
|
||||||
|
p.dialOpts = append(p.dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
|
||||||
|
} else {
|
||||||
|
p.dialOpts = append(p.dialOpts, grpc.WithInsecure())
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := grpc.Dial(p.addr, p.dialOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
p.client = pb.NewDnsServiceClient(conn)
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// query sends the request and waits for a response.
|
||||||
|
func (p *Proxy) query(ctx context.Context, req *dns.Msg) (*dns.Msg, error) {
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
msg, err := req.Pack()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
reply, err := p.client.Query(ctx, &pb.DnsPacket{Msg: msg})
|
||||||
|
if err != nil {
|
||||||
|
// if not found message, return empty message with NXDomain code
|
||||||
|
if status.Code(err) == codes.NotFound {
|
||||||
|
m := new(dns.Msg).SetRcode(req, dns.RcodeNameError)
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ret := new(dns.Msg)
|
||||||
|
if err := ret.Unpack(reply.Msg); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rc, ok := dns.RcodeToString[ret.Rcode]
|
||||||
|
if !ok {
|
||||||
|
rc = strconv.Itoa(ret.Rcode)
|
||||||
|
}
|
||||||
|
|
||||||
|
RequestCount.WithLabelValues(p.addr).Add(1)
|
||||||
|
RcodeCount.WithLabelValues(rc, p.addr).Add(1)
|
||||||
|
RequestDuration.WithLabelValues(p.addr).Observe(time.Since(start).Seconds())
|
||||||
|
|
||||||
|
return ret, nil
|
||||||
|
}
|
66
plugin/grpc/proxy_test.go
Normal file
66
plugin/grpc/proxy_test.go
Normal file
|
@ -0,0 +1,66 @@
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/coredns/coredns/pb"
|
||||||
|
|
||||||
|
"github.com/miekg/dns"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestProxy(t *testing.T) {
|
||||||
|
tests := map[string]struct {
|
||||||
|
p *Proxy
|
||||||
|
res *dns.Msg
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
"response_ok": {
|
||||||
|
p: &Proxy{},
|
||||||
|
res: &dns.Msg{},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
"nil_response": {
|
||||||
|
p: &Proxy{},
|
||||||
|
res: nil,
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
"tls": {
|
||||||
|
p: &Proxy{dialOpts: []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(nil))}},
|
||||||
|
res: &dns.Msg{},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for name, tt := range tests {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
var mock *testServiceClient
|
||||||
|
if tt.res != nil {
|
||||||
|
msg, err := tt.res.Pack()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error packing response: %s", err.Error())
|
||||||
|
}
|
||||||
|
mock = &testServiceClient{&pb.DnsPacket{Msg: msg}, nil}
|
||||||
|
} else {
|
||||||
|
mock = &testServiceClient{nil, errors.New("server error")}
|
||||||
|
}
|
||||||
|
tt.p.client = mock
|
||||||
|
|
||||||
|
_, err := tt.p.query(context.TODO(), new(dns.Msg))
|
||||||
|
if err != nil && !tt.wantErr {
|
||||||
|
t.Fatalf("Error query(): %s", err.Error())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type testServiceClient struct {
|
||||||
|
dnsPacket *pb.DnsPacket
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m testServiceClient) Query(ctx context.Context, in *pb.DnsPacket, opts ...grpc.CallOption) (*pb.DnsPacket, error) {
|
||||||
|
return m.dnsPacket, m.err
|
||||||
|
}
|
158
plugin/grpc/setup.go
Normal file
158
plugin/grpc/setup.go
Normal file
|
@ -0,0 +1,158 @@
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/coredns/coredns/core/dnsserver"
|
||||||
|
"github.com/coredns/coredns/plugin"
|
||||||
|
"github.com/coredns/coredns/plugin/metrics"
|
||||||
|
"github.com/coredns/coredns/plugin/pkg/parse"
|
||||||
|
pkgtls "github.com/coredns/coredns/plugin/pkg/tls"
|
||||||
|
|
||||||
|
"github.com/mholt/caddy"
|
||||||
|
"github.com/mholt/caddy/caddyfile"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
caddy.RegisterPlugin("grpc", caddy.Plugin{
|
||||||
|
ServerType: "dns",
|
||||||
|
Action: setup,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func setup(c *caddy.Controller) error {
|
||||||
|
g, err := parseGRPC(c)
|
||||||
|
if err != nil {
|
||||||
|
return plugin.Error("grpc", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if g.len() > max {
|
||||||
|
return plugin.Error("grpc", fmt.Errorf("more than %d TOs configured: %d", max, g.len()))
|
||||||
|
}
|
||||||
|
|
||||||
|
dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
|
||||||
|
g.Next = next // Set the Next field, so the plugin chaining works.
|
||||||
|
return g
|
||||||
|
})
|
||||||
|
|
||||||
|
c.OnStartup(func() error {
|
||||||
|
metrics.MustRegister(c, RequestCount, RcodeCount, RequestDuration)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseGRPC(c *caddy.Controller) (*GRPC, error) {
|
||||||
|
var (
|
||||||
|
g *GRPC
|
||||||
|
err error
|
||||||
|
i int
|
||||||
|
)
|
||||||
|
for c.Next() {
|
||||||
|
if i > 0 {
|
||||||
|
return nil, plugin.ErrOnce
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
g, err = parseGRPCStanza(&c.Dispenser)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return g, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseGRPCStanza(c *caddyfile.Dispenser) (*GRPC, error) {
|
||||||
|
g := newGRPC()
|
||||||
|
|
||||||
|
if !c.Args(&g.from) {
|
||||||
|
return g, c.ArgErr()
|
||||||
|
}
|
||||||
|
g.from = plugin.Host(g.from).Normalize()
|
||||||
|
|
||||||
|
to := c.RemainingArgs()
|
||||||
|
if len(to) == 0 {
|
||||||
|
return g, c.ArgErr()
|
||||||
|
}
|
||||||
|
|
||||||
|
toHosts, err := parse.HostPortOrFile(to...)
|
||||||
|
if err != nil {
|
||||||
|
return g, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if g.tlsServerName != "" {
|
||||||
|
if g.tlsConfig == nil {
|
||||||
|
g.tlsConfig = new(tls.Config)
|
||||||
|
}
|
||||||
|
g.tlsConfig.ServerName = g.tlsServerName
|
||||||
|
}
|
||||||
|
for _, host := range toHosts {
|
||||||
|
pr, err := newProxy(host, g.tlsConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
g.proxies = append(g.proxies, pr)
|
||||||
|
}
|
||||||
|
|
||||||
|
for c.NextBlock() {
|
||||||
|
if err := parseBlock(c, g); err != nil {
|
||||||
|
return g, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return g, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseBlock(c *caddyfile.Dispenser, g *GRPC) error {
|
||||||
|
|
||||||
|
switch c.Val() {
|
||||||
|
case "except":
|
||||||
|
ignore := c.RemainingArgs()
|
||||||
|
if len(ignore) == 0 {
|
||||||
|
return c.ArgErr()
|
||||||
|
}
|
||||||
|
for i := 0; i < len(ignore); i++ {
|
||||||
|
ignore[i] = plugin.Host(ignore[i]).Normalize()
|
||||||
|
}
|
||||||
|
g.ignored = ignore
|
||||||
|
case "tls":
|
||||||
|
args := c.RemainingArgs()
|
||||||
|
if len(args) > 3 {
|
||||||
|
return c.ArgErr()
|
||||||
|
}
|
||||||
|
|
||||||
|
tlsConfig, err := pkgtls.NewTLSConfigFromArgs(args...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
g.tlsConfig = tlsConfig
|
||||||
|
case "tls_servername":
|
||||||
|
if !c.NextArg() {
|
||||||
|
return c.ArgErr()
|
||||||
|
}
|
||||||
|
g.tlsServerName = c.Val()
|
||||||
|
case "policy":
|
||||||
|
if !c.NextArg() {
|
||||||
|
return c.ArgErr()
|
||||||
|
}
|
||||||
|
switch x := c.Val(); x {
|
||||||
|
case "random":
|
||||||
|
g.p = &random{}
|
||||||
|
case "round_robin":
|
||||||
|
g.p = &roundRobin{}
|
||||||
|
case "sequential":
|
||||||
|
g.p = &sequential{}
|
||||||
|
default:
|
||||||
|
return c.Errf("unknown policy '%s'", x)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
if c.Val() != "}" {
|
||||||
|
return c.Errf("unknown property '%s'", c.Val())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const max = 15 // Maximum number of upstreams.
|
47
plugin/grpc/setup_policy_test.go
Normal file
47
plugin/grpc/setup_policy_test.go
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/mholt/caddy"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSetupPolicy(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
input string
|
||||||
|
shouldErr bool
|
||||||
|
expectedPolicy string
|
||||||
|
expectedErr string
|
||||||
|
}{
|
||||||
|
// positive
|
||||||
|
{"grpc . 127.0.0.1 {\npolicy random\n}\n", false, "random", ""},
|
||||||
|
{"grpc . 127.0.0.1 {\npolicy round_robin\n}\n", false, "round_robin", ""},
|
||||||
|
{"grpc . 127.0.0.1 {\npolicy sequential\n}\n", false, "sequential", ""},
|
||||||
|
// negative
|
||||||
|
{"grpc . 127.0.0.1 {\npolicy random2\n}\n", true, "random", "unknown policy"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
c := caddy.NewTestController("dns", test.input)
|
||||||
|
g, err := parseGRPC(c)
|
||||||
|
|
||||||
|
if test.shouldErr && err == nil {
|
||||||
|
t.Errorf("Test %d: expected error but found %s for input %s", i, err, test.input)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if !test.shouldErr {
|
||||||
|
t.Errorf("Test %d: expected no error but found one for input %s, got: %v", i, test.input, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(err.Error(), test.expectedErr) {
|
||||||
|
t.Errorf("Test %d: expected error to contain: %v, found error: %v, input: %s", i, test.expectedErr, err, test.input)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !test.shouldErr && g.p.String() != test.expectedPolicy {
|
||||||
|
t.Errorf("Test %d: expected: %s, got: %s", i, test.expectedPolicy, g.p.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
156
plugin/grpc/setup_test.go
Normal file
156
plugin/grpc/setup_test.go
Normal file
|
@ -0,0 +1,156 @@
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/mholt/caddy"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSetup(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
input string
|
||||||
|
shouldErr bool
|
||||||
|
expectedFrom string
|
||||||
|
expectedIgnored []string
|
||||||
|
expectedErr string
|
||||||
|
}{
|
||||||
|
// positive
|
||||||
|
{"grpc . 127.0.0.1", false, ".", nil, ""},
|
||||||
|
{"grpc . 127.0.0.1 {\nexcept miek.nl\n}\n", false, ".", nil, ""},
|
||||||
|
{"grpc . 127.0.0.1", false, ".", nil, ""},
|
||||||
|
{"grpc . 127.0.0.1:53", false, ".", nil, ""},
|
||||||
|
{"grpc . 127.0.0.1:8080", false, ".", nil, ""},
|
||||||
|
{"grpc . [::1]:53", false, ".", nil, ""},
|
||||||
|
{"grpc . [2003::1]:53", false, ".", nil, ""},
|
||||||
|
// negative
|
||||||
|
{"grpc . a27.0.0.1", true, "", nil, "not an IP"},
|
||||||
|
{"grpc . 127.0.0.1 {\nblaatl\n}\n", true, "", nil, "unknown property"},
|
||||||
|
{`grpc . ::1
|
||||||
|
grpc com ::2`, true, "", nil, "plugin"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
c := caddy.NewTestController("grpc", test.input)
|
||||||
|
g, err := parseGRPC(c)
|
||||||
|
|
||||||
|
if test.shouldErr && err == nil {
|
||||||
|
t.Errorf("Test %d: expected error but found %s for input %s", i, err, test.input)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if !test.shouldErr {
|
||||||
|
t.Errorf("Test %d: expected no error but found one for input %s, got: %v", i, test.input, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(err.Error(), test.expectedErr) {
|
||||||
|
t.Errorf("Test %d: expected error to contain: %v, found error: %v, input: %s", i, test.expectedErr, err, test.input)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !test.shouldErr && g.from != test.expectedFrom {
|
||||||
|
t.Errorf("Test %d: expected: %s, got: %s", i, test.expectedFrom, g.from)
|
||||||
|
}
|
||||||
|
if !test.shouldErr && test.expectedIgnored != nil {
|
||||||
|
if !reflect.DeepEqual(g.ignored, test.expectedIgnored) {
|
||||||
|
t.Errorf("Test %d: expected: %q, actual: %q", i, test.expectedIgnored, g.ignored)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSetupTLS(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
input string
|
||||||
|
shouldErr bool
|
||||||
|
expectedServerName string
|
||||||
|
expectedErr string
|
||||||
|
}{
|
||||||
|
// positive
|
||||||
|
{`grpc . 127.0.0.1 {
|
||||||
|
tls_servername dns
|
||||||
|
}`, false, "dns", ""},
|
||||||
|
{`grpc . 127.0.0.1 {
|
||||||
|
tls_servername dns
|
||||||
|
}`, false, "", ""},
|
||||||
|
{`grpc . 127.0.0.1 {
|
||||||
|
tls
|
||||||
|
}`, false, "", ""},
|
||||||
|
{`grpc . 127.0.0.1`, false, "", ""},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
c := caddy.NewTestController("dns", test.input)
|
||||||
|
g, err := parseGRPC(c)
|
||||||
|
|
||||||
|
if test.shouldErr && err == nil {
|
||||||
|
t.Errorf("Test %d: expected error but found %s for input %s", i, err, test.input)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if !test.shouldErr {
|
||||||
|
t.Errorf("Test %d: expected no error but found one for input %s, got: %v", i, test.input, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(err.Error(), test.expectedErr) {
|
||||||
|
t.Errorf("Test %d: expected error to contain: %v, found error: %v, input: %s", i, test.expectedErr, err, test.input)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !test.shouldErr && test.expectedServerName != "" && g.tlsConfig != nil && test.expectedServerName != g.tlsConfig.ServerName {
|
||||||
|
t.Errorf("Test %d: expected: %q, actual: %q", i, test.expectedServerName, g.tlsConfig.ServerName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSetupResolvconf(t *testing.T) {
|
||||||
|
const resolv = "resolv.conf"
|
||||||
|
if err := ioutil.WriteFile(resolv,
|
||||||
|
[]byte(`nameserver 10.10.255.252
|
||||||
|
nameserver 10.10.255.253`), 0666); err != nil {
|
||||||
|
t.Fatalf("Failed to write resolv.conf file: %s", err)
|
||||||
|
}
|
||||||
|
defer os.Remove(resolv)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
input string
|
||||||
|
shouldErr bool
|
||||||
|
expectedErr string
|
||||||
|
expectedNames []string
|
||||||
|
}{
|
||||||
|
// pass
|
||||||
|
{`grpc . ` + resolv, false, "", []string{"10.10.255.252:53", "10.10.255.253:53"}},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
c := caddy.NewTestController("grpc", test.input)
|
||||||
|
f, err := parseGRPC(c)
|
||||||
|
|
||||||
|
if test.shouldErr && err == nil {
|
||||||
|
t.Errorf("Test %d: expected error but found %s for input %s", i, err, test.input)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if !test.shouldErr {
|
||||||
|
t.Errorf("Test %d: expected no error but found one for input %s, got: %v", i, test.input, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(err.Error(), test.expectedErr) {
|
||||||
|
t.Errorf("Test %d: expected error to contain: %v, found error: %v, input: %s", i, test.expectedErr, err, test.input)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !test.shouldErr {
|
||||||
|
for j, n := range test.expectedNames {
|
||||||
|
addr := f.proxies[j].addr
|
||||||
|
if n != addr {
|
||||||
|
t.Errorf("Test %d, expected %q, got %q", j, n, addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue