From a3dd8cdf8de1a9aacc36fe8cf0f268e69bd8a45d Mon Sep 17 00:00:00 2001 From: Matt Greenfield Date: Fri, 22 Mar 2019 08:32:40 -0600 Subject: [PATCH] Add `namespace_labels` configuration for kubernetes plugin (#2707) --- plugin/kubernetes/README.md | 5 ++ plugin/kubernetes/controller.go | 13 +++-- plugin/kubernetes/external.go | 2 +- plugin/kubernetes/handler_test.go | 57 ++++++++++++++++++++++ plugin/kubernetes/kubernetes.go | 56 +++++++++++++-------- plugin/kubernetes/namespace.go | 21 +++++--- plugin/kubernetes/namespace_test.go | 72 +++++++++++++++++++++++++++ plugin/kubernetes/setup.go | 16 ++++++ plugin/kubernetes/setup_test.go | 76 +++++++++++++++++++++++++---- 9 files changed, 278 insertions(+), 40 deletions(-) create mode 100644 plugin/kubernetes/namespace_test.go diff --git a/plugin/kubernetes/README.md b/plugin/kubernetes/README.md index 2c7f9861e..35161cb82 100644 --- a/plugin/kubernetes/README.md +++ b/plugin/kubernetes/README.md @@ -56,6 +56,11 @@ kubernetes [ZONES...] { * `kubeconfig` **KUBECONFIG** **CONTEXT** authenticates the connection to a remote k8s cluster using a kubeconfig file. It supports TLS, username and password, or token-based authentication. This option is ignored if connecting in-cluster (i.e., the endpoint is not specified). * `namespaces` **NAMESPACE [NAMESPACE...]** only exposes the k8s namespaces listed. If this option is omitted all namespaces are exposed +* `namespace_labels` **EXPRESSION** only expose the records for Kubernetes namespaces that match this label selector. + The label selector syntax is described in the + [Kubernetes User Guide - Labels](http://kubernetes.io/docs/user-guide/labels/). An example that + only exposes namespaces labeled as "istio-injection=enabled", would use: + `labels istio-injection=enabled`. * `labels` **EXPRESSION** only exposes the records for Kubernetes objects that match this label selector. The label selector syntax is described in the [Kubernetes User Guide - Labels](https://kubernetes.io/docs/user-guide/labels/). An example that diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index 1c2b7d8ad..41416ae0a 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -54,6 +54,7 @@ type dnsControl struct { client kubernetes.Interface selector labels.Selector + namespaceSelector labels.Selector svcController cache.Controller podController cache.Controller @@ -81,9 +82,12 @@ type dnsControlOpts struct { initEndpointsCache bool resyncPeriod time.Duration ignoreEmptyService bool + // Label handling. labelSelector *meta.LabelSelector selector labels.Selector + namespaceLabelSelector *meta.LabelSelector + namespaceSelector labels.Selector zones []string endpointNameMode bool @@ -94,6 +98,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns dns := dnsControl{ client: kubeClient, selector: opts.selector, + namespaceSelector: opts.namespaceSelector, stopCh: make(chan struct{}), zones: opts.zones, endpointNameMode: opts.endpointNameMode, @@ -140,10 +145,12 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns dns.nsLister, dns.nsController = cache.NewInformer( &cache.ListWatch{ - ListFunc: namespaceListFunc(dns.client, dns.selector), - WatchFunc: namespaceWatchFunc(dns.client, dns.selector), + ListFunc: namespaceListFunc(dns.client, dns.namespaceSelector), + WatchFunc: namespaceWatchFunc(dns.client, dns.namespaceSelector), }, - &api.Namespace{}, opts.resyncPeriod, cache.ResourceEventHandlerFuncs{}) + &api.Namespace{}, + opts.resyncPeriod, + cache.ResourceEventHandlerFuncs{}) return &dns } diff --git a/plugin/kubernetes/external.go b/plugin/kubernetes/external.go index 1770872b3..91a8a2ed1 100644 --- a/plugin/kubernetes/external.go +++ b/plugin/kubernetes/external.go @@ -30,7 +30,7 @@ func (k *Kubernetes) External(state request.Request) ([]msg.Service, int) { port := "*" protocol := "*" namespace := segs[last] - if !k.namespaceExposed(namespace) || !k.namespace(namespace) { + if !k.namespaceExposed(namespace) { return nil, dns.RcodeNameError } diff --git a/plugin/kubernetes/handler_test.go b/plugin/kubernetes/handler_test.go index 956d611af..e1a8212ca 100644 --- a/plugin/kubernetes/handler_test.go +++ b/plugin/kubernetes/handler_test.go @@ -2,6 +2,7 @@ package kubernetes import ( "context" + "fmt" "testing" "time" @@ -380,6 +381,59 @@ func TestServeDNS(t *testing.T) { } } +var nsTestCases = []test.Case{ + // A Service for an "exposed" namespace that "does exist" + { + Qname: "svc1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("svc1.testns.svc.cluster.local. 5 IN A 10.0.0.1"), + }, + }, + // A service for an "exposed" namespace that "doesn't exist" + { + Qname: "svc1.nsnoexist.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeNameError, + Ns: []dns.RR{ + test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1551484803 7200 1800 86400 30"), + }, + }, +} + +func TestServeNamespaceDNS(t *testing.T) { + k := New([]string{"cluster.local."}) + k.APIConn = &APIConnServeTest{} + k.Next = test.NextHandler(dns.RcodeSuccess, nil) + // if no namespaces are explicitly exposed, then they are all implicitly exposed + k.Namespaces = map[string]struct{}{} + ctx := context.TODO() + + for i, tc := range nsTestCases { + r := tc.Msg() + + w := dnstest.NewRecorder(&test.ResponseWriter{}) + + _, err := k.ServeDNS(ctx, w, r) + if err != tc.Error { + t.Errorf("Test %d expected no error, got %v", i, err) + return + } + if tc.Error != nil { + continue + } + + resp := w.Msg + if resp == nil { + t.Fatalf("Test %d, got nil message and no error for %q", i, r.Question[0].Name) + } + + // Before sorting, make sure that CNAMES do not appear after their target records + test.CNAMEOrder(resp) + + test.SortAndCheck(resp, tc) + } +} + var notSyncedTestCases = []test.Case{ { // We should get ServerFailure instead of NameError for missing records when we kubernetes hasn't synced @@ -627,6 +681,9 @@ func (APIConnServeTest) GetNamespaceByName(name string) (*api.Namespace, error) if name == "pod-nons" { // handler_pod_verified_test.go uses this for non-existent namespace. return &api.Namespace{}, nil } + if name == "nsnoexist" { + return nil, fmt.Errorf("namespace not found") + } return &api.Namespace{ ObjectMeta: meta.ObjectMeta{ Name: name, diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go index e9dcfb360..10f058c5a 100644 --- a/plugin/kubernetes/kubernetes.go +++ b/plugin/kubernetes/kubernetes.go @@ -217,6 +217,15 @@ func (k *Kubernetes) InitKubeCache() (err error) { k.opts.selector = selector } + if k.opts.namespaceLabelSelector != nil { + var selector labels.Selector + selector, err = meta.LabelSelectorAsSelector(k.opts.namespaceLabelSelector) + if err != nil { + return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err) + } + k.opts.namespaceSelector = selector + } + k.opts.initPodCache = k.podMode == podModeVerified k.opts.zones = k.Zones @@ -302,13 +311,15 @@ func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, } namespace := r.namespace + if !wildcard(namespace) && !k.namespaceExposed(namespace) { + return nil, errNoItems + } + podname := r.service - zonePath := msg.Path(zone, coredns) - ip := "" // handle empty pod name if podname == "" { - if k.namespace(namespace) || wildcard(namespace) { + if k.namespaceExposed(namespace) || wildcard(namespace) { // NODATA return nil, nil } @@ -316,6 +327,8 @@ func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, return nil, errNoItems } + zonePath := msg.Path(zone, coredns) + ip := "" if strings.Count(podname, "-") == 3 && !strings.Contains(podname, "--") { ip = strings.Replace(podname, "-", ".", -1) } else { @@ -323,7 +336,7 @@ func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, } if k.podMode == podModeInsecure { - if !wildcard(namespace) && !k.namespace(namespace) { // no wildcard, but namespace does not exist + if !wildcard(namespace) && !k.namespaceExposed(namespace) { // no wildcard, but namespace does not exist return nil, errNoItems } @@ -338,8 +351,8 @@ func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, // PodModeVerified err = errNoItems if wildcard(podname) && !wildcard(namespace) { - // If namespace exist, err should be nil, so that we return nodata instead of NXDOMAIN - if k.namespace(namespace) { + // If namespace exists, err should be nil, so that we return NODATA instead of NXDOMAIN + if k.namespaceExposed(namespace) { err = nil } } @@ -368,12 +381,24 @@ func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, // findServices returns the services matching r from the cache. func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.Service, err error) { - zonePath := msg.Path(zone, coredns) + if !wildcard(r.namespace) && !k.namespaceExposed(r.namespace) { + return nil, errNoItems + } + + // handle empty service name + if r.service == "" { + if k.namespaceExposed(r.namespace) || wildcard(r.namespace) { + // NODATA + return nil, nil + } + // NXDOMAIN + return nil, errNoItems + } err = errNoItems if wildcard(r.service) && !wildcard(r.namespace) { - // If namespace exist, err should be nil, so that we return nodata instead of NXDOMAIN - if k.namespace(r.namespace) { + // If namespace exists, err should be nil, so that we return NODATA instead of NXDOMAIN + if k.namespaceExposed(r.namespace) { err = nil } } @@ -384,16 +409,6 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg. serviceList []*object.Service ) - // handle empty service name - if r.service == "" { - if k.namespace(r.namespace) || wildcard(r.namespace) { - // NODATA - return nil, nil - } - // NXDOMAIN - return nil, errNoItems - } - if wildcard(r.service) || wildcard(r.namespace) { serviceList = k.APIConn.ServiceList() endpointsListFunc = func() []*object.Endpoints { return k.APIConn.EndpointsList() } @@ -403,12 +418,13 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg. endpointsListFunc = func() []*object.Endpoints { return k.APIConn.EpIndex(idx) } } + zonePath := msg.Path(zone, coredns) for _, svc := range serviceList { if !(match(r.namespace, svc.Namespace) && match(r.service, svc.Name)) { continue } - // If namespace has a wildcard, filter results against Corefile namespace list. + // If request namespace is a wildcard, filter results against Corefile namespace list. // (Namespaces without a wildcard were filtered before the call to this function.) if wildcard(r.namespace) && !k.namespaceExposed(svc.Namespace) { continue diff --git a/plugin/kubernetes/namespace.go b/plugin/kubernetes/namespace.go index 7dafc7ab3..6eab13867 100644 --- a/plugin/kubernetes/namespace.go +++ b/plugin/kubernetes/namespace.go @@ -1,20 +1,27 @@ package kubernetes -// namespace checks if namespace n exists in this cluster. This returns true -// even for non exposed namespaces, see namespaceExposed. -func (k *Kubernetes) namespace(n string) bool { - ns, err := k.APIConn.GetNamespaceByName(n) +// filteredNamespaceExists checks if namespace exists in this cluster +// according to any `namespace_labels` plugin configuration specified. +// Returns true even for namespaces not exposed by plugin configuration, +// see namespaceExposed. +func (k *Kubernetes) filteredNamespaceExists(namespace string) bool { + ns, err := k.APIConn.GetNamespaceByName(namespace) if err != nil { return false } - return ns.ObjectMeta.Name == n + return ns.ObjectMeta.Name == namespace } -// namespaceExposed returns true when the namespace is exposed. -func (k *Kubernetes) namespaceExposed(namespace string) bool { +// configuredNamespace returns true when the namespace is exposed through the plugin +// `namespaces` configuration. +func (k *Kubernetes) configuredNamespace(namespace string) bool { _, ok := k.Namespaces[namespace] if len(k.Namespaces) > 0 && !ok { return false } return true } + +func (k *Kubernetes) namespaceExposed(namespace string) bool { + return k.configuredNamespace(namespace) && k.filteredNamespaceExists(namespace) +} diff --git a/plugin/kubernetes/namespace_test.go b/plugin/kubernetes/namespace_test.go new file mode 100644 index 000000000..d7d2cbcce --- /dev/null +++ b/plugin/kubernetes/namespace_test.go @@ -0,0 +1,72 @@ +package kubernetes + +import ( + "testing" +) + +func TestFilteredNamespaceExists(t *testing.T) { + tests := []struct{ + expected bool + kubernetesNamespaces map[string]struct{} + testNamespace string + }{ + {true, map[string]struct{}{}, "foobar" }, + {false, map[string]struct{}{}, "nsnoexist" }, + } + + k := Kubernetes{} + k.APIConn = &APIConnServeTest{} + for i, test := range tests { + k.Namespaces = test.kubernetesNamespaces + actual := k.filteredNamespaceExists(test.testNamespace) + if actual != test.expected { + t.Errorf("Test %d failed. Filtered namespace %s was expected to exist", i, test.testNamespace) + } + } +} + +func TestNamespaceExposed(t *testing.T) { + tests := []struct{ + expected bool + kubernetesNamespaces map[string]struct{} + testNamespace string + }{ + {true, map[string]struct{}{ "foobar": {} }, "foobar" }, + {false, map[string]struct{}{ "foobar": {} }, "nsnoexist" }, + {true, map[string]struct{}{}, "foobar" }, + {true, map[string]struct{}{}, "nsnoexist" }, + } + + k := Kubernetes{} + k.APIConn = &APIConnServeTest{} + for i, test := range tests { + k.Namespaces = test.kubernetesNamespaces + actual := k.configuredNamespace(test.testNamespace) + if actual != test.expected { + t.Errorf("Test %d failed. Namespace %s was expected to be exposed", i, test.testNamespace) + } + } +} + +func TestNamespaceValid(t *testing.T) { + tests := []struct{ + expected bool + kubernetesNamespaces map[string]struct{} + testNamespace string + }{ + {true, map[string]struct{}{ "foobar": {} }, "foobar" }, + {false, map[string]struct{}{ "foobar": {} }, "nsnoexist" }, + {true, map[string]struct{}{}, "foobar" }, + {false, map[string]struct{}{}, "nsnoexist" }, + } + + k := Kubernetes{} + k.APIConn = &APIConnServeTest{} + for i, test := range tests { + k.Namespaces = test.kubernetesNamespaces + actual := k.namespaceExposed(test.testNamespace) + if actual != test.expected { + t.Errorf("Test %d failed. Namespace %s was expected to be valid", i, test.testNamespace) + } + } +} diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go index fe3cc2a0a..c38626e9b 100644 --- a/plugin/kubernetes/setup.go +++ b/plugin/kubernetes/setup.go @@ -234,6 +234,18 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) { continue } return nil, c.ArgErr() + case "namespace_labels": + args := c.RemainingArgs() + if len(args) > 0 { + namespaceLabelSelectorString := strings.Join(args, " ") + nls, err := meta.ParseToLabelSelector(namespaceLabelSelectorString) + if err != nil { + return nil, fmt.Errorf("unable to parse namespace_label selector value: '%v': %v", namespaceLabelSelectorString, err) + } + k8s.opts.namespaceLabelSelector = nls + continue + } + return nil, c.ArgErr() case "fallthrough": k8s.Fall.SetZonesFromArgs(c.RemainingArgs()) case "upstream": @@ -293,6 +305,10 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) { } } + if len(k8s.Namespaces) != 0 && k8s.opts.namespaceLabelSelector != nil { + return nil, c.Errf("namespaces and namespace_labels cannot both be set") + } + return k8s, nil } diff --git a/plugin/kubernetes/setup_test.go b/plugin/kubernetes/setup_test.go index ef8a493ae..e7af4e1e2 100644 --- a/plugin/kubernetes/setup_test.go +++ b/plugin/kubernetes/setup_test.go @@ -13,15 +13,16 @@ import ( func TestKubernetesParse(t *testing.T) { tests := []struct { - input string // Corefile data as string - shouldErr bool // true if test case is expected to produce an error. - expectedErrContent string // substring from the expected error. Empty for positive cases. - expectedZoneCount int // expected count of defined zones. - expectedNSCount int // expected count of namespaces. - expectedResyncPeriod time.Duration // expected resync period value - expectedLabelSelector string // expected label selector value - expectedPodMode string - expectedFallthrough fall.F + input string // Corefile data as string + shouldErr bool // true if test case is expected to produce an error. + expectedErrContent string // substring from the expected error. Empty for positive cases. + expectedZoneCount int // expected count of defined zones. + expectedNSCount int // expected count of namespaces. + expectedResyncPeriod time.Duration // expected resync period value + expectedLabelSelector string // expected label selector value + expectedNamespaceLabelSelector string // expected namespace label selector value + expectedPodMode string + expectedFallthrough fall.F }{ // positive { @@ -32,6 +33,7 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -43,6 +45,7 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -55,6 +58,7 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -68,6 +72,7 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -81,6 +86,7 @@ func TestKubernetesParse(t *testing.T) { 1, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -94,6 +100,7 @@ func TestKubernetesParse(t *testing.T) { 2, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -107,6 +114,7 @@ func TestKubernetesParse(t *testing.T) { 0, 30 * time.Second, "", + "", podModeDisabled, fall.Zero, }, @@ -120,6 +128,7 @@ func TestKubernetesParse(t *testing.T) { 0, 15 * time.Minute, "", + "", podModeDisabled, fall.Zero, }, @@ -133,6 +142,7 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "environment=prod", + "", podModeDisabled, fall.Zero, }, @@ -146,6 +156,36 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "application=nginx,environment in (production,qa,staging)", + "", + podModeDisabled, + fall.Zero, + }, + { + `kubernetes coredns.local { + namespace_labels istio-injection=enabled +}`, + false, + "", + 1, + 0, + defaultResyncPeriod, + "", + "istio-injection=enabled", + podModeDisabled, + fall.Zero, + }, + { + `kubernetes coredns.local { + namespaces foo bar + namespace_labels istio-injection=enabled +}`, + true, + "Error during parsing: namespaces and namespace_labels cannot both be set", + -1, + 0, + defaultResyncPeriod, + "", + "istio-injection=enabled", podModeDisabled, fall.Zero, }, @@ -163,6 +203,7 @@ func TestKubernetesParse(t *testing.T) { 2, 15 * time.Minute, "application=nginx,environment in (production,qa,staging)", + "", podModeDisabled, fall.Root, }, @@ -177,6 +218,7 @@ func TestKubernetesParse(t *testing.T) { -1, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -190,6 +232,7 @@ func TestKubernetesParse(t *testing.T) { -1, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -203,6 +246,7 @@ func TestKubernetesParse(t *testing.T) { 0, 0 * time.Minute, "", + "", podModeDisabled, fall.Zero, }, @@ -216,6 +260,7 @@ func TestKubernetesParse(t *testing.T) { 0, 0 * time.Second, "", + "", podModeDisabled, fall.Zero, }, @@ -229,6 +274,7 @@ func TestKubernetesParse(t *testing.T) { 0, 0 * time.Second, "", + "", podModeDisabled, fall.Zero, }, @@ -242,6 +288,7 @@ func TestKubernetesParse(t *testing.T) { 0, 0 * time.Second, "", + "", podModeDisabled, fall.Zero, }, @@ -255,6 +302,7 @@ func TestKubernetesParse(t *testing.T) { 0, 0 * time.Second, "", + "", podModeDisabled, fall.Zero, }, @@ -269,6 +317,7 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -283,6 +332,7 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "", + "", podModeInsecure, fall.Zero, }, @@ -297,6 +347,7 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "", + "", podModeVerified, fall.Zero, }, @@ -311,6 +362,7 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "", + "", podModeVerified, fall.Zero, }, @@ -325,6 +377,7 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "", + "", podModeDisabled, fall.F{Zones: []string{"ip6.arpa.", "inaddr.arpa.", "foo.com."}}, }, @@ -339,6 +392,7 @@ func TestKubernetesParse(t *testing.T) { 0, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -352,6 +406,7 @@ kubernetes cluster.local`, 0, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -365,6 +420,7 @@ kubernetes cluster.local`, 0, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -378,6 +434,7 @@ kubernetes cluster.local`, 0, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, }, @@ -391,6 +448,7 @@ kubernetes cluster.local`, 0, defaultResyncPeriod, "", + "", podModeDisabled, fall.Zero, },