From 06cd8439182be8b9ed17280eaae411f9afb7c8f2 Mon Sep 17 00:00:00 2001 From: Chris O'Haver Date: Mon, 12 Jun 2023 11:22:57 -0400 Subject: [PATCH] plugin/kubernetes: Remove Endpoint and EndpointSlice v1beta Support (#6147) * remove endpoint and endpointslicev1beta watch support Signed-off-by: Chris O'Haver * adjust readme Signed-off-by: Chris O'Haver * informer object changes Signed-off-by: Chris O'Haver * remove unused funcs Signed-off-by: Chris O'Haver --------- Signed-off-by: Chris O'Haver --- plugin/kubernetes/README.md | 4 +- plugin/kubernetes/controller.go | 88 ---------------------- plugin/kubernetes/kubernetes.go | 80 -------------------- plugin/kubernetes/object/endpoint.go | 105 --------------------------- 4 files changed, 1 insertion(+), 276 deletions(-) diff --git a/plugin/kubernetes/README.md b/plugin/kubernetes/README.md index a62455630..0c50333e9 100644 --- a/plugin/kubernetes/README.md +++ b/plugin/kubernetes/README.md @@ -114,9 +114,7 @@ that has not yet been synchronized. ## Monitoring Kubernetes Endpoints -By default the *kubernetes* plugin watches Endpoints via the `discovery.EndpointSlices` API. However the -`api.Endpoints` API is used instead if the Kubernetes version does not support the `EndpointSliceProxying` -feature gate by default (i.e. Kubernetes version < 1.19). +The *kubernetes* plugin watches Endpoints via the `discovery.EndpointSlices` API. ## Ready diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index a785a003d..e7db294fc 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -12,7 +12,6 @@ import ( api "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" - discoveryV1beta1 "k8s.io/api/discovery/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -66,10 +65,6 @@ type dnsControl struct { selector labels.Selector namespaceSelector labels.Selector - // epLock is used to lock reads of epLister and epController while they are being replaced - // with the api.Endpoints Lister/Controller on k8s systems that don't use discovery.EndpointSlices - epLock sync.RWMutex - svcController cache.Controller podController cache.Controller epController cache.Controller @@ -153,12 +148,10 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()), ) - dns.epLock.Lock() dns.epLister = epLister if opts.initEndpointsCache { dns.epController = epController } - dns.epLock.Unlock() dns.nsLister, dns.nsController = object.NewIndexerInformer( &cache.ListWatch{ @@ -174,42 +167,6 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts return &dns } -// WatchEndpoints will set the endpoint Lister and Controller to watch object.Endpoints -// instead of the default discovery.EndpointSlice. This is used in older k8s clusters where -// discovery.EndpointSlice is not fully supported. -// This can be removed when all supported k8s versions fully support EndpointSlice. -func (dns *dnsControl) WatchEndpoints(ctx context.Context) { - dns.epLock.Lock() - dns.epLister, dns.epController = object.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - }, - &api.Endpoints{}, - cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, - cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder()), - ) - dns.epLock.Unlock() -} - -// WatchEndpointSliceV1beta1 will set the endpoint Lister and Controller to watch v1beta1 -// instead of the default v1. -func (dns *dnsControl) WatchEndpointSliceV1beta1(ctx context.Context) { - dns.epLock.Lock() - dns.epLister, dns.epController = object.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: endpointSliceListFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector), - WatchFunc: endpointSliceWatchFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector), - }, - &discoveryV1beta1.EndpointSlice{}, - cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, - cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - object.DefaultProcessor(object.EndpointSliceV1beta1ToEndpoints, dns.EndpointSliceLatencyRecorder()), - ) - dns.epLock.Unlock() -} - func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder { return &object.EndpointLatencyRecorder{ ServiceFunc: func(o meta.Object) []*object.Service { @@ -298,14 +255,6 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label return c.CoreV1().Pods(ns).List(ctx, opts) } } -func endpointSliceListFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { - return func(opts meta.ListOptions) (runtime.Object, error) { - if s != nil { - opts.LabelSelector = s.String() - } - return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts) - } -} func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) { @@ -316,15 +265,6 @@ func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns strin } } -func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { - return func(opts meta.ListOptions) (runtime.Object, error) { - if s != nil { - opts.LabelSelector = s.String() - } - return c.CoreV1().Endpoints(ns).List(ctx, opts) - } -} - func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) { if s != nil { @@ -356,15 +296,6 @@ func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labe } } -func endpointSliceWatchFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { - return func(options meta.ListOptions) (watch.Interface, error) { - if s != nil { - options.LabelSelector = s.String() - } - return c.DiscoveryV1beta1().EndpointSlices(ns).Watch(ctx, options) - } -} - func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { @@ -374,15 +305,6 @@ func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns stri } } -func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { - return func(options meta.ListOptions) (watch.Interface, error) { - if s != nil { - options.LabelSelector = s.String() - } - return c.CoreV1().Endpoints(ns).Watch(ctx, options) - } -} - func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { @@ -413,9 +335,7 @@ func (dns *dnsControl) Run() { go dns.svcController.Run(dns.stopCh) if dns.epController != nil { go func() { - dns.epLock.RLock() dns.epController.Run(dns.stopCh) - dns.epLock.RUnlock() }() } if dns.podController != nil { @@ -430,9 +350,7 @@ func (dns *dnsControl) HasSynced() bool { a := dns.svcController.HasSynced() b := true if dns.epController != nil { - dns.epLock.RLock() b = dns.epController.HasSynced() - dns.epLock.RUnlock() } c := true if dns.podController != nil { @@ -455,8 +373,6 @@ func (dns *dnsControl) ServiceList() (svcs []*object.Service) { } func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) { - dns.epLock.RLock() - defer dns.epLock.RUnlock() os := dns.epLister.List() for _, o := range os { ep, ok := o.(*object.Endpoints) @@ -531,8 +447,6 @@ func (dns *dnsControl) SvcExtIndexReverse(ip string) (svcs []*object.Service) { } func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) { - dns.epLock.RLock() - defer dns.epLock.RUnlock() os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx) if err != nil { return nil @@ -548,8 +462,6 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) { } func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) { - dns.epLock.RLock() - defer dns.epLock.RUnlock() os, err := dns.epLister.ByIndex(epIPIndex, ip) if err != nil { return nil diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go index 9e4633ca8..14ea031a0 100644 --- a/plugin/kubernetes/kubernetes.go +++ b/plugin/kubernetes/kubernetes.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "net" - "strconv" "strings" "time" @@ -19,9 +18,6 @@ import ( "github.com/miekg/dns" api "k8s.io/api/core/v1" - discovery "k8s.io/api/discovery/v1" - discoveryV1beta1 "k8s.io/api/discovery/v1beta1" - kerrors "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" @@ -262,22 +258,8 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o k.APIConn = newdnsController(ctx, kubeClient, k.opts) - initEndpointWatch := k.opts.initEndpointsCache - onStart = func() error { go func() { - if initEndpointWatch { - // Revert to watching Endpoints for incompatible K8s. - // This can be removed when all supported k8s versions support endpointslices. - ok, v := k.endpointSliceSupported(kubeClient) - if !ok { - k.APIConn.(*dnsControl).WatchEndpoints(ctx) - } - // Revert to EndpointSlice v1beta1 if v1 is not supported - if ok && v == discoveryV1beta1.SchemeGroupVersion.String() { - k.APIConn.(*dnsControl).WatchEndpointSliceV1beta1(ctx) - } - } k.APIConn.Run() }() @@ -311,68 +293,6 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o return onStart, onShut, err } -// endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints) -// based on the supportability of endpointslices in the API and server version. It will return true when endpointslices -// should be watched, and false when endpoints should be watched. -// If the API supports discovery, and the server versions >= 1.19, true is returned. -// Also returned is the discovery version supported: "v1" if v1 is supported, and v1beta1 if v1beta1 is supported and -// v1 is not supported. -// This function should be removed, when all supported versions of k8s support v1. -func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) (bool, string) { - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - logTicker := time.NewTicker(10 * time.Second) - defer logTicker.Stop() - var connErr error - for { - select { - case <-logTicker.C: - if connErr == nil { - continue - } - log.Warningf("Kubernetes API connection failure: %v", connErr) - case <-ticker.C: - sv, err := kubeClient.ServerVersion() - if err != nil { - connErr = err - continue - } - - // Disable use of endpoint slices for k8s versions 1.18 and earlier. The Endpointslices API was enabled - // by default in 1.17 but Service -> Pod proxy continued to use Endpoints by default until 1.19. - // DNS results should be built from the same source data that the proxy uses. This decision assumes - // k8s EndpointSliceProxying feature gate is at the default (i.e. only enabled for k8s >= 1.19). - major, _ := strconv.Atoi(sv.Major) - minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+")) - if major <= 1 && minor <= 18 { - log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19") - return false, "" - } - - // Enable use of endpoint slices if the API supports the discovery api - _, err = kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()) - if err == nil { - return true, discovery.SchemeGroupVersion.String() - } else if !kerrors.IsNotFound(err) { - connErr = err - continue - } - - _, err = kubeClient.Discovery().ServerResourcesForGroupVersion(discoveryV1beta1.SchemeGroupVersion.String()) - if err == nil { - return true, discoveryV1beta1.SchemeGroupVersion.String() - } else if !kerrors.IsNotFound(err) { - connErr = err - continue - } - - // Disable use of endpoint slices in case that it is disabled in k8s versions 1.19 and newer. - log.Info("Endpointslices API disabled. Watching Endpoints instead.") - return false, "" - } - } -} - // Records looks up services in kubernetes. func (k *Kubernetes) Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error) { r, e := parseRequest(state.Name(), state.Zone) diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index 4af64f363..b8c2f63f7 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -3,9 +3,7 @@ package object import ( "fmt" - api "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" - discoveryV1beta1 "k8s.io/api/discovery/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -48,60 +46,6 @@ type EndpointPort struct { // EndpointsKey returns a string using for the index. func EndpointsKey(name, namespace string) string { return name + "." + namespace } -// ToEndpoints converts an *api.Endpoints to a *Endpoints. -func ToEndpoints(obj meta.Object) (meta.Object, error) { - end, ok := obj.(*api.Endpoints) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - e := &Endpoints{ - Version: end.GetResourceVersion(), - Name: end.GetName(), - Namespace: end.GetNamespace(), - Index: EndpointsKey(end.GetName(), end.GetNamespace()), - Subsets: make([]EndpointSubset, len(end.Subsets)), - } - for i, eps := range end.Subsets { - sub := EndpointSubset{ - Addresses: make([]EndpointAddress, len(eps.Addresses)), - } - if len(eps.Ports) == 0 { - // Add sentinel if there are no ports. - sub.Ports = []EndpointPort{{Port: -1}} - } else { - sub.Ports = make([]EndpointPort, len(eps.Ports)) - } - - for j, a := range eps.Addresses { - ea := EndpointAddress{IP: a.IP, Hostname: a.Hostname} - if a.NodeName != nil { - ea.NodeName = *a.NodeName - } - if a.TargetRef != nil { - ea.TargetRefName = a.TargetRef.Name - } - sub.Addresses[j] = ea - } - - for k, p := range eps.Ports { - ep := EndpointPort{Port: p.Port, Name: p.Name, Protocol: string(p.Protocol)} - sub.Ports[k] = ep - } - - e.Subsets[i] = sub - } - - for _, eps := range end.Subsets { - for _, a := range eps.Addresses { - e.IndexIP = append(e.IndexIP, a.IP) - } - } - - *end = api.Endpoints{} - - return e, nil -} - // EndpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints. func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) { ends, ok := obj.(*discovery.EndpointSlice) @@ -153,55 +97,6 @@ func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) { return e, nil } -// EndpointSliceV1beta1ToEndpoints converts a v1beta1 *discovery.EndpointSlice to a *Endpoints. -func EndpointSliceV1beta1ToEndpoints(obj meta.Object) (meta.Object, error) { - ends, ok := obj.(*discoveryV1beta1.EndpointSlice) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - e := &Endpoints{ - Version: ends.GetResourceVersion(), - Name: ends.GetName(), - Namespace: ends.GetNamespace(), - Index: EndpointsKey(ends.Labels[discovery.LabelServiceName], ends.GetNamespace()), - Subsets: make([]EndpointSubset, 1), - } - - if len(ends.Ports) == 0 { - // Add sentinel if there are no ports. - e.Subsets[0].Ports = []EndpointPort{{Port: -1}} - } else { - e.Subsets[0].Ports = make([]EndpointPort, len(ends.Ports)) - for k, p := range ends.Ports { - ep := EndpointPort{Port: *p.Port, Name: *p.Name, Protocol: string(*p.Protocol)} - e.Subsets[0].Ports[k] = ep - } - } - - for _, end := range ends.Endpoints { - if !endpointsliceReady(end.Conditions.Ready) { - continue - } - for _, a := range end.Addresses { - ea := EndpointAddress{IP: a} - if end.Hostname != nil { - ea.Hostname = *end.Hostname - } - // ignore pod names that are too long to be a valid label - if end.TargetRef != nil && len(end.TargetRef.Name) < 64 { - ea.TargetRefName = end.TargetRef.Name - } - // EndpointSlice does not contain NodeName, leave blank - e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, ea) - e.IndexIP = append(e.IndexIP, a) - } - } - - *ends = discoveryV1beta1.EndpointSlice{} - - return e, nil -} - func endpointsliceReady(ready *bool) bool { // Per API docs: a nil value indicates an unknown state. In most cases consumers // should interpret this unknown state as ready.