diff --git a/middleware/kubernetes/controller.go b/middleware/kubernetes/controller.go index 150cc843f..e6e79366a 100644 --- a/middleware/kubernetes/controller.go +++ b/middleware/kubernetes/controller.go @@ -1,6 +1,7 @@ package kubernetes import ( + "errors" "fmt" "log" "sync" @@ -24,6 +25,8 @@ type storeToNamespaceLister struct { cache.Store } +const podIPIndex = "PodIP" + // List lists all Namespaces in the store. func (s *storeToNamespaceLister) List() (ns api.NamespaceList, err error) { for _, m := range s.Store.List() { @@ -38,10 +41,12 @@ type dnsController struct { selector *labels.Selector svcController *cache.Controller + podController *cache.Controller nsController *cache.Controller epController *cache.Controller svcLister cache.StoreToServiceLister + podLister cache.StoreToPodLister nsLister storeToNamespaceLister epLister cache.StoreToEndpointsLister @@ -54,7 +59,7 @@ type dnsController struct { } // newDNSController creates a controller for CoreDNS. -func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Duration, lselector *labels.Selector) *dnsController { +func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Duration, lselector *labels.Selector, initPodCache bool) *dnsController { dns := dnsController{ client: kubeClient, selector: lselector, @@ -71,6 +76,18 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati cache.ResourceEventHandlerFuncs{}, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + if initPodCache { + dns.podLister.Indexer, dns.podController = cache.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: podListFunc(dns.client, namespace, dns.selector), + WatchFunc: podWatchFunc(dns.client, namespace, dns.selector), + }, + &api.Pod{}, // TODO replace with a lighter-weight custom struct + resyncPeriod, + cache.ResourceEventHandlerFuncs{}, + cache.Indexers{podIPIndex: podIPIndexFunc}) + } + dns.nsLister.Store, dns.nsController = cache.NewInformer( &cache.ListWatch{ ListFunc: namespaceListFunc(dns.client, dns.selector), @@ -88,6 +105,14 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati return &dns } +func podIPIndexFunc(obj interface{}) ([]string, error) { + p, ok := obj.(*api.Pod) + if !ok { + return nil, errors.New("obj was not an *api.Pod") + } + return []string{p.Status.PodIP}, nil +} + func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { return func(opts api.ListOptions) (runtime.Object, error) { if s != nil { @@ -107,6 +132,26 @@ func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) fun } } +func podListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { + return func(opts api.ListOptions) (runtime.Object, error) { + if s != nil { + opts.LabelSelector = *s + } + listV1, err := c.Core().Pods(ns).List(opts) + + if err != nil { + return nil, err + } + var listAPI api.PodList + err = v1.Convert_v1_PodList_To_api_PodList(listV1, &listAPI, nil) + if err != nil { + return nil, err + } + + return &listAPI, err + } +} + func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) { if in.Type == watch.Error { return in, true @@ -121,6 +166,14 @@ func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) { return in, true } return watch.Event{Type: in.Type, Object: &apiObj}, true + case *v1.Pod: + var apiObj api.Pod + err := v1.Convert_v1_Pod_To_api_Pod(v1Obj, &apiObj, nil) + if err != nil { + log.Printf("[ERROR] Could not convert v1.Pod: %s", err) + return in, true + } + return watch.Event{Type: in.Type, Object: &apiObj}, true case *v1.Namespace: var apiObj api.Namespace err := v1.Convert_v1_Namespace_To_api_Namespace(v1Obj, &apiObj, nil) @@ -156,6 +209,20 @@ func serviceWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) fu } } +func podWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) { + return func(options api.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = *s + } + w, err := c.Core().Pods(ns).Watch(options) + + if err != nil { + return nil, err + } + return watch.Filter(w, v1ToAPIFilter), nil + } +} + func namespaceListFunc(c *kubernetes.Clientset, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { return func(opts api.ListOptions) (runtime.Object, error) { if s != nil { @@ -244,6 +311,9 @@ func (dns *dnsController) Run() { go dns.svcController.Run(dns.stopCh) go dns.nsController.Run(dns.stopCh) go dns.epController.Run(dns.stopCh) + if dns.podController != nil { + go dns.podController.Run(dns.stopCh) + } <-dns.stopCh } diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go index 9141cd40a..c59f98835 100644 --- a/middleware/kubernetes/kubernetes.go +++ b/middleware/kubernetes/kubernetes.go @@ -45,6 +45,7 @@ type Kubernetes struct { const ( PodModeDisabled = "disabled" // default. pod requests are ignored + PodModeVerified = "verified" // Pod requests are answered only if they exist PodModeInsecure = "insecure" // ALL pod requests are answered without verfying they exist DnsSchemaVersion = "1.0.0" // https://github.com/kubernetes/dns/blob/master/docs/specification.md ) @@ -197,7 +198,7 @@ func (k *Kubernetes) InitKubeCache() error { log.Printf("[INFO] Kubernetes middleware configured with the label selector '%s'. Only kubernetes objects matching this label selector will be exposed.", unversionedapi.FormatLabelSelector(k.LabelSelector)) } - k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector) + k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector, k.PodMode == PodModeVerified) return err } @@ -384,9 +385,30 @@ func (k *Kubernetes) findPods(namespace, podname string) (pods []pod, err error) return pods, nil } - // TODO: implement cache verified pod responses - return pods, nil + // PodModeVerified + objList, err := k.APIConn.podLister.Indexer.ByIndex(podIPIndex, ip) + if err != nil { + return nil, err + } + nsWildcard := symbolContainsWildcard(namespace) + for _, o := range objList { + p, ok := o.(*api.Pod) + if !ok { + return nil, errors.New("expected type *api.Pod") + } + // If namespace has a wildcard, filter results against Corefile namespace list. + if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(p.Namespace, k.Namespaces)) { + continue + } + // check for matching ip and namespace + if ip == p.Status.PodIP && symbolMatches(namespace, p.Namespace, nsWildcard) { + s := pod{name: podname, namespace: namespace, addr: ip} + pods = append(pods, s) + return pods, nil + } + } + return pods, nil } // Get retrieves matching data from the cache. diff --git a/middleware/kubernetes/setup.go b/middleware/kubernetes/setup.go index 0813ef7b3..7fd9804e0 100644 --- a/middleware/kubernetes/setup.go +++ b/middleware/kubernetes/setup.go @@ -88,10 +88,10 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) { args := c.RemainingArgs() if len(args) == 1 { switch args[0] { - case PodModeDisabled, PodModeInsecure: + case PodModeDisabled, PodModeInsecure, PodModeVerified: k8s.PodMode = args[0] default: - return nil, errors.New("pods must be one of: disabled, insecure") + return nil, errors.New("pods must be one of: disabled, verified, insecure") } continue } diff --git a/test/kubernetes_test.go b/test/kubernetes_test.go index 69bd6502c..cdb8add2b 100644 --- a/test/kubernetes_test.go +++ b/test/kubernetes_test.go @@ -242,6 +242,19 @@ var dnsTestCasesPodsInsecure = []test.Case{ }, } +var dnsTestCasesPodsVerified = []test.Case{ + { + Qname: "10-20-0-101.test-1.pod.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeNameError, + Answer: []dns.RR{}, + }, + { + Qname: "10-20-0-101.test-X.pod.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeNameError, + Answer: []dns.RR{}, + }, +} + func createTestServer(t *testing.T, corefile string) (*caddy.Instance, string) { server, err := CoreDNSServer(corefile) if err != nil { @@ -315,3 +328,15 @@ func TestKubernetesIntegrationPodsInsecure(t *testing.T) { ` doIntegrationTests(t, corefile, dnsTestCasesPodsInsecure) } + +func TestKubernetesIntegrationPodsVerified(t *testing.T) { + corefile := + `.:0 { + kubernetes cluster.local 0.0.10.in-addr.arpa { + endpoint http://localhost:8080 + namespaces test-1 + pods verified + } +` + doIntegrationTests(t, corefile, dnsTestCasesPodsVerified) +}