diff --git a/.travis/kubernetes/dns-test.yaml b/.travis/kubernetes/dns-test.yaml index 91140830f..1f5587ca3 100644 --- a/.travis/kubernetes/dns-test.yaml +++ b/.travis/kubernetes/dns-test.yaml @@ -91,6 +91,26 @@ spec: name: c-port protocol: UDP --- +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: de-d1 + namespace: test-1 +spec: + replicas: 2 + template: + metadata: + labels: + app: app-d + spec: + containers: + - name: app-d-c + image: gcr.io/google_containers/pause-amd64:3.0 + ports: + - containerPort: 1234 + name: c-port + protocol: UDP +--- apiVersion: v1 kind: Service metadata: @@ -149,3 +169,17 @@ spec: - name: c-port port: 1234 protocol: UDP +--- +apiVersion: v1 +kind: Service +metadata: + name: headless-svc + namespace: test-1 +spec: + selector: + app: app-d + clusterIP: None + ports: + - name: c-port + port: 1234 + protocol: UDP diff --git a/middleware/kubernetes/controller.go b/middleware/kubernetes/controller.go index e387b17fd..150cc843f 100644 --- a/middleware/kubernetes/controller.go +++ b/middleware/kubernetes/controller.go @@ -39,9 +39,11 @@ type dnsController struct { svcController *cache.Controller nsController *cache.Controller + epController *cache.Controller svcLister cache.StoreToServiceLister nsLister storeToNamespaceLister + epLister cache.StoreToEndpointsLister // stopLock is used to enforce only a single call to Stop is active. // Needed because we allow stopping through an http endpoint and @@ -76,6 +78,13 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati }, &api.Namespace{}, resyncPeriod, cache.ResourceEventHandlerFuncs{}) + dns.epLister.Store, dns.epController = cache.NewInformer( + &cache.ListWatch{ + ListFunc: endpointsListFunc(dns.client, namespace, dns.selector), + WatchFunc: endpointsWatchFunc(dns.client, namespace, dns.selector), + }, + &api.Endpoints{}, resyncPeriod, cache.ResourceEventHandlerFuncs{}) + return &dns } @@ -85,6 +94,7 @@ func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) fun opts.LabelSelector = *s } listV1, err := c.Core().Services(ns).List(opts) + if err != nil { return nil, err } @@ -119,6 +129,14 @@ func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) { return in, true } return watch.Event{Type: in.Type, Object: &apiObj}, true + case *v1.Endpoints: + var apiObj api.Endpoints + err := v1.Convert_v1_Endpoints_To_api_Endpoints(v1Obj, &apiObj, nil) + if err != nil { + log.Printf("[ERROR] Could not convert v1.Endpoint: %s", err) + return in, true + } + return watch.Event{Type: in.Type, Object: &apiObj}, true } log.Printf("[WARN] Unhandled v1 type in event: %v", in) @@ -169,6 +187,38 @@ func namespaceWatchFunc(c *kubernetes.Clientset, s *labels.Selector) func(option } } +func endpointsListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { + return func(opts api.ListOptions) (runtime.Object, error) { + if s != nil { + opts.LabelSelector = *s + } + listV1, err := c.Core().Endpoints(ns).List(opts) + + if err != nil { + return nil, err + } + var listAPI api.EndpointsList + err = v1.Convert_v1_EndpointsList_To_api_EndpointsList(listV1, &listAPI, nil) + if err != nil { + return nil, err + } + return &listAPI, err + } +} + +func endpointsWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) { + return func(options api.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = *s + } + w, err := c.Core().Endpoints(ns).Watch(options) + if err != nil { + return nil, err + } + return watch.Filter(w, v1ToAPIFilter), nil + } +} + func (dns *dnsController) controllersInSync() bool { return dns.svcController.HasSynced() } @@ -193,6 +243,7 @@ func (dns *dnsController) Stop() error { func (dns *dnsController) Run() { go dns.svcController.Run(dns.stopCh) go dns.nsController.Run(dns.stopCh) + go dns.epController.Run(dns.stopCh) <-dns.stopCh } diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go index 78c14ee82..5ab11ca95 100644 --- a/middleware/kubernetes/kubernetes.go +++ b/middleware/kubernetes/kubernetes.go @@ -55,7 +55,7 @@ func (k *Kubernetes) Services(state request.Request, exact bool, opt middleware. } // PrimaryZone will return the first non-reverse zone being handled by this middleware -func (k *Kubernetes) PrimaryZone() (string) { +func (k *Kubernetes) PrimaryZone() string { return k.Zones[k.primaryZone] } @@ -228,15 +228,38 @@ func (k *Kubernetes) getRecordsForServiceItems(serviceItems []*api.Service, zone var records []msg.Service for _, item := range serviceItems { - clusterIP := item.Spec.ClusterIP - // Create records for each exposed port... key := k.NameTemplate.RecordNameFromNameValues(nametemplate.NameValues{TypeName: "svc", ServiceName: item.ObjectMeta.Name, Namespace: item.ObjectMeta.Namespace, Zone: zone}) + key = strings.Replace(key, ".", "/", -1) - for i, p := range item.Spec.Ports { - s := msg.Service{Key: msg.Path(strconv.Itoa(i)+"."+key, "coredns"), Host: clusterIP, Port: int(p.Port)} - records = append(records, s) + clusterIP := item.Spec.ClusterIP + if clusterIP == api.ClusterIPNone { + // This is a headless service, create records for each pod + epList, _ := k.APIConn.epLister.List() + for _, ep := range epList.Items { + if ep.ObjectMeta.Name == item.ObjectMeta.Name && ep.ObjectMeta.Namespace == item.ObjectMeta.Namespace { + for _, eps := range ep.Subsets { + for i, port := range eps.Ports { + for j, addr := range eps.Addresses { + refid := strconv.Itoa(j*1024 + i) + s := msg.Service{ + Key: msg.Path(strings.ToLower(refid+"._"+port.Name+"._"+string(port.Protocol)+"."+key), "coredns"), + Host: addr.IP, Port: int(port.Port), + } + records = append(records, s) + } + } + } + } + } + } else { + // Create records for each exposed port... + + for _, p := range item.Spec.Ports { + s := msg.Service{Key: msg.Path(strings.ToLower("_"+p.Name+"._"+string(p.Protocol)+"."+key), "coredns"), Host: clusterIP, Port: int(p.Port)} + records = append(records, s) + } } } @@ -265,6 +288,7 @@ func (k *Kubernetes) getServices(namespace string, nsWildcard bool, servicename if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(item.Namespace, k.Namespaces)) { continue } + resultItems = append(resultItems, item) } } diff --git a/test/kubernetes_test.go b/test/kubernetes_test.go index 68ec319cf..215b8191c 100644 --- a/test/kubernetes_test.go +++ b/test/kubernetes_test.go @@ -59,6 +59,8 @@ var dnsTestCases = []test.Case{ test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"), test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"), test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"), + test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"), + test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"), }, }, { @@ -68,6 +70,8 @@ var dnsTestCases = []test.Case{ test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"), test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"), test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"), + test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"), + test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"), }, }, { @@ -87,6 +91,16 @@ var dnsTestCases = []test.Case{ test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"), test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"), test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"), + test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"), + test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"), + }, + }, + { + Qname: "headless-svc.test-1.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"), + test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"), }, }, //TODO: Fix below to all use test.SRV not test.A! @@ -137,6 +151,8 @@ var dnsTestCases = []test.Case{ test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."), test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."), test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."), + test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), + test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), }, }, { @@ -147,6 +163,8 @@ var dnsTestCases = []test.Case{ test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."), test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."), test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."), + test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), + test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), }, }, { @@ -167,6 +185,8 @@ var dnsTestCases = []test.Case{ test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."), test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."), test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."), + test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), + test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), }, }, { @@ -223,6 +243,7 @@ func TestKubernetesIntegration(t *testing.T) { time.Sleep(5 * time.Second) for _, tc := range dnsTestCases { + dnsClient := new(dns.Client) dnsMessage := new(dns.Msg)