plugin/kubernetes: Watch EndpointSlices (#4209)

* initial commit

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* convert endpointslices to object.endpoints

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* add opt hard coded for now

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* check that server supports endpointslice

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* fix import grouping

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* dont use endpoint slice in 1.17 or 1.18

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* bump kind/k8s in circle ci to latest

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* drop k8s to latest supported by kind

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* use endpointslice name as endoint Name; index by Service name

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* use index key comparison in nsAddrs()

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* add Index to object.Endpoint fixtures; fix direct endpoint name compares

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* add slice dup check and test

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* todo

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* add ep-slice skew dup test for reverse

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* nsaddrs: de-dup ep-slice skew dups; add test

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* remove todo

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* address various feedback

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* consolidate endpoint/slice informer code

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* fix endpoint informer consolidation; use clearer func name

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* log info; use major/minor fields

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* fix nsAddr and unit test

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* add latency tracking for endpointslices

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* endpointslice latency unit test & fix

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* code shuffling

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* rename endpointslices in tests

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* remove de-dup from nsAddrs and test

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* remove de-dup from findServices / test

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
This commit is contained in:
Chris O'Haver 2020-10-30 08:14:30 -04:00 committed by GitHub
parent c840caf1ef
commit 272ccb195d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 374 additions and 182 deletions

View file

@ -21,8 +21,8 @@ integrationDefaults: &integrationDefaults
image: ubuntu-1604:201903-01
working_directory: ~/go/src/${CIRCLE_PROJECT_USERNAME}/coredns
environment:
- K8S_VERSION: v1.18.2
- KIND_VERSION: v0.8.1
- K8S_VERSION: v1.19.1
- KIND_VERSION: v0.9.0
- KUBECONFIG: /home/circleci/.kube/kind-config-kind
setupKubernetes: &setupKubernetes

View file

@ -11,16 +11,18 @@ import (
"github.com/coredns/coredns/plugin/kubernetes/object"
api "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
const (
podIPIndex = "PodIP"
svcNameNamespaceIndex = "NameNamespace"
svcNameNamespaceIndex = "ServiceNameNamespace"
svcIPIndex = "ServiceIP"
epNameNamespaceIndex = "EndpointNameNamespace"
epIPIndex = "EndpointsIP"
@ -81,6 +83,7 @@ type dnsControl struct {
type dnsControlOpts struct {
initPodCache bool
initEndpointsCache bool
useEndpointSlices bool
ignoreEmptyService bool
// Label handling.
@ -130,15 +133,31 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
}
if opts.initEndpointsCache {
var (
apiObj runtime.Object
listWatch cache.ListWatch
to func(bool) object.ToFunc
latency object.RecordLatencyFunc
)
if opts.useEndpointSlices {
apiObj = &discovery.EndpointSlice{}
listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
to = object.EndpointSliceToEndpoints
latency = dns.recordEndpointSliceDNSProgrammingLatency
} else {
apiObj = &api.Endpoints{}
listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
to = object.ToEndpoints
latency = dns.recordEndpointDNSProgrammingLatency
}
dns.epLister, dns.epController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&api.Endpoints{},
&listWatch,
apiObj,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
object.DefaultProcessor(object.ToEndpoints(opts.skipAPIObjectsCleanup), dns.recordDNSProgrammingLatency),
object.DefaultProcessor(to(opts.skipAPIObjectsCleanup), latency),
)
}
@ -154,8 +173,12 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
return &dns
}
func (dns *dnsControl) recordDNSProgrammingLatency(obj meta.Object) {
recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj.(*api.Endpoints))
func (dns *dnsControl) recordEndpointDNSProgrammingLatency(obj meta.Object) {
recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj)
}
func (dns *dnsControl) recordEndpointSliceDNSProgrammingLatency(obj meta.Object) {
recordDNSProgrammingLatency(dns.SvcIndex(object.ServiceKey(obj.GetLabels()[discovery.LabelServiceName], obj.GetNamespace())), obj)
}
func podIPIndexFunc(obj interface{}) ([]string, error) {
@ -207,8 +230,7 @@ func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s l
if s != nil {
opts.LabelSelector = s.String()
}
listV1, err := c.CoreV1().Services(ns).List(ctx, opts)
return listV1, err
return c.CoreV1().Services(ns).List(ctx, opts)
}
}
@ -221,8 +243,16 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label
opts.FieldSelector = opts.FieldSelector + ","
}
opts.FieldSelector = opts.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
listV1, err := c.CoreV1().Pods(ns).List(ctx, opts)
return listV1, err
return c.CoreV1().Pods(ns).List(ctx, opts)
}
}
func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
}
return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts)
}
}
@ -231,8 +261,7 @@ func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s
if s != nil {
opts.LabelSelector = s.String()
}
listV1, err := c.CoreV1().Endpoints(ns).List(ctx, opts)
return listV1, err
return c.CoreV1().Endpoints(ns).List(ctx, opts)
}
}
@ -241,8 +270,56 @@ func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Sel
if s != nil {
opts.LabelSelector = s.String()
}
listV1, err := c.CoreV1().Namespaces().List(ctx, opts)
return listV1, err
return c.CoreV1().Namespaces().List(ctx, opts)
}
}
func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
return c.CoreV1().Services(ns).Watch(ctx, options)
}
}
func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
if len(options.FieldSelector) > 0 {
options.FieldSelector = options.FieldSelector + ","
}
options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
return c.CoreV1().Pods(ns).Watch(ctx, options)
}
}
func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
return c.DiscoveryV1beta1().EndpointSlices(ns).Watch(ctx, options)
}
}
func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
return c.CoreV1().Endpoints(ns).Watch(ctx, options)
}
}
func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
return c.CoreV1().Namespaces().Watch(ctx, options)
}
}
@ -442,7 +519,7 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) {
}
func (dns *dnsControl) getServices(endpoints *api.Endpoints) []*object.Service {
return dns.SvcIndex(object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace()))
return dns.SvcIndex(object.ServiceKey(endpoints.GetName(), endpoints.GetNamespace()))
}
// subsetsEquivalent checks if two endpoint subsets are significantly equivalent

View file

@ -643,8 +643,9 @@ var epsIndex = map[string][]*object.Endpoints{
},
},
},
Name: "svc1",
Name: "svc1-slice1",
Namespace: "testns",
Index: object.EndpointsKey("svc1", "testns"),
}},
"svcempty.testns": {{
Subsets: []object.EndpointSubset{
@ -655,8 +656,9 @@ var epsIndex = map[string][]*object.Endpoints{
},
},
},
Name: "svcempty",
Name: "svcempty-slice1",
Namespace: "testns",
Index: object.EndpointsKey("svcempty", "testns"),
}},
"hdls1.testns": {{
Subsets: []object.EndpointSubset{
@ -674,8 +676,9 @@ var epsIndex = map[string][]*object.Endpoints{
},
},
},
Name: "hdls1",
Name: "hdls1-slice1",
Namespace: "testns",
Index: object.EndpointsKey("hdls1", "testns"),
}},
"hdlsprtls.testns": {{
Subsets: []object.EndpointSubset{
@ -686,8 +689,9 @@ var epsIndex = map[string][]*object.Endpoints{
Ports: []object.EndpointPort{{Port: -1}},
},
},
Name: "hdlsprtls",
Name: "hdlsprtls-slice1",
Namespace: "testns",
Index: object.EndpointsKey("hdlsprtls", "testns"),
}},
}

