diff --git a/.circleci/config.yml b/.circleci/config.yml index 7b6bfb56e..cd9c2b2f6 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -21,8 +21,8 @@ integrationDefaults: &integrationDefaults image: ubuntu-1604:201903-01 working_directory: ~/go/src/${CIRCLE_PROJECT_USERNAME}/coredns environment: - - K8S_VERSION: v1.18.2 - - KIND_VERSION: v0.8.1 + - K8S_VERSION: v1.19.1 + - KIND_VERSION: v0.9.0 - KUBECONFIG: /home/circleci/.kube/kind-config-kind setupKubernetes: &setupKubernetes diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index 90a005177..f9373eee9 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -11,16 +11,18 @@ import ( "github.com/coredns/coredns/plugin/kubernetes/object" api "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" ) const ( podIPIndex = "PodIP" - svcNameNamespaceIndex = "NameNamespace" + svcNameNamespaceIndex = "ServiceNameNamespace" svcIPIndex = "ServiceIP" epNameNamespaceIndex = "EndpointNameNamespace" epIPIndex = "EndpointsIP" @@ -81,6 +83,7 @@ type dnsControl struct { type dnsControlOpts struct { initPodCache bool initEndpointsCache bool + useEndpointSlices bool ignoreEmptyService bool // Label handling. @@ -130,15 +133,31 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts } if opts.initEndpointsCache { + var ( + apiObj runtime.Object + listWatch cache.ListWatch + to func(bool) object.ToFunc + latency object.RecordLatencyFunc + ) + if opts.useEndpointSlices { + apiObj = &discovery.EndpointSlice{} + listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + to = object.EndpointSliceToEndpoints + latency = dns.recordEndpointSliceDNSProgrammingLatency + } else { + apiObj = &api.Endpoints{} + listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + to = object.ToEndpoints + latency = dns.recordEndpointDNSProgrammingLatency + } dns.epLister, dns.epController = object.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - }, - &api.Endpoints{}, + &listWatch, + apiObj, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - object.DefaultProcessor(object.ToEndpoints(opts.skipAPIObjectsCleanup), dns.recordDNSProgrammingLatency), + object.DefaultProcessor(to(opts.skipAPIObjectsCleanup), latency), ) } @@ -154,8 +173,12 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts return &dns } -func (dns *dnsControl) recordDNSProgrammingLatency(obj meta.Object) { - recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj.(*api.Endpoints)) +func (dns *dnsControl) recordEndpointDNSProgrammingLatency(obj meta.Object) { + recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj) +} + +func (dns *dnsControl) recordEndpointSliceDNSProgrammingLatency(obj meta.Object) { + recordDNSProgrammingLatency(dns.SvcIndex(object.ServiceKey(obj.GetLabels()[discovery.LabelServiceName], obj.GetNamespace())), obj) } func podIPIndexFunc(obj interface{}) ([]string, error) { @@ -207,8 +230,7 @@ func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s l if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Services(ns).List(ctx, opts) - return listV1, err + return c.CoreV1().Services(ns).List(ctx, opts) } } @@ -221,8 +243,16 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label opts.FieldSelector = opts.FieldSelector + "," } opts.FieldSelector = opts.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown" - listV1, err := c.CoreV1().Pods(ns).List(ctx, opts) - return listV1, err + return c.CoreV1().Pods(ns).List(ctx, opts) + } +} + +func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { + return func(opts meta.ListOptions) (runtime.Object, error) { + if s != nil { + opts.LabelSelector = s.String() + } + return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts) } } @@ -231,8 +261,7 @@ func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Endpoints(ns).List(ctx, opts) - return listV1, err + return c.CoreV1().Endpoints(ns).List(ctx, opts) } } @@ -241,8 +270,56 @@ func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Sel if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Namespaces().List(ctx, opts) - return listV1, err + return c.CoreV1().Namespaces().List(ctx, opts) + } +} + +func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.CoreV1().Services(ns).Watch(ctx, options) + } +} + +func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + if len(options.FieldSelector) > 0 { + options.FieldSelector = options.FieldSelector + "," + } + options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown" + return c.CoreV1().Pods(ns).Watch(ctx, options) + } +} + +func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.DiscoveryV1beta1().EndpointSlices(ns).Watch(ctx, options) + } +} + +func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.CoreV1().Endpoints(ns).Watch(ctx, options) + } +} + +func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.CoreV1().Namespaces().Watch(ctx, options) } } @@ -442,7 +519,7 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) { } func (dns *dnsControl) getServices(endpoints *api.Endpoints) []*object.Service { - return dns.SvcIndex(object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace())) + return dns.SvcIndex(object.ServiceKey(endpoints.GetName(), endpoints.GetNamespace())) } // subsetsEquivalent checks if two endpoint subsets are significantly equivalent diff --git a/plugin/kubernetes/handler_test.go b/plugin/kubernetes/handler_test.go index 12ebc2765..f5630ccdd 100644 --- a/plugin/kubernetes/handler_test.go +++ b/plugin/kubernetes/handler_test.go @@ -643,8 +643,9 @@ var epsIndex = map[string][]*object.Endpoints{ }, }, }, - Name: "svc1", + Name: "svc1-slice1", Namespace: "testns", + Index: object.EndpointsKey("svc1", "testns"), }}, "svcempty.testns": {{ Subsets: []object.EndpointSubset{ @@ -655,8 +656,9 @@ var epsIndex = map[string][]*object.Endpoints{ }, }, }, - Name: "svcempty", + Name: "svcempty-slice1", Namespace: "testns", + Index: object.EndpointsKey("svcempty", "testns"), }}, "hdls1.testns": {{ Subsets: []object.EndpointSubset{ @@ -674,8 +676,9 @@ var epsIndex = map[string][]*object.Endpoints{ }, }, }, - Name: "hdls1", + Name: "hdls1-slice1", Namespace: "testns", + Index: object.EndpointsKey("hdls1", "testns"), }}, "hdlsprtls.testns": {{ Subsets: []object.EndpointSubset{ @@ -686,8 +689,9 @@ var epsIndex = map[string][]*object.Endpoints{ Ports: []object.EndpointPort{{Port: -1}}, }, }, - Name: "hdlsprtls", + Name: "hdlsprtls-slice1", Namespace: "testns", + Index: object.EndpointsKey("hdlsprtls", "testns"), }}, } diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go index 83b4b02d1..991323e76 100644 --- a/plugin/kubernetes/kubernetes.go +++ b/plugin/kubernetes/kubernetes.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net" + "strconv" "strings" "github.com/coredns/coredns/plugin" @@ -18,6 +19,7 @@ import ( "github.com/miekg/dns" api "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" @@ -244,6 +246,20 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) { k.opts.zones = k.Zones k.opts.endpointNameMode = k.endpointNameMode + // Enable use of endpoint slices if the API supports the discovery v1 beta1 api + if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil { + k.opts.useEndpointSlices = true + } + // Disable use of endpoint slices for k8s versions 1.18 and earlier. Endpoint slices were + // introduced in 1.17 but EndpointSliceMirroring was not added until 1.19. + sv, _ := kubeClient.ServerVersion() + major, _ := strconv.Atoi(sv.Major) + minor, _ := strconv.Atoi(sv.Minor) + if k.opts.useEndpointSlices && major <= 1 && minor <= 18 { + log.Info("watching Endpoints instead of EndpointSlices in k8s versions < 1.19") + k.opts.useEndpointSlices = false + } + k.APIConn = newdnsController(ctx, kubeClient, k.opts) return err @@ -433,8 +449,9 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg. if endpointsList == nil { endpointsList = endpointsListFunc() } + for _, ep := range endpointsList { - if ep.Name != svc.Name || ep.Namespace != svc.Namespace { + if object.EndpointsKey(svc.Name, svc.Namespace) != ep.Index { continue } diff --git a/plugin/kubernetes/kubernetes_test.go b/plugin/kubernetes/kubernetes_test.go index eddb7645d..7458a3bdc 100644 --- a/plugin/kubernetes/kubernetes_test.go +++ b/plugin/kubernetes/kubernetes_test.go @@ -137,8 +137,9 @@ func (APIConnServiceTest) EpIndex(string) []*object.Endpoints { }, }, }, - Name: "svc1", + Name: "svc1-slice1", Namespace: "testns", + Index: object.EndpointsKey("svc1", "testns"), }, { Subsets: []object.EndpointSubset{ @@ -151,22 +152,9 @@ func (APIConnServiceTest) EpIndex(string) []*object.Endpoints { }, }, }, - Name: "hdls1", - Namespace: "testns", - }, - { - Subsets: []object.EndpointSubset{ - { - Addresses: []object.EndpointAddress{ - {IP: "172.0.0.3"}, - }, - Ports: []object.EndpointPort{ - {Port: 80, Protocol: "tcp", Name: "http"}, - }, - }, - }, - Name: "hdls1", + Name: "hdls1-slice1", Namespace: "testns", + Index: object.EndpointsKey("hdls1", "testns"), }, { Subsets: []object.EndpointSubset{ @@ -194,8 +182,9 @@ func (APIConnServiceTest) EndpointsList() []*object.Endpoints { }, }, }, - Name: "svc1", + Name: "svc1-slice1", Namespace: "testns", + Index: object.EndpointsKey("svc1", "testns"), }, { Subsets: []object.EndpointSubset{ @@ -208,22 +197,24 @@ func (APIConnServiceTest) EndpointsList() []*object.Endpoints { }, }, }, - Name: "hdls1", + Name: "hdls1-slice1", Namespace: "testns", + Index: object.EndpointsKey("hdls1", "testns"), }, { Subsets: []object.EndpointSubset{ { Addresses: []object.EndpointAddress{ - {IP: "172.0.0.3"}, + {IP: "172.0.0.2"}, }, Ports: []object.EndpointPort{ {Port: 80, Protocol: "tcp", Name: "http"}, }, }, }, - Name: "hdls1", + Name: "hdls1-slice2", Namespace: "testns", + Index: object.EndpointsKey("hdls1", "testns"), }, { Subsets: []object.EndpointSubset{ @@ -275,6 +266,9 @@ func TestServices(t *testing.T) { // External Services {qname: "external.testns.svc.interwebs.test.", qtype: dns.TypeCNAME, answer: svcAns{host: "coredns.io", key: "/" + coredns + "/test/interwebs/svc/testns/external"}}, + + // Headless Services + {qname: "hdls1.testns.svc.interwebs.test.", qtype: dns.TypeA, answer: svcAns{host: "172.0.0.2", key: "/" + coredns + "/test/interwebs/svc/testns/hdls1/172-0-0-2"}}, } for i, test := range tests { diff --git a/plugin/kubernetes/metrics.go b/plugin/kubernetes/metrics.go index 44220a6f7..8adeb6940 100644 --- a/plugin/kubernetes/metrics.go +++ b/plugin/kubernetes/metrics.go @@ -5,10 +5,10 @@ import ( "github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin/kubernetes/object" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) var ( @@ -37,11 +37,11 @@ var ( durationSinceFunc = time.Since ) -func recordDNSProgrammingLatency(svcs []*object.Service, endpoints *api.Endpoints) { +func recordDNSProgrammingLatency(svcs []*object.Service, endpoints meta.Object) { // getLastChangeTriggerTime is the time.Time value of the EndpointsLastChangeTriggerTime // annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set var lastChangeTriggerTime time.Time - stringVal, ok := endpoints.Annotations[api.EndpointsLastChangeTriggerTime] + stringVal, ok := endpoints.GetAnnotations()[api.EndpointsLastChangeTriggerTime] if ok { ts, err := time.Parse(time.RFC3339Nano, stringVal) if err != nil { diff --git a/plugin/kubernetes/metrics_test.go b/plugin/kubernetes/metrics_test.go index 0ab6f3c20..43b5ca382 100644 --- a/plugin/kubernetes/metrics_test.go +++ b/plugin/kubernetes/metrics_test.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" api "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -20,15 +21,92 @@ const ( namespace = "testns" ) -func TestDNSProgrammingLatency(t *testing.T) { +var expected = ` + # HELP coredns_kubernetes_dns_programming_duration_seconds Histogram of the time (in seconds) it took to program a dns instance. + # TYPE coredns_kubernetes_dns_programming_duration_seconds histogram + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.001"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.002"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.004"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.008"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.016"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.032"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.064"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.128"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.256"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.512"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="1.024"} 1 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="2.048"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="4.096"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="8.192"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="16.384"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="32.768"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="65.536"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="131.072"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="262.144"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="524.288"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="+Inf"} 2 + coredns_kubernetes_dns_programming_duration_seconds_sum{service_kind="headless_with_selector"} 3 + coredns_kubernetes_dns_programming_duration_seconds_count{service_kind="headless_with_selector"} 2 + ` + +func TestDNSProgrammingLatencyEndpointSlices(t *testing.T) { client := fake.NewSimpleClientset() now := time.Now() ctx := context.TODO() controller := newdnsController(ctx, client, dnsControlOpts{ initEndpointsCache: true, + useEndpointSlices: true, // This is needed as otherwise the fake k8s client doesn't work properly. skipAPIObjectsCleanup: true, }) + + durationSinceFunc = func(t time.Time) time.Duration { + return now.Sub(t) + } + DNSProgrammingLatency.Reset() + go controller.Run() + + endpoints1 := []discovery.Endpoint{{ + Addresses: []string{"1.2.3.4"}, + }} + + endpoints2 := []discovery.Endpoint{{ + Addresses: []string{"1.2.3.45"}, + }} + + createService(t, client, controller, "my-service", api.ClusterIPNone) + createEndpointSlice(t, client, "my-service", now.Add(-2*time.Second), endpoints1) + updateEndpointSlice(t, client, "my-service", now.Add(-1*time.Second), endpoints2) + + createEndpointSlice(t, client, "endpoints-no-service", now.Add(-4*time.Second), nil) + + createService(t, client, controller, "clusterIP-service", "10.40.0.12") + createEndpointSlice(t, client, "clusterIP-service", now.Add(-8*time.Second), nil) + + createService(t, client, controller, "headless-no-annotation", api.ClusterIPNone) + createEndpointSlice(t, client, "headless-no-annotation", nil, nil) + + createService(t, client, controller, "headless-wrong-annotation", api.ClusterIPNone) + createEndpointSlice(t, client, "headless-wrong-annotation", "wrong-value", nil) + + controller.Stop() + + if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil { + t.Error(err) + } +} + +func TestDnsProgrammingLatencyEndpoints(t *testing.T) { + client := fake.NewSimpleClientset() + now := time.Now() + ctx := context.TODO() + controller := newdnsController(ctx, client, dnsControlOpts{ + initEndpointsCache: true, + useEndpointSlices: false, + // This is needed as otherwise the fake k8s client doesn't work properly. + skipAPIObjectsCleanup: true, + }) + durationSinceFunc = func(t time.Time) time.Duration { return now.Sub(t) } @@ -59,33 +137,7 @@ func TestDNSProgrammingLatency(t *testing.T) { createEndpoints(t, client, "headless-wrong-annotation", "wrong-value", nil) controller.Stop() - expected := ` - # HELP coredns_kubernetes_dns_programming_duration_seconds Histogram of the time (in seconds) it took to program a dns instance. - # TYPE coredns_kubernetes_dns_programming_duration_seconds histogram - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.001"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.002"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.004"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.008"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.016"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.032"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.064"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.128"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.256"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.512"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="1.024"} 1 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="2.048"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="4.096"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="8.192"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="16.384"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="32.768"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="65.536"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="131.072"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="262.144"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="524.288"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="+Inf"} 2 - coredns_kubernetes_dns_programming_duration_seconds_sum{service_kind="headless_with_selector"} 3 - coredns_kubernetes_dns_programming_duration_seconds_count{service_kind="headless_with_selector"} 2 - ` + if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil { t.Error(err) } @@ -105,6 +157,24 @@ func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []ap } } +func buildEndpointSlice(name string, lastChangeTriggerTime interface{}, endpoints []discovery.Endpoint) *discovery.EndpointSlice { + annotations := make(map[string]string) + switch v := lastChangeTriggerTime.(type) { + case string: + annotations[api.EndpointsLastChangeTriggerTime] = v + case time.Time: + annotations[api.EndpointsLastChangeTriggerTime] = v.Format(time.RFC3339Nano) + } + return &discovery.EndpointSlice{ + ObjectMeta: meta.ObjectMeta{ + Namespace: namespace, Name: name + "-12345", + Labels: map[string]string{discovery.LabelServiceName: name}, + Annotations: annotations, + }, + Endpoints: endpoints, + } +} + func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) { ctx := context.TODO() _, err := client.CoreV1().Endpoints(namespace).Create(ctx, buildEndpoints(name, triggerTime, subsets), meta.CreateOptions{}) @@ -121,11 +191,27 @@ func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, tri } } -func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIP string) { +func createEndpointSlice(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, endpoints []discovery.Endpoint) { + ctx := context.TODO() + _, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(ctx, buildEndpointSlice(name, triggerTime, endpoints), meta.CreateOptions{}) + if err != nil { + t.Fatal(err) + } +} + +func updateEndpointSlice(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, endpoints []discovery.Endpoint) { + ctx := context.TODO() + _, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Update(ctx, buildEndpointSlice(name, triggerTime, endpoints), meta.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } +} + +func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) { ctx := context.TODO() if _, err := client.CoreV1().Services(namespace).Create(ctx, &api.Service{ ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name}, - Spec: api.ServiceSpec{ClusterIP: clusterIP}, + Spec: api.ServiceSpec{ClusterIP: clusterIp}, }, meta.CreateOptions{}); err != nil { t.Fatal(err) } diff --git a/plugin/kubernetes/ns.go b/plugin/kubernetes/ns.go index 2d4bc398a..ba687cb13 100644 --- a/plugin/kubernetes/ns.go +++ b/plugin/kubernetes/ns.go @@ -4,7 +4,6 @@ import ( "net" "strings" - "github.com/coredns/coredns/plugin/kubernetes/object" "github.com/miekg/dns" api "k8s.io/api/core/v1" ) @@ -27,7 +26,7 @@ func (k *Kubernetes) nsAddrs(external bool, zone string) []dns.RR { // Collect IPs for all Services of the Endpoints for _, endpoint := range endpoints { - svcs := k.APIConn.SvcIndex(object.ServiceKey(endpoint.Name, endpoint.Namespace)) + svcs := k.APIConn.SvcIndex(endpoint.Index) for _, svc := range svcs { if external { svcName := strings.Join([]string{svc.Name, svc.Namespace, zone}, ".") diff --git a/plugin/kubernetes/ns_test.go b/plugin/kubernetes/ns_test.go index 0dc55f489..682303104 100644 --- a/plugin/kubernetes/ns_test.go +++ b/plugin/kubernetes/ns_test.go @@ -61,43 +61,28 @@ func (APIConnTest) EpIndexReverse(ip string) []*object.Endpoints { } eps := []*object.Endpoints{ { - Subsets: []object.EndpointSubset{ - { - Addresses: []object.EndpointAddress{ - { - IP: "10.244.0.20", - }, - }, - }, - }, - Name: "dns-service", + Name: "dns-service-slice1", Namespace: "kube-system", + Index: object.EndpointsKey("dns-service", "kube-system"), + Subsets: []object.EndpointSubset{ + {Addresses: []object.EndpointAddress{{IP: "10.244.0.20"}}}, + }, }, { - Subsets: []object.EndpointSubset{ - { - Addresses: []object.EndpointAddress{ - { - IP: "10.244.0.20", - }, - }, - }, - }, - Name: "hdls-dns-service", + Name: "hdls-dns-service-slice1", Namespace: "kube-system", + Index: object.EndpointsKey("hdls-dns-service", "kube-system"), + Subsets: []object.EndpointSubset{ + {Addresses: []object.EndpointAddress{{IP: "10.244.0.20"}}}, + }, }, { - Subsets: []object.EndpointSubset{ - { - Addresses: []object.EndpointAddress{ - { - IP: "10.244.0.20", - }, - }, - }, - }, - Name: "dns6-service", + Name: "dns6-service-slice1", Namespace: "kube-system", + Index: object.EndpointsKey("dns6-service", "kube-system"), + Subsets: []object.EndpointSubset{ + {Addresses: []object.EndpointAddress{{IP: "10.244.0.20"}}}, + }, }, } return eps diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index d4c495861..304aaa861 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -4,6 +4,7 @@ import ( "fmt" api "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/runtime" ) @@ -56,6 +57,17 @@ func ToEndpoints(skipCleanup bool) ToFunc { } } +// EndpointSliceToEndpoints returns a function that converts an *discovery.EndpointSlice to a *Endpoints. +func EndpointSliceToEndpoints(skipCleanup bool) ToFunc { + return func(obj interface{}) (interface{}, error) { + eps, ok := obj.(*discovery.EndpointSlice) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } + return endpointSliceToEndpoints(skipCleanup, eps), nil + } +} + // toEndpoints converts an *api.Endpoints to a *Endpoints. func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints { e := &Endpoints{ @@ -108,6 +120,49 @@ func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints { return e } +// endpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints. +func endpointSliceToEndpoints(skipCleanup bool, ends *discovery.EndpointSlice) *Endpoints { + e := &Endpoints{ + Version: ends.GetResourceVersion(), + Name: ends.GetName(), + Namespace: ends.GetNamespace(), + Index: EndpointsKey(ends.Labels[discovery.LabelServiceName], ends.GetNamespace()), + Subsets: make([]EndpointSubset, 1), + } + + if len(ends.Ports) == 0 { + // Add sentinel if there are no ports. + e.Subsets[0].Ports = []EndpointPort{{Port: -1}} + } else { + e.Subsets[0].Ports = make([]EndpointPort, len(ends.Ports)) + for k, p := range ends.Ports { + ep := EndpointPort{Port: *p.Port, Name: *p.Name, Protocol: string(*p.Protocol)} + e.Subsets[0].Ports[k] = ep + } + } + + for _, end := range ends.Endpoints { + for _, a := range end.Addresses { + ea := EndpointAddress{IP: a} + if end.Hostname != nil { + ea.Hostname = *end.Hostname + } + if end.TargetRef != nil { + ea.TargetRefName = end.TargetRef.Name + } + // EndpointSlice does not contain NodeName, leave blank + e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, ea) + e.IndexIP = append(e.IndexIP, a) + } + } + + if !skipCleanup { + *ends = discovery.EndpointSlice{} + } + + return e +} + // CopyWithoutSubsets copies e, without the subsets. func (e *Endpoints) CopyWithoutSubsets() *Endpoints { e1 := &Endpoints{ diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go index f37af4796..afd134e56 100644 --- a/plugin/kubernetes/object/informer.go +++ b/plugin/kubernetes/object/informer.go @@ -21,10 +21,11 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache. return clientState, cache.New(cfg) } -type recordLatencyFunc func(meta.Object) +// RecordLatencyFunc is a function for recording api object delta latency +type RecordLatencyFunc func(meta.Object) // DefaultProcessor is based on the Process function from cache.NewIndexerInformer except it does a conversion. -func DefaultProcessor(convert ToFunc, recordLatency recordLatencyFunc) ProcessorBuilder { +func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) ProcessorBuilder { return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { return func(obj interface{}) error { for _, d := range obj.(cache.Deltas) { diff --git a/plugin/kubernetes/reverse.go b/plugin/kubernetes/reverse.go index b80a91fc2..2a5c5cdce 100644 --- a/plugin/kubernetes/reverse.go +++ b/plugin/kubernetes/reverse.go @@ -46,7 +46,7 @@ func (k *Kubernetes) serviceRecordForIP(ip, name string) []msg.Service { for _, eps := range ep.Subsets { for _, addr := range eps.Addresses { if addr.IP == ip { - domain := strings.Join([]string{endpointHostname(addr, k.endpointNameMode), ep.Name, ep.Namespace, Svc, k.primaryZone()}, ".") + domain := strings.Join([]string{endpointHostname(addr, k.endpointNameMode), ep.Index, Svc, k.primaryZone()}, ".") svcs = append(svcs, msg.Service{Host: domain, TTL: k.ttl}) } } diff --git a/plugin/kubernetes/reverse_test.go b/plugin/kubernetes/reverse_test.go index 5869d34bc..3b3b6872b 100644 --- a/plugin/kubernetes/reverse_test.go +++ b/plugin/kubernetes/reverse_test.go @@ -56,14 +56,11 @@ func (APIConnReverseTest) SvcIndexReverse(ip string) []*object.Service { } func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints { - ep1 := object.Endpoints{ + ep1s1 := object.Endpoints{ Subsets: []object.EndpointSubset{ { Addresses: []object.EndpointAddress{ {IP: "10.0.0.100", Hostname: "ep1a"}, - {IP: "1234:abcd::1", Hostname: "ep1b"}, - {IP: "fd00:77:30::a", Hostname: "ip6svc1ex"}, - {IP: "fd00:77:30::2:9ba6", Hostname: "ip6svc1in"}, {IP: "10.0.0.99", Hostname: "double-ep"}, // this endpoint is used by two services }, Ports: []object.EndpointPort{ @@ -71,8 +68,41 @@ func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints { }, }, }, - Name: "svc1", + Name: "svc1-slice1", Namespace: "testns", + Index: object.EndpointsKey("svc1", "testns"), + } + ep1s2 := object.Endpoints{ + Subsets: []object.EndpointSubset{ + { + Addresses: []object.EndpointAddress{ + {IP: "1234:abcd::1", Hostname: "ep1b"}, + {IP: "fd00:77:30::a", Hostname: "ip6svc1ex"}, + {IP: "fd00:77:30::2:9ba6", Hostname: "ip6svc1in"}, + }, + Ports: []object.EndpointPort{ + {Port: 80, Protocol: "tcp", Name: "http"}, + }, + }, + }, + Name: "svc1-slice2", + Namespace: "testns", + Index: object.EndpointsKey("svc1", "testns"), + } + ep1s3 := object.Endpoints{ + Subsets: []object.EndpointSubset{ + { + Addresses: []object.EndpointAddress{ + {IP: "10.0.0.100", Hostname: "ep1a"}, // duplicate endpointslice address + }, + Ports: []object.EndpointPort{ + {Port: 80, Protocol: "tcp", Name: "http"}, + }, + }, + }, + Name: "svc1-ccccc", + Namespace: "testns", + Index: object.EndpointsKey("svc1", "testns"), } ep2 := object.Endpoints{ Subsets: []object.EndpointSubset{ @@ -85,20 +115,21 @@ func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints { }, }, }, - Name: "svc2", + Name: "svc2-slice1", Namespace: "testns", + Index: object.EndpointsKey("svc2", "testns"), } switch ip { - case "10.0.0.100": - fallthrough case "1234:abcd::1": fallthrough case "fd00:77:30::a": fallthrough case "fd00:77:30::2:9ba6": - return []*object.Endpoints{&ep1} - case "10.0.0.99": - return []*object.Endpoints{&ep1, &ep2} + return []*object.Endpoints{&ep1s2} + case "10.0.0.100": // two EndpointSlices for a Service contain this IP (EndpointSlice skew) + return []*object.Endpoints{&ep1s1, &ep1s3} + case "10.0.0.99": // two different Services select this IP + return []*object.Endpoints{&ep1s1, &ep2} } return nil } diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go index 03bbc6330..700a7421e 100644 --- a/plugin/kubernetes/setup.go +++ b/plugin/kubernetes/setup.go @@ -113,6 +113,7 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) { opts := dnsControlOpts{ initEndpointsCache: true, + useEndpointSlices: false, ignoreEmptyService: false, } k8s.opts = opts diff --git a/plugin/kubernetes/watch.go b/plugin/kubernetes/watch.go deleted file mode 100644 index d15ed4cf9..000000000 --- a/plugin/kubernetes/watch.go +++ /dev/null @@ -1,54 +0,0 @@ -package kubernetes - -import ( - "context" - - meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" -) - -func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { - return func(options meta.ListOptions) (watch.Interface, error) { - if s != nil { - options.LabelSelector = s.String() - } - w, err := c.CoreV1().Services(ns).Watch(ctx, options) - return w, err - } -} - -func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { - return func(options meta.ListOptions) (watch.Interface, error) { - if s != nil { - options.LabelSelector = s.String() - } - if len(options.FieldSelector) > 0 { - options.FieldSelector = options.FieldSelector + "," - } - options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown" - w, err := c.CoreV1().Pods(ns).Watch(ctx, options) - return w, err - } -} - -func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { - return func(options meta.ListOptions) (watch.Interface, error) { - if s != nil { - options.LabelSelector = s.String() - } - w, err := c.CoreV1().Endpoints(ns).Watch(ctx, options) - return w, err - } -} - -func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { - return func(options meta.ListOptions) (watch.Interface, error) { - if s != nil { - options.LabelSelector = s.String() - } - w, err := c.CoreV1().Namespaces().Watch(ctx, options) - return w, err - } -} diff --git a/plugin/kubernetes/xfr.go b/plugin/kubernetes/xfr.go index 0392f0252..550a70dfd 100644 --- a/plugin/kubernetes/xfr.go +++ b/plugin/kubernetes/xfr.go @@ -84,10 +84,6 @@ func (k *Kubernetes) Transfer(zone string, serial uint32) (<-chan []dns.RR, erro endpointsList := k.APIConn.EpIndex(svc.Name + "." + svc.Namespace) for _, ep := range endpointsList { - if ep.Name != svc.Name || ep.Namespace != svc.Namespace { - continue - } - for _, eps := range ep.Subsets { srvWeight := calcSRVWeight(len(eps.Addresses)) for _, addr := range eps.Addresses {