diff --git a/middleware/kubernetes/README.md b/middleware/kubernetes/README.md index 4872b8fee..14e63c163 100644 --- a/middleware/kubernetes/README.md +++ b/middleware/kubernetes/README.md @@ -27,6 +27,9 @@ kubernetes ZONE [ZONE...] [ * `resyncperiod` specifies the Kubernetes data API **DURATION** period. * `endpoint` specifies the **URL** for a remove k8s API endpoint. If omitted, it will connect to k8s in-cluster using the cluster service account. + Multiple k8s API endpoints could be specified, separated by `,`s, e.g. + `endpoint http://k8s-endpoint1:8080,http://k8s-endpoint2:8080`. CoreDNS + will automatically perform a healthcheck and proxy to the healthy k8s API endpoint. * `tls` **CERT** **KEY** **CACERT** are the TLS cert, key and the CA cert file names for remote k8s connection. This option is ignored if connecting in-cluster (i.e. endpoint is not specified). * `namespaces` **NAMESPACE [NAMESPACE...]**, exposed only the k8s namespaces listed. diff --git a/middleware/kubernetes/apiproxy.go b/middleware/kubernetes/apiproxy.go new file mode 100644 index 000000000..966e5753b --- /dev/null +++ b/middleware/kubernetes/apiproxy.go @@ -0,0 +1,76 @@ +package kubernetes + +import ( + "fmt" + "io" + "log" + "net" + "net/http" + + "github.com/coredns/coredns/middleware/pkg/healthcheck" +) + +type proxyHandler struct { + healthcheck.HealthCheck +} + +type apiProxy struct { + http.Server + listener net.Listener + handler proxyHandler +} + +func (p *proxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + upstream := p.Select() + network := "tcp" + if upstream.Network != "" { + network = upstream.Network + } + address := upstream.Name + d, err := net.Dial(network, address) + if err != nil { + log.Printf("[ERROR] Unable to establish connection to upstream %s://%s: %s", network, address, err) + http.Error(w, fmt.Sprintf("Unable to establish connection to upstream %s://%s: %s", network, address, err), 500) + return + } + hj, ok := w.(http.Hijacker) + if !ok { + log.Printf("[ERROR] Unable to establish connection: no hijacker") + http.Error(w, "Unable to establish connection: no hijacker", 500) + return + } + nc, _, err := hj.Hijack() + if err != nil { + log.Printf("[ERROR] Unable to hijack connection: %s", err) + http.Error(w, fmt.Sprintf("Unable to hijack connection: %s", err), 500) + return + } + defer nc.Close() + defer d.Close() + + err = r.Write(d) + if err != nil { + log.Printf("[ERROR] Unable to copy connection to upstream %s://%s: %s", network, address, err) + http.Error(w, fmt.Sprintf("Unable to copy connection to upstream %s://%s: %s", network, address, err), 500) + return + } + + errChan := make(chan error, 2) + cp := func(dst io.Writer, src io.Reader) { + _, err := io.Copy(dst, src) + errChan <- err + } + go cp(d, nc) + go cp(nc, d) + <-errChan +} + +func (p *apiProxy) Run() { + p.handler.Start() + p.Serve(p.listener) +} + +func (p *apiProxy) Stop() { + p.handler.Stop() + p.Close() +} diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go index 209e09eb4..87c9fd4a7 100644 --- a/middleware/kubernetes/kubernetes.go +++ b/middleware/kubernetes/kubernetes.go @@ -7,11 +7,13 @@ import ( "log" "net" "strings" + "sync/atomic" "time" "github.com/coredns/coredns/middleware" "github.com/coredns/coredns/middleware/etcd/msg" "github.com/coredns/coredns/middleware/pkg/dnsutil" + "github.com/coredns/coredns/middleware/pkg/healthcheck" dnsstrings "github.com/coredns/coredns/middleware/pkg/strings" "github.com/coredns/coredns/middleware/proxy" "github.com/coredns/coredns/request" @@ -32,7 +34,8 @@ type Kubernetes struct { Zones []string primaryZone int Proxy proxy.Proxy // Proxy for looking up names during the resolution process - APIEndpoint string + APIServerList []string + APIProxy *apiProxy APICertAuth string APIClientCert string APIClientKey string @@ -173,8 +176,62 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) { overrides := &clientcmd.ConfigOverrides{} clusterinfo := clientcmdapi.Cluster{} authinfo := clientcmdapi.AuthInfo{} - if len(k.APIEndpoint) > 0 { - clusterinfo.Server = k.APIEndpoint + if len(k.APIServerList) > 0 { + endpoint := k.APIServerList[0] + if len(k.APIServerList) > 1 { + // Use a random port for api proxy, will get the value later through listener.Addr() + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("failed to create kubernetes api proxy: %v", err) + } + k.APIProxy = &apiProxy{ + listener: listener, + handler: proxyHandler{ + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 3 * time.Second, + MaxFails: 1, + Future: 10 * time.Second, + Path: "/", + Interval: 5 * time.Second, + }, + }, + } + k.APIProxy.handler.Hosts = make([]*healthcheck.UpstreamHost, len(k.APIServerList)) + for i, entry := range k.APIServerList { + + uh := &healthcheck.UpstreamHost{ + Name: strings.TrimPrefix(entry, "http://"), + + CheckDown: func(upstream *proxyHandler) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { + + down := false + + uh.CheckMu.Lock() + until := uh.OkUntil + uh.CheckMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + down = true + } + + fails := atomic.LoadInt32(&uh.Fails) + if fails >= upstream.MaxFails && upstream.MaxFails != 0 { + down = true + } + return down + } + }(&k.APIProxy.handler), + } + + k.APIProxy.handler.Hosts[i] = uh + } + k.APIProxy.Handler = &k.APIProxy.handler + + // Find the random port used for api proxy + endpoint = fmt.Sprintf("http://%s", listener.Addr()) + } + clusterinfo.Server = endpoint } else { cc, err := rest.InClusterConfig() if err != nil { diff --git a/middleware/kubernetes/setup.go b/middleware/kubernetes/setup.go index 6b8cbba6e..130dca083 100644 --- a/middleware/kubernetes/setup.go +++ b/middleware/kubernetes/setup.go @@ -39,10 +39,16 @@ func setup(c *caddy.Controller) error { // Register KubeCache start and stop functions with Caddy c.OnStartup(func() error { go kubernetes.APIConn.Run() + if kubernetes.APIProxy != nil { + go kubernetes.APIProxy.Run() + } return nil }) c.OnShutdown(func() error { + if kubernetes.APIProxy != nil { + kubernetes.APIProxy.Stop() + } return kubernetes.APIConn.Stop() }) @@ -140,7 +146,9 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) { case "endpoint": args := c.RemainingArgs() if len(args) > 0 { - k8s.APIEndpoint = args[0] + for _, endpoint := range strings.Split(args[0], ",") { + k8s.APIServerList = append(k8s.APIServerList, strings.TrimSpace(endpoint)) + } continue } return nil, c.ArgErr() diff --git a/middleware/pkg/healthcheck/healthcheck.go b/middleware/pkg/healthcheck/healthcheck.go index e0152a47b..fc9f698e6 100644 --- a/middleware/pkg/healthcheck/healthcheck.go +++ b/middleware/pkg/healthcheck/healthcheck.go @@ -19,6 +19,7 @@ type UpstreamHostDownFunc func(*UpstreamHost) bool type UpstreamHost struct { Conns int64 // must be first field to be 64-bit aligned on 32-bit systems Name string // IP address (and port) of this upstream host + Network string // Network (tcp, unix, etc) of the host, default "" is "tcp" Fails int32 FailTimeout time.Duration OkUntil time.Time diff --git a/test/kubernetes_test.go b/test/kubernetes_test.go index 1d00be3b8..d99ca1618 100644 --- a/test/kubernetes_test.go +++ b/test/kubernetes_test.go @@ -450,7 +450,7 @@ func doIntegrationTests(t *testing.T, corefile string, testCases []test.Case) { // Work-around for timing condition that results in no-data being returned in // test environment. - time.Sleep(1 * time.Second) + time.Sleep(3 * time.Second) for _, tc := range testCases { @@ -513,6 +513,27 @@ func TestKubernetesIntegration(t *testing.T) { doIntegrationTests(t, corefile, dnsTestCases) } +func TestKubernetesIntegrationAPIProxy(t *testing.T) { + + removeUpstreamConfig, upstreamServer, udp := createUpstreamServer(t) + defer upstreamServer.Stop() + defer removeUpstreamConfig() + + corefile := + `.:0 { + kubernetes cluster.local 0.0.10.in-addr.arpa { + endpoint http://nonexistance:8080,http://invalidip:8080,http://localhost:8080 + namespaces test-1 + pods disabled + upstream ` + udp + ` + } + erratic . { + drop 0 + } +` + doIntegrationTests(t, corefile, dnsTestCases) +} + func TestKubernetesIntegrationPodsInsecure(t *testing.T) { corefile := `.:0 {