diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index 7edd18ca9..a785a003d 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -128,33 +128,37 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts object.DefaultProcessor(object.ToService, nil), ) + podLister, podController := object.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: podListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), + WatchFunc: podWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), + }, + &api.Pod{}, + cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, + cache.Indexers{podIPIndex: podIPIndexFunc}, + object.DefaultProcessor(object.ToPod, nil), + ) + dns.podLister = podLister if opts.initPodCache { - dns.podLister, dns.podController = object.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: podListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - WatchFunc: podWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - }, - &api.Pod{}, - cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, - cache.Indexers{podIPIndex: podIPIndexFunc}, - object.DefaultProcessor(object.ToPod, nil), - ) + dns.podController = podController } + epLister, epController := object.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), + WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), + }, + &discovery.EndpointSlice{}, + cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, + cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, + object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()), + ) + dns.epLock.Lock() + dns.epLister = epLister if opts.initEndpointsCache { - dns.epLock.Lock() - dns.epLister, dns.epController = object.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - }, - &discovery.EndpointSlice{}, - cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, - cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()), - ) - dns.epLock.Unlock() + dns.epController = epController } + dns.epLock.Unlock() dns.nsLister, dns.nsController = object.NewIndexerInformer( &cache.ListWatch{ @@ -561,8 +565,8 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) { } // GetNodeByName return the node by name. If nothing is found an error is -// returned. This query causes a roundtrip to the k8s API server, so use -// sparingly. Currently this is only used for Federation. +// returned. This query causes a round trip to the k8s API server, so use +// sparingly. Currently, this is only used for Federation. func (dns *dnsControl) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { v1node, err := dns.client.CoreV1().Nodes().Get(ctx, name, meta.GetOptions{}) return v1node, err diff --git a/plugin/kubernetes/controller_test.go b/plugin/kubernetes/controller_test.go index 8f906e0c8..c36ab6605 100644 --- a/plugin/kubernetes/controller_test.go +++ b/plugin/kubernetes/controller_test.go @@ -1,16 +1,19 @@ - package kubernetes +package kubernetes import ( "context" "net" "strconv" "testing" + "time" "github.com/coredns/coredns/plugin/kubernetes/object" + "github.com/coredns/coredns/plugin/pkg/dnstest" "github.com/coredns/coredns/plugin/test" "github.com/miekg/dns" api "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" @@ -25,23 +28,39 @@ func inc(ip net.IP) { } } -func BenchmarkController(b *testing.B) { +func kubernetesWithFakeClient(ctx context.Context, zone, cidr string, initEndpointsCache bool, svcType string) *Kubernetes { client := fake.NewSimpleClientset() dco := dnsControlOpts{ - zones: []string{"cluster.local."}, + zones: []string{zone}, + initEndpointsCache: initEndpointsCache, } - ctx := context.Background() controller := newdnsController(ctx, client, dco) - cidr := "10.0.0.0/19" // Add resources - generateEndpoints(cidr, client) - generateSvcs(cidr, "all", client) - m := new(dns.Msg) - m.SetQuestion("svc1.testns.svc.cluster.local.", dns.TypeA) + _, err := client.CoreV1().Namespaces().Create(ctx, &api.Namespace{ObjectMeta: meta.ObjectMeta{Name: "testns"}}, meta.CreateOptions{}) + if err != nil { + log.Fatal(err) + } + generateSvcs(cidr, svcType, client) + generateEndpointSlices(cidr, client) k := New([]string{"cluster.local."}) k.APIConn = controller + return k +} + +func BenchmarkController(b *testing.B) { + ctx := context.Background() + k := kubernetesWithFakeClient(ctx, "cluster.local.", "10.0.0.0/24", true, "all") + + go k.APIConn.Run() + defer k.APIConn.Stop() + for !k.APIConn.HasSynced() { + time.Sleep(time.Millisecond) + } + rw := &test.ResponseWriter{} + m := new(dns.Msg) + m.SetQuestion("svc1.testns.svc.cluster.local.", dns.TypeA) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -49,7 +68,47 @@ func BenchmarkController(b *testing.B) { } } -func generateEndpoints(cidr string, client kubernetes.Interface) { +func TestEndpointsDisabled(t *testing.T) { + ctx := context.Background() + k := kubernetesWithFakeClient(ctx, "cluster.local.", "10.0.0.0/30", false, "headless") + k.opts.initEndpointsCache = false + + go k.APIConn.Run() + defer k.APIConn.Stop() + for !k.APIConn.HasSynced() { + time.Sleep(time.Millisecond) + } + + rw := &dnstest.Recorder{ResponseWriter: &test.ResponseWriter{}} + m := new(dns.Msg) + m.SetQuestion("svc2.testns.svc.cluster.local.", dns.TypeA) + k.ServeDNS(ctx, rw, m) + if rw.Msg.Rcode != dns.RcodeNameError { + t.Errorf("Expected NXDOMAIN, got %v", dns.RcodeToString[rw.Msg.Rcode]) + } +} + +func TestEndpointsEnabled(t *testing.T) { + ctx := context.Background() + k := kubernetesWithFakeClient(ctx, "cluster.local.", "10.0.0.0/30", true, "headless") + k.opts.initEndpointsCache = true + + go k.APIConn.Run() + defer k.APIConn.Stop() + for !k.APIConn.HasSynced() { + time.Sleep(time.Millisecond) + } + + rw := &dnstest.Recorder{ResponseWriter: &test.ResponseWriter{}} + m := new(dns.Msg) + m.SetQuestion("svc2.testns.svc.cluster.local.", dns.TypeA) + k.ServeDNS(ctx, rw, m) + if rw.Msg.Rcode != dns.RcodeSuccess { + t.Errorf("Expected SUCCESS, got %v", dns.RcodeToString[rw.Msg.Rcode]) + } +} + +func generateEndpointSlices(cidr string, client kubernetes.Interface) { // https://groups.google.com/d/msg/golang-nuts/zlcYA4qk-94/TWRFHeXJCcYJ ip, ipnet, err := net.ParseCIDR(cidr) if err != nil { @@ -57,30 +116,36 @@ func generateEndpoints(cidr string, client kubernetes.Interface) { } count := 1 - ep := &api.Endpoints{ - Subsets: []api.EndpointSubset{{ - Ports: []api.EndpointPort{ - { - Port: 80, - Protocol: "tcp", - Name: "http", - }, + port := int32(80) + protocol := api.Protocol("tcp") + name := "http" + eps := &discovery.EndpointSlice{ + Ports: []discovery.EndpointPort{ + { + Port: &port, + Protocol: &protocol, + Name: &name, }, - }}, + }, ObjectMeta: meta.ObjectMeta{ Namespace: "testns", }, } ctx := context.TODO() for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { - ep.Subsets[0].Addresses = []api.EndpointAddress{ + hostname := "foo" + strconv.Itoa(count) + eps.Endpoints = []discovery.Endpoint{ { - IP: ip.String(), - Hostname: "foo" + strconv.Itoa(count), + Addresses: []string{ip.String()}, + Hostname: &hostname, }, } - ep.ObjectMeta.Name = "svc" + strconv.Itoa(count) - client.CoreV1().Endpoints("testns").Create(ctx, ep, meta.CreateOptions{}) + eps.ObjectMeta.Name = "svc" + strconv.Itoa(count) + eps.ObjectMeta.Labels = map[string]string{discovery.LabelServiceName: eps.ObjectMeta.Name} + _, err := client.DiscoveryV1().EndpointSlices("testns").Create(ctx, eps, meta.CreateOptions{}) + if err != nil { + log.Fatal(err) + } count++ } } @@ -144,7 +209,7 @@ func createHeadlessSvc(suffix int, client kubernetes.Interface, ip net.IP) { ctx := context.TODO() client.CoreV1().Services("testns").Create(ctx, &api.Service{ ObjectMeta: meta.ObjectMeta{ - Name: "hdls" + strconv.Itoa(suffix), + Name: "svc" + strconv.Itoa(suffix), Namespace: "testns", }, Spec: api.ServiceSpec{ @@ -157,7 +222,7 @@ func createExternalSvc(suffix int, client kubernetes.Interface, ip net.IP) { ctx := context.TODO() client.CoreV1().Services("testns").Create(ctx, &api.Service{ ObjectMeta: meta.ObjectMeta{ - Name: "external" + strconv.Itoa(suffix), + Name: "svc" + strconv.Itoa(suffix), Namespace: "testns", }, Spec: api.ServiceSpec{