View file

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net"
"strconv"
"strings"
"github.com/coredns/coredns/plugin"
@ -18,6 +19,7 @@ import (
"github.com/miekg/dns"
api "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
@ -244,6 +246,20 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
k.opts.zones = k.Zones
k.opts.endpointNameMode = k.endpointNameMode
// Enable use of endpoint slices if the API supports the discovery v1 beta1 api
if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
k.opts.useEndpointSlices = true
}
// Disable use of endpoint slices for k8s versions 1.18 and earlier. Endpoint slices were
// introduced in 1.17 but EndpointSliceMirroring was not added until 1.19.
sv, _ := kubeClient.ServerVersion()
major, _ := strconv.Atoi(sv.Major)
minor, _ := strconv.Atoi(sv.Minor)
if k.opts.useEndpointSlices && major <= 1 && minor <= 18 {
log.Info("watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
k.opts.useEndpointSlices = false
}
k.APIConn = newdnsController(ctx, kubeClient, k.opts)
return err
@ -433,8 +449,9 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.
if endpointsList == nil {
endpointsList = endpointsListFunc()
}
for _, ep := range endpointsList {
if ep.Name != svc.Name || ep.Namespace != svc.Namespace {
if object.EndpointsKey(svc.Name, svc.Namespace) != ep.Index {
continue
}

View file

@ -137,8 +137,9 @@ func (APIConnServiceTest) EpIndex(string) []*object.Endpoints {
},
},
},
Name: "svc1",
Name: "svc1-slice1",
Namespace: "testns",
Index: object.EndpointsKey("svc1", "testns"),
},
{
Subsets: []object.EndpointSubset{
@ -151,22 +152,9 @@ func (APIConnServiceTest) EpIndex(string) []*object.Endpoints {
},
},
},
Name: "hdls1",
Namespace: "testns",
},
{
Subsets: []object.EndpointSubset{
{
Addresses: []object.EndpointAddress{
{IP: "172.0.0.3"},
},
Ports: []object.EndpointPort{
{Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
Name: "hdls1",
Name: "hdls1-slice1",
Namespace: "testns",
Index: object.EndpointsKey("hdls1", "testns"),
},
{
Subsets: []object.EndpointSubset{
@ -194,8 +182,9 @@ func (APIConnServiceTest) EndpointsList() []*object.Endpoints {
},
},
},
Name: "svc1",
Name: "svc1-slice1",
Namespace: "testns",
Index: object.EndpointsKey("svc1", "testns"),
},
{
Subsets: []object.EndpointSubset{
@ -208,22 +197,24 @@ func (APIConnServiceTest) EndpointsList() []*object.Endpoints {
},
},
},
Name: "hdls1",
Name: "hdls1-slice1",
Namespace: "testns",
Index: object.EndpointsKey("hdls1", "testns"),
},
{
Subsets: []object.EndpointSubset{
{
Addresses: []object.EndpointAddress{
{IP: "172.0.0.3"},
{IP: "172.0.0.2"},
},
Ports: []object.EndpointPort{
{Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
Name: "hdls1",
Name: "hdls1-slice2",
Namespace: "testns",
Index: object.EndpointsKey("hdls1", "testns"),
},
{
Subsets: []object.EndpointSubset{
@ -275,6 +266,9 @@ func TestServices(t *testing.T) {
// External Services
{qname: "external.testns.svc.interwebs.test.", qtype: dns.TypeCNAME, answer: svcAns{host: "coredns.io", key: "/" + coredns + "/test/interwebs/svc/testns/external"}},
// Headless Services
{qname: "hdls1.testns.svc.interwebs.test.", qtype: dns.TypeA, answer: svcAns{host: "172.0.0.2", key: "/" + coredns + "/test/interwebs/svc/testns/hdls1/172-0-0-2"}},
}
for i, test := range tests {

View file

@ -5,10 +5,10 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var (
@ -37,11 +37,11 @@ var (
durationSinceFunc = time.Since
)
func recordDNSProgrammingLatency(svcs []*object.Service, endpoints *api.Endpoints) {
func recordDNSProgrammingLatency(svcs []*object.Service, endpoints meta.Object) {
// getLastChangeTriggerTime is the time.Time value of the EndpointsLastChangeTriggerTime
// annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set
var lastChangeTriggerTime time.Time
stringVal, ok := endpoints.Annotations[api.EndpointsLastChangeTriggerTime]
stringVal, ok := endpoints.GetAnnotations()[api.EndpointsLastChangeTriggerTime]
if ok {
ts, err := time.Parse(time.RFC3339Nano, stringVal)
if err != nil {

View file

@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"
api "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
@ -20,15 +21,92 @@ const (
namespace = "testns"
)
func TestDNSProgrammingLatency(t *testing.T) {
var expected = `
# HELP coredns_kubernetes_dns_programming_duration_seconds Histogram of the time (in seconds) it took to program a dns instance.
# TYPE coredns_kubernetes_dns_programming_duration_seconds histogram
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.001"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.002"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.004"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.008"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.016"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.032"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.064"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.128"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.256"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.512"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="1.024"} 1
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="2.048"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="4.096"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="8.192"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="16.384"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="32.768"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="65.536"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="131.072"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="262.144"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="524.288"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="+Inf"} 2
coredns_kubernetes_dns_programming_duration_seconds_sum{service_kind="headless_with_selector"} 3
coredns_kubernetes_dns_programming_duration_seconds_count{service_kind="headless_with_selector"} 2
`
func TestDNSProgrammingLatencyEndpointSlices(t *testing.T) {
client := fake.NewSimpleClientset()
now := time.Now()
ctx := context.TODO()
controller := newdnsController(ctx, client, dnsControlOpts{
initEndpointsCache: true,
useEndpointSlices: true,
// This is needed as otherwise the fake k8s client doesn't work properly.
skipAPIObjectsCleanup: true,
})
durationSinceFunc = func(t time.Time) time.Duration {
return now.Sub(t)
}
DNSProgrammingLatency.Reset()
go controller.Run()
endpoints1 := []discovery.Endpoint{{
Addresses: []string{"1.2.3.4"},
}}
endpoints2 := []discovery.Endpoint{{
Addresses: []string{"1.2.3.45"},
}}
createService(t, client, controller, "my-service", api.ClusterIPNone)
createEndpointSlice(t, client, "my-service", now.Add(-2*time.Second), endpoints1)
updateEndpointSlice(t, client, "my-service", now.Add(-1*time.Second), endpoints2)
createEndpointSlice(t, client, "endpoints-no-service", now.Add(-4*time.Second), nil)
createService(t, client, controller, "clusterIP-service", "10.40.0.12")
createEndpointSlice(t, client, "clusterIP-service", now.Add(-8*time.Second), nil)
createService(t, client, controller, "headless-no-annotation", api.ClusterIPNone)
createEndpointSlice(t, client, "headless-no-annotation", nil, nil)
createService(t, client, controller, "headless-wrong-annotation", api.ClusterIPNone)
createEndpointSlice(t, client, "headless-wrong-annotation", "wrong-value", nil)
controller.Stop()
if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil {
t.Error(err)
}
}
func TestDnsProgrammingLatencyEndpoints(t *testing.T) {
client := fake.NewSimpleClientset()
now := time.Now()
ctx := context.TODO()
controller := newdnsController(ctx, client, dnsControlOpts{
initEndpointsCache: true,
useEndpointSlices: false,
// This is needed as otherwise the fake k8s client doesn't work properly.
skipAPIObjectsCleanup: true,
})
durationSinceFunc = func(t time.Time) time.Duration {
return now.Sub(t)
}
@ -59,33 +137,7 @@ func TestDNSProgrammingLatency(t *testing.T) {
createEndpoints(t, client, "headless-wrong-annotation", "wrong-value", nil)
controller.Stop()
expected := `
# HELP coredns_kubernetes_dns_programming_duration_seconds Histogram of the time (in seconds) it took to program a dns instance.
# TYPE coredns_kubernetes_dns_programming_duration_seconds histogram
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.001"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.002"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.004"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.008"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.016"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.032"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.064"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.128"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.256"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.512"} 0
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="1.024"} 1
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="2.048"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="4.096"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="8.192"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="16.384"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="32.768"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="65.536"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="131.072"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="262.144"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="524.288"} 2
coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="+Inf"} 2
coredns_kubernetes_dns_programming_duration_seconds_sum{service_kind="headless_with_selector"} 3
coredns_kubernetes_dns_programming_duration_seconds_count{service_kind="headless_with_selector"} 2
`
if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil {
t.Error(err)
}
@ -105,6 +157,24 @@ func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []ap
}
}
func buildEndpointSlice(name string, lastChangeTriggerTime interface{}, endpoints []discovery.Endpoint) *discovery.EndpointSlice {
annotations := make(map[string]string)
switch v := lastChangeTriggerTime.(type) {
case string:
annotations[api.EndpointsLastChangeTriggerTime] = v
case time.Time:
annotations[api.EndpointsLastChangeTriggerTime] = v.Format(time.RFC3339Nano)
}
return &discovery.EndpointSlice{
ObjectMeta: meta.ObjectMeta{
Namespace: namespace, Name: name + "-12345",
Labels: map[string]string{discovery.LabelServiceName: name},
Annotations: annotations,
},
Endpoints: endpoints,
}
}
func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) {
ctx := context.TODO()
_, err := client.CoreV1().Endpoints(namespace).Create(ctx, buildEndpoints(name, triggerTime, subsets), meta.CreateOptions{})
@ -121,11 +191,27 @@ func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, tri
}
}
func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIP string) {
func createEndpointSlice(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, endpoints []discovery.Endpoint) {
ctx := context.TODO()
_, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(ctx, buildEndpointSlice(name, triggerTime, endpoints), meta.CreateOptions{})
if err != nil {
t.Fatal(err)
}
}
func updateEndpointSlice(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, endpoints []discovery.Endpoint) {
ctx := context.TODO()
_, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Update(ctx, buildEndpointSlice(name, triggerTime, endpoints), meta.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
}
func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) {
ctx := context.TODO()
if _, err := client.CoreV1().Services(namespace).Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name},
Spec: api.ServiceSpec{ClusterIP: clusterIP},
Spec: api.ServiceSpec{ClusterIP: clusterIp},
}, meta.CreateOptions{}); err != nil {
t.Fatal(err)
}

View file

@ -4,7 +4,6 @@ import (
"net"
"strings"
"github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/miekg/dns"
api "k8s.io/api/core/v1"
)
@ -27,7 +26,7 @@ func (k *Kubernetes) nsAddrs(external bool, zone string) []dns.RR {
// Collect IPs for all Services of the Endpoints
for _, endpoint := range endpoints {
svcs := k.APIConn.SvcIndex(object.ServiceKey(endpoint.Name, endpoint.Namespace))
svcs := k.APIConn.SvcIndex(endpoint.Index)
for _, svc := range svcs {
if external {
svcName := strings.Join([]string{svc.Name, svc.Namespace, zone}, ".")

View file

@ -61,43 +61,28 @@ func (APIConnTest) EpIndexReverse(ip string) []*object.Endpoints {
}
eps := []*object.Endpoints{
{
Subsets: []object.EndpointSubset{
{
Addresses: []object.EndpointAddress{
{
IP: "10.244.0.20",
},
},
},
},
Name: "dns-service",
Name: "dns-service-slice1",
Namespace: "kube-system",
},
{
Index: object.EndpointsKey("dns-service", "kube-system"),
Subsets: []object.EndpointSubset{
{Addresses: []object.EndpointAddress{{IP: "10.244.0.20"}}},
},
},
{
Addresses: []object.EndpointAddress{
{
IP: "10.244.0.20",
},
},
},
},
Name: "hdls-dns-service",
Name: "hdls-dns-service-slice1",
Namespace: "kube-system",
},
{
Index: object.EndpointsKey("hdls-dns-service", "kube-system"),
Subsets: []object.EndpointSubset{
{Addresses: []object.EndpointAddress{{IP: "10.244.0.20"}}},
},
},
{
Addresses: []object.EndpointAddress{
{
IP: "10.244.0.20",
},
},
},
},
Name: "dns6-service",
Name: "dns6-service-slice1",
Namespace: "kube-system",
Index: object.EndpointsKey("dns6-service", "kube-system"),
Subsets: []object.EndpointSubset{
{Addresses: []object.EndpointAddress{{IP: "10.244.0.20"}}},
},
},
}
return eps

View file

@ -4,6 +4,7 @@ import (
"fmt"
api "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
)
@ -56,6 +57,17 @@ func ToEndpoints(skipCleanup bool) ToFunc {
}
}
// EndpointSliceToEndpoints returns a function that converts an *discovery.EndpointSlice to a *Endpoints.
func EndpointSliceToEndpoints(skipCleanup bool) ToFunc {
return func(obj interface{}) (interface{}, error) {
eps, ok := obj.(*discovery.EndpointSlice)
if !ok {
return nil, fmt.Errorf("unexpected object %v", obj)
}
return endpointSliceToEndpoints(skipCleanup, eps), nil
}
}
// toEndpoints converts an *api.Endpoints to a *Endpoints.
func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints {
e := &Endpoints{
@ -108,6 +120,49 @@ func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints {
return e
}
// endpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints.
func endpointSliceToEndpoints(skipCleanup bool, ends *discovery.EndpointSlice) *Endpoints {
e := &Endpoints{
Version: ends.GetResourceVersion(),
Name: ends.GetName(),
Namespace: ends.GetNamespace(),
Index: EndpointsKey(ends.Labels[discovery.LabelServiceName], ends.GetNamespace()),
Subsets: make([]EndpointSubset, 1),
}
if len(ends.Ports) == 0 {
// Add sentinel if there are no ports.
e.Subsets[0].Ports = []EndpointPort{{Port: -1}}
} else {
e.Subsets[0].Ports = make([]EndpointPort, len(ends.Ports))
for k, p := range ends.Ports {
ep := EndpointPort{Port: *p.Port, Name: *p.Name, Protocol: string(*p.Protocol)}
e.Subsets[0].Ports[k] = ep
}
}
for _, end := range ends.Endpoints {
for _, a := range end.Addresses {
ea := EndpointAddress{IP: a}
if end.Hostname != nil {
ea.Hostname = *end.Hostname
}
if end.TargetRef != nil {
ea.TargetRefName = end.TargetRef.Name
}
// EndpointSlice does not contain NodeName, leave blank
e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, ea)
e.IndexIP = append(e.IndexIP, a)
}
}
if !skipCleanup {
*ends = discovery.EndpointSlice{}
}
return e
}
// CopyWithoutSubsets copies e, without the subsets.
func (e *Endpoints) CopyWithoutSubsets() *Endpoints {
e1 := &Endpoints{

View file

@ -21,10 +21,11 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.
return clientState, cache.New(cfg)
}
type recordLatencyFunc func(meta.Object)
// RecordLatencyFunc is a function for recording api object delta latency
type RecordLatencyFunc func(meta.Object)
// DefaultProcessor is based on the Process function from cache.NewIndexerInformer except it does a conversion.
func DefaultProcessor(convert ToFunc, recordLatency recordLatencyFunc) ProcessorBuilder {
func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) ProcessorBuilder {
return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
return func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {

View file

@ -46,7 +46,7 @@ func (k *Kubernetes) serviceRecordForIP(ip, name string) []msg.Service {
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
if addr.IP == ip {
domain := strings.Join([]string{endpointHostname(addr, k.endpointNameMode), ep.Name, ep.Namespace, Svc, k.primaryZone()}, ".")
domain := strings.Join([]string{endpointHostname(addr, k.endpointNameMode), ep.Index, Svc, k.primaryZone()}, ".")
svcs = append(svcs, msg.Service{Host: domain, TTL: k.ttl})
}
}

View file

@ -56,14 +56,11 @@ func (APIConnReverseTest) SvcIndexReverse(ip string) []*object.Service {
}
func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints {
ep1 := object.Endpoints{
ep1s1 := object.Endpoints{
Subsets: []object.EndpointSubset{
{
Addresses: []object.EndpointAddress{
{IP: "10.0.0.100", Hostname: "ep1a"},
{IP: "1234:abcd::1", Hostname: "ep1b"},
{IP: "fd00:77:30::a", Hostname: "ip6svc1ex"},
{IP: "fd00:77:30::2:9ba6", Hostname: "ip6svc1in"},
{IP: "10.0.0.99", Hostname: "double-ep"}, // this endpoint is used by two services
},
Ports: []object.EndpointPort{
@ -71,8 +68,41 @@ func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints {
},
},
},
Name: "svc1",
Name: "svc1-slice1",
Namespace: "testns",
Index: object.EndpointsKey("svc1", "testns"),
}
ep1s2 := object.Endpoints{
Subsets: []object.EndpointSubset{
{
Addresses: []object.EndpointAddress{
{IP: "1234:abcd::1", Hostname: "ep1b"},
{IP: "fd00:77:30::a", Hostname: "ip6svc1ex"},
{IP: "fd00:77:30::2:9ba6", Hostname: "ip6svc1in"},
},
Ports: []object.EndpointPort{
{Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
Name: "svc1-slice2",
Namespace: "testns",
Index: object.EndpointsKey("svc1", "testns"),
}
ep1s3 := object.Endpoints{
Subsets: []object.EndpointSubset{
{
Addresses: []object.EndpointAddress{
{IP: "10.0.0.100", Hostname: "ep1a"}, // duplicate endpointslice address
},
Ports: []object.EndpointPort{
{Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
Name: "svc1-ccccc",
Namespace: "testns",
Index: object.EndpointsKey("svc1", "testns"),
}
ep2 := object.Endpoints{
Subsets: []object.EndpointSubset{
@ -85,20 +115,21 @@ func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints {
},
},
},
Name: "svc2",
Name: "svc2-slice1",
Namespace: "testns",
Index: object.EndpointsKey("svc2", "testns"),
}
switch ip {
case "10.0.0.100":
fallthrough
case "1234:abcd::1":
fallthrough
case "fd00:77:30::a":
fallthrough
case "fd00:77:30::2:9ba6":
return []*object.Endpoints{&ep1}
case "10.0.0.99":
return []*object.Endpoints{&ep1, &ep2}
return []*object.Endpoints{&ep1s2}
case "10.0.0.100": // two EndpointSlices for a Service contain this IP (EndpointSlice skew)
return []*object.Endpoints{&ep1s1, &ep1s3}
case "10.0.0.99": // two different Services select this IP
return []*object.Endpoints{&ep1s1, &ep2}
}
return nil
}

View file

@ -113,6 +113,7 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) {
opts := dnsControlOpts{
initEndpointsCache: true,
useEndpointSlices: false,
ignoreEmptyService: false,
}
k8s.opts = opts

View file

@ -1,54 +0,0 @@
package kubernetes
import (
"context"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
w, err := c.CoreV1().Services(ns).Watch(ctx, options)
return w, err
}
}
func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
if len(options.FieldSelector) > 0 {
options.FieldSelector = options.FieldSelector + ","
}
options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
w, err := c.CoreV1().Pods(ns).Watch(ctx, options)
return w, err
}
}
func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
w, err := c.CoreV1().Endpoints(ns).Watch(ctx, options)
return w, err
}
}
func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
w, err := c.CoreV1().Namespaces().Watch(ctx, options)
return w, err
}
}

View file

@ -84,10 +84,6 @@ func (k *Kubernetes) Transfer(zone string, serial uint32) (<-chan []dns.RR, erro
endpointsList := k.APIConn.EpIndex(svc.Name + "." + svc.Namespace)
for _, ep := range endpointsList {
if ep.Name != svc.Name || ep.Namespace != svc.Namespace {
continue
}
for _, eps := range ep.Subsets {
srvWeight := calcSRVWeight(len(eps.Addresses))
for _, addr := range eps.Addresses {