diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index 890785d71..f17576eb4 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -11,7 +11,8 @@ import ( "github.com/coredns/coredns/plugin/kubernetes/object" api "k8s.io/api/core/v1" - discovery "k8s.io/api/discovery/v1beta1" + discovery "k8s.io/api/discovery/v1" + discoveryV1beta1 "k8s.io/api/discovery/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -180,6 +181,23 @@ func (dns *dnsControl) WatchEndpoints(ctx context.Context) { dns.epLock.Unlock() } +// WatchEndpointSliceV1beta1 will set the endpoint Lister and Controller to watch v1beta1 +// instead of the default v1. +func (dns *dnsControl) WatchEndpointSliceV1beta1(ctx context.Context) { + dns.epLock.Lock() + dns.epLister, dns.epController = object.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: endpointSliceListFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector), + WatchFunc: endpointSliceWatchFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector), + }, + &discoveryV1beta1.EndpointSlice{}, + cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, + cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, + object.DefaultProcessor(object.EndpointSliceV1beta1ToEndpoints, dns.EndpointSliceLatencyRecorder()), + ) + dns.epLock.Unlock() +} + func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder { return &object.EndpointLatencyRecorder{ ServiceFunc: func(o meta.Object) []*object.Service { @@ -262,13 +280,21 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label return c.CoreV1().Pods(ns).List(ctx, opts) } } +func endpointSliceListFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { + return func(opts meta.ListOptions) (runtime.Object, error) { + if s != nil { + opts.LabelSelector = s.String() + } + return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts) + } +} func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) { if s != nil { opts.LabelSelector = s.String() } - return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts) + return c.DiscoveryV1().EndpointSlices(ns).List(ctx, opts) } } @@ -312,7 +338,7 @@ func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labe } } -func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { +func endpointSliceWatchFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { options.LabelSelector = s.String() @@ -321,6 +347,15 @@ func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns stri } } +func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.DiscoveryV1().EndpointSlices(ns).Watch(ctx, options) + } +} + func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go index bf1f01664..ea99a81ce 100644 --- a/plugin/kubernetes/kubernetes.go +++ b/plugin/kubernetes/kubernetes.go @@ -20,7 +20,8 @@ import ( "github.com/miekg/dns" api "k8s.io/api/core/v1" - discovery "k8s.io/api/discovery/v1beta1" + discovery "k8s.io/api/discovery/v1" + discoveryV1beta1 "k8s.io/api/discovery/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" @@ -256,10 +257,15 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o go func() { if initEndpointWatch { // Revert to watching Endpoints for incompatible K8s. - // This can be remove when all supported k8s versions support endpointslices. - if ok := k.endpointSliceSupported(kubeClient); !ok { + // This can be removed when all supported k8s versions support endpointslices. + ok, v := k.endpointSliceSupported(kubeClient) + if !ok { k.APIConn.(*dnsControl).WatchEndpoints(ctx) } + // Revert to EndpointSlice v1beta1 if v1 is not supported + if ok && v == discoveryV1beta1.SchemeGroupVersion.String() { + k.APIConn.(*dnsControl).WatchEndpointSliceV1beta1(ctx) + } } k.APIConn.Run() }() @@ -290,9 +296,12 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o // endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints) // based on the supportability of endpointslices in the API and server version. It will return true when endpointslices // should be watched, and false when endpoints should be watched. -// If the API supports discovery v1 beta1, and the server versions >= 1.19, endpointslices are watched. -// This function should be removed, along with non-slice endpoint watch code, when support for k8s < 1.19 is dropped. -func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bool { +// If the API supports discovery, and the server versions >= 1.19, true is returned. +// Also returned is the discovery version supported: "v1" if v1 is supported, and v1beta1 if v1beta1 is supported and +// v1 is not supported. +// This function should be removed, when all supported versions of k8s support v1. +func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) (bool, string) { + var sliceVer string useEndpointSlices := false ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() @@ -303,9 +312,13 @@ func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bo if err != nil { continue } - // Enable use of endpoint slices if the API supports the discovery v1 beta1 api + // Enable use of endpoint slices if the API supports the discovery api if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil { useEndpointSlices = true + sliceVer = discovery.SchemeGroupVersion.String() + } else if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discoveryV1beta1.SchemeGroupVersion.String()); err == nil { + useEndpointSlices = true + sliceVer = discoveryV1beta1.SchemeGroupVersion.String() } // Disable use of endpoint slices for k8s versions 1.18 and earlier. The Endpointslices API was enabled // by default in 1.17 but Service -> Pod proxy continued to use Endpoints by default until 1.19. @@ -317,7 +330,7 @@ func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bo log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19") useEndpointSlices = false } - return useEndpointSlices + return useEndpointSlices, sliceVer } } } diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index fc25aa4fb..9a5347a9b 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -4,7 +4,8 @@ import ( "fmt" api "k8s.io/api/core/v1" - discovery "k8s.io/api/discovery/v1beta1" + discovery "k8s.io/api/discovery/v1" + discoveryV1beta1 "k8s.io/api/discovery/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -151,6 +152,51 @@ func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) { return e, nil } +// EndpointSliceV1beta1ToEndpoints converts a v1beta1 *discovery.EndpointSlice to a *Endpoints. +func EndpointSliceV1beta1ToEndpoints(obj meta.Object) (meta.Object, error) { + ends, ok := obj.(*discoveryV1beta1.EndpointSlice) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } + e := &Endpoints{ + Version: ends.GetResourceVersion(), + Name: ends.GetName(), + Namespace: ends.GetNamespace(), + Index: EndpointsKey(ends.Labels[discovery.LabelServiceName], ends.GetNamespace()), + Subsets: make([]EndpointSubset, 1), + } + + if len(ends.Ports) == 0 { + // Add sentinel if there are no ports. + e.Subsets[0].Ports = []EndpointPort{{Port: -1}} + } else { + e.Subsets[0].Ports = make([]EndpointPort, len(ends.Ports)) + for k, p := range ends.Ports { + ep := EndpointPort{Port: *p.Port, Name: *p.Name, Protocol: string(*p.Protocol)} + e.Subsets[0].Ports[k] = ep + } + } + + for _, end := range ends.Endpoints { + for _, a := range end.Addresses { + ea := EndpointAddress{IP: a} + if end.Hostname != nil { + ea.Hostname = *end.Hostname + } + if end.TargetRef != nil { + ea.TargetRefName = end.TargetRef.Name + } + // EndpointSlice does not contain NodeName, leave blank + e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, ea) + e.IndexIP = append(e.IndexIP, a) + } + } + + *ends = discoveryV1beta1.EndpointSlice{} + + return e, nil +} + // CopyWithoutSubsets copies e, without the subsets. func (e *Endpoints) CopyWithoutSubsets() *Endpoints { e1 := &Endpoints{