plugin/kubernetes: Support both v1 and v1beta1 EndpointSlices (#4570)
* support v1 and v1beta1 endpointslice Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * update comments Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
This commit is contained in:
parent
7354552296
commit
24547447d0
3 changed files with 106 additions and 12 deletions
|
@ -11,7 +11,8 @@ import (
|
||||||
"github.com/coredns/coredns/plugin/kubernetes/object"
|
"github.com/coredns/coredns/plugin/kubernetes/object"
|
||||||
|
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
discovery "k8s.io/api/discovery/v1beta1"
|
discovery "k8s.io/api/discovery/v1"
|
||||||
|
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
|
||||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
@ -180,6 +181,23 @@ func (dns *dnsControl) WatchEndpoints(ctx context.Context) {
|
||||||
dns.epLock.Unlock()
|
dns.epLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WatchEndpointSliceV1beta1 will set the endpoint Lister and Controller to watch v1beta1
|
||||||
|
// instead of the default v1.
|
||||||
|
func (dns *dnsControl) WatchEndpointSliceV1beta1(ctx context.Context) {
|
||||||
|
dns.epLock.Lock()
|
||||||
|
dns.epLister, dns.epController = object.NewIndexerInformer(
|
||||||
|
&cache.ListWatch{
|
||||||
|
ListFunc: endpointSliceListFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector),
|
||||||
|
WatchFunc: endpointSliceWatchFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector),
|
||||||
|
},
|
||||||
|
&discoveryV1beta1.EndpointSlice{},
|
||||||
|
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
|
||||||
|
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
|
||||||
|
object.DefaultProcessor(object.EndpointSliceV1beta1ToEndpoints, dns.EndpointSliceLatencyRecorder()),
|
||||||
|
)
|
||||||
|
dns.epLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
|
func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
|
||||||
return &object.EndpointLatencyRecorder{
|
return &object.EndpointLatencyRecorder{
|
||||||
ServiceFunc: func(o meta.Object) []*object.Service {
|
ServiceFunc: func(o meta.Object) []*object.Service {
|
||||||
|
@ -262,13 +280,21 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label
|
||||||
return c.CoreV1().Pods(ns).List(ctx, opts)
|
return c.CoreV1().Pods(ns).List(ctx, opts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
func endpointSliceListFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
|
||||||
|
return func(opts meta.ListOptions) (runtime.Object, error) {
|
||||||
|
if s != nil {
|
||||||
|
opts.LabelSelector = s.String()
|
||||||
|
}
|
||||||
|
return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
|
func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
|
||||||
return func(opts meta.ListOptions) (runtime.Object, error) {
|
return func(opts meta.ListOptions) (runtime.Object, error) {
|
||||||
if s != nil {
|
if s != nil {
|
||||||
opts.LabelSelector = s.String()
|
opts.LabelSelector = s.String()
|
||||||
}
|
}
|
||||||
return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts)
|
return c.DiscoveryV1().EndpointSlices(ns).List(ctx, opts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,7 +338,7 @@ func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
|
func endpointSliceWatchFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
|
||||||
return func(options meta.ListOptions) (watch.Interface, error) {
|
return func(options meta.ListOptions) (watch.Interface, error) {
|
||||||
if s != nil {
|
if s != nil {
|
||||||
options.LabelSelector = s.String()
|
options.LabelSelector = s.String()
|
||||||
|
@ -321,6 +347,15 @@ func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns stri
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
|
||||||
|
return func(options meta.ListOptions) (watch.Interface, error) {
|
||||||
|
if s != nil {
|
||||||
|
options.LabelSelector = s.String()
|
||||||
|
}
|
||||||
|
return c.DiscoveryV1().EndpointSlices(ns).Watch(ctx, options)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
|
func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
|
||||||
return func(options meta.ListOptions) (watch.Interface, error) {
|
return func(options meta.ListOptions) (watch.Interface, error) {
|
||||||
if s != nil {
|
if s != nil {
|
||||||
|
|
|
@ -20,7 +20,8 @@ import (
|
||||||
|
|
||||||
"github.com/miekg/dns"
|
"github.com/miekg/dns"
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
discovery "k8s.io/api/discovery/v1beta1"
|
discovery "k8s.io/api/discovery/v1"
|
||||||
|
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
|
||||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
|
@ -256,10 +257,15 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o
|
||||||
go func() {
|
go func() {
|
||||||
if initEndpointWatch {
|
if initEndpointWatch {
|
||||||
// Revert to watching Endpoints for incompatible K8s.
|
// Revert to watching Endpoints for incompatible K8s.
|
||||||
// This can be remove when all supported k8s versions support endpointslices.
|
// This can be removed when all supported k8s versions support endpointslices.
|
||||||
if ok := k.endpointSliceSupported(kubeClient); !ok {
|
ok, v := k.endpointSliceSupported(kubeClient)
|
||||||
|
if !ok {
|
||||||
k.APIConn.(*dnsControl).WatchEndpoints(ctx)
|
k.APIConn.(*dnsControl).WatchEndpoints(ctx)
|
||||||
}
|
}
|
||||||
|
// Revert to EndpointSlice v1beta1 if v1 is not supported
|
||||||
|
if ok && v == discoveryV1beta1.SchemeGroupVersion.String() {
|
||||||
|
k.APIConn.(*dnsControl).WatchEndpointSliceV1beta1(ctx)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
k.APIConn.Run()
|
k.APIConn.Run()
|
||||||
}()
|
}()
|
||||||
|
@ -290,9 +296,12 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o
|
||||||
// endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints)
|
// endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints)
|
||||||
// based on the supportability of endpointslices in the API and server version. It will return true when endpointslices
|
// based on the supportability of endpointslices in the API and server version. It will return true when endpointslices
|
||||||
// should be watched, and false when endpoints should be watched.
|
// should be watched, and false when endpoints should be watched.
|
||||||
// If the API supports discovery v1 beta1, and the server versions >= 1.19, endpointslices are watched.
|
// If the API supports discovery, and the server versions >= 1.19, true is returned.
|
||||||
// This function should be removed, along with non-slice endpoint watch code, when support for k8s < 1.19 is dropped.
|
// Also returned is the discovery version supported: "v1" if v1 is supported, and v1beta1 if v1beta1 is supported and
|
||||||
func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bool {
|
// v1 is not supported.
|
||||||
|
// This function should be removed, when all supported versions of k8s support v1.
|
||||||
|
func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) (bool, string) {
|
||||||
|
var sliceVer string
|
||||||
useEndpointSlices := false
|
useEndpointSlices := false
|
||||||
ticker := time.NewTicker(100 * time.Millisecond)
|
ticker := time.NewTicker(100 * time.Millisecond)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
@ -303,9 +312,13 @@ func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bo
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Enable use of endpoint slices if the API supports the discovery v1 beta1 api
|
// Enable use of endpoint slices if the API supports the discovery api
|
||||||
if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
|
if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
|
||||||
useEndpointSlices = true
|
useEndpointSlices = true
|
||||||
|
sliceVer = discovery.SchemeGroupVersion.String()
|
||||||
|
} else if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discoveryV1beta1.SchemeGroupVersion.String()); err == nil {
|
||||||
|
useEndpointSlices = true
|
||||||
|
sliceVer = discoveryV1beta1.SchemeGroupVersion.String()
|
||||||
}
|
}
|
||||||
// Disable use of endpoint slices for k8s versions 1.18 and earlier. The Endpointslices API was enabled
|
// Disable use of endpoint slices for k8s versions 1.18 and earlier. The Endpointslices API was enabled
|
||||||
// by default in 1.17 but Service -> Pod proxy continued to use Endpoints by default until 1.19.
|
// by default in 1.17 but Service -> Pod proxy continued to use Endpoints by default until 1.19.
|
||||||
|
@ -317,7 +330,7 @@ func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bo
|
||||||
log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
|
log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
|
||||||
useEndpointSlices = false
|
useEndpointSlices = false
|
||||||
}
|
}
|
||||||
return useEndpointSlices
|
return useEndpointSlices, sliceVer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
discovery "k8s.io/api/discovery/v1beta1"
|
discovery "k8s.io/api/discovery/v1"
|
||||||
|
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
|
||||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
)
|
)
|
||||||
|
@ -151,6 +152,51 @@ func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) {
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EndpointSliceV1beta1ToEndpoints converts a v1beta1 *discovery.EndpointSlice to a *Endpoints.
|
||||||
|
func EndpointSliceV1beta1ToEndpoints(obj meta.Object) (meta.Object, error) {
|
||||||
|
ends, ok := obj.(*discoveryV1beta1.EndpointSlice)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unexpected object %v", obj)
|
||||||
|
}
|
||||||
|
e := &Endpoints{
|
||||||
|
Version: ends.GetResourceVersion(),
|
||||||
|
Name: ends.GetName(),
|
||||||
|
Namespace: ends.GetNamespace(),
|
||||||
|
Index: EndpointsKey(ends.Labels[discovery.LabelServiceName], ends.GetNamespace()),
|
||||||
|
Subsets: make([]EndpointSubset, 1),
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ends.Ports) == 0 {
|
||||||
|
// Add sentinel if there are no ports.
|
||||||
|
e.Subsets[0].Ports = []EndpointPort{{Port: -1}}
|
||||||
|
} else {
|
||||||
|
e.Subsets[0].Ports = make([]EndpointPort, len(ends.Ports))
|
||||||
|
for k, p := range ends.Ports {
|
||||||
|
ep := EndpointPort{Port: *p.Port, Name: *p.Name, Protocol: string(*p.Protocol)}
|
||||||
|
e.Subsets[0].Ports[k] = ep
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, end := range ends.Endpoints {
|
||||||
|
for _, a := range end.Addresses {
|
||||||
|
ea := EndpointAddress{IP: a}
|
||||||
|
if end.Hostname != nil {
|
||||||
|
ea.Hostname = *end.Hostname
|
||||||
|
}
|
||||||
|
if end.TargetRef != nil {
|
||||||
|
ea.TargetRefName = end.TargetRef.Name
|
||||||
|
}
|
||||||
|
// EndpointSlice does not contain NodeName, leave blank
|
||||||
|
e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, ea)
|
||||||
|
e.IndexIP = append(e.IndexIP, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*ends = discoveryV1beta1.EndpointSlice{}
|
||||||
|
|
||||||
|
return e, nil
|
||||||
|
}
|
||||||
|
|
||||||
// CopyWithoutSubsets copies e, without the subsets.
|
// CopyWithoutSubsets copies e, without the subsets.
|
||||||
func (e *Endpoints) CopyWithoutSubsets() *Endpoints {
|
func (e *Endpoints) CopyWithoutSubsets() *Endpoints {
|
||||||
e1 := &Endpoints{
|
e1 := &Endpoints{
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue