plugin/kubernetes: do endpoint/slice check in retry loop (#4492)

* do endpoint/slice check in retry loop

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
Chris O'Haver 2021-03-26 08:54:39 -04:00 committed by GitHub
parent ea41dd23a0
commit 9f72db12e7
4 changed files with 141 additions and 74 deletions

plugin/kubernetes/README.md

@@ -104,6 +104,20 @@ kubernetes [ZONES...] {
 Enabling zone transfer is done by using the *transfer* plugin.
 
+## Startup
+
+When CoreDNS starts with the *kubernetes* plugin enabled, it will delay serving DNS for up to 5 seconds
+until it can connect to the Kubernetes API and synchronize all object watches. If this cannot happen within
+5 seconds, then CoreDNS will start serving DNS while the *kubernetes* plugin continues to try to connect
+and synchronize all object watches. CoreDNS will answer SERVFAIL to any request made for a Kubernetes record
+that has not yet been synchronized.
+
+## Monitoring Kubernetes Endpoints
+
+By default the *kubernetes* plugin watches Endpoints via the `discovery.EndpointSlices` API. However the
+`api.Endpoints` API is used instead if the Kubernetes version does not support the `EndpointSliceProxying`
+feature gate by default (i.e. Kubernetes version < 1.19).
+
 ## Ready
 
 This plugin reports readiness to the ready plugin. This will happen after it has synced to the
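The Startup section above describes a bounded wait: poll for a completed sync, give up after 5 seconds, and serve anyway. A minimal sketch of that behavior, using an illustrative `hasSynced` callback rather than the plugin's real API:

```go
package main

import (
	"log"
	"time"
)

// waitForSync polls hasSynced every 100ms and gives up after 5 seconds,
// mirroring the bounded startup delay described in the README section above.
// hasSynced is a hypothetical stand-in for the plugin's real sync check.
func waitForSync(hasSynced func() bool) {
	timeout := time.After(5 * time.Second)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if hasSynced() {
				return // watches are synchronized; serve normally
			}
		case <-timeout:
			// Give up waiting and serve anyway; unsynced Kubernetes records
			// are answered with SERVFAIL until the sync completes.
			log.Println("starting server with unsynced Kubernetes API")
			return
		}
	}
}

func main() {
	start := time.Now()
	// Pretend the watches finish syncing after one second.
	waitForSync(func() bool { return time.Since(start) > time.Second })
}
```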

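The Monitoring Kubernetes Endpoints section reduces to a capability-and-version decision. A sketch of that decision as a pure function (all names here are illustrative; the authoritative check is `endpointSliceSupported` in the kubernetes.go diff below):

```go
package main

import "fmt"

// useEndpointSlices reports whether DNS records should be built from
// EndpointSlices rather than Endpoints. sliceAPIServed stands for "the
// discovery group/version is served by the API"; major/minor are the parsed
// server version. These names are illustrative, not the plugin's API.
func useEndpointSlices(sliceAPIServed bool, major, minor int) bool {
	if !sliceAPIServed {
		return false
	}
	// EndpointSlices exist since 1.17, but kube-proxy only routes from them by
	// default starting with 1.19 (EndpointSliceProxying feature gate), so DNS
	// answers should come from the same source data the proxy uses.
	if major < 1 || (major == 1 && minor <= 18) {
		return false
	}
	return true
}

func main() {
	fmt.Println(useEndpointSlices(true, 1, 18))  // false: proxy still uses Endpoints
	fmt.Println(useEndpointSlices(true, 1, 19))  // true
	fmt.Println(useEndpointSlices(false, 1, 20)) // false: API group not served
}
```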
plugin/kubernetes/controller.go

@@ -59,6 +59,10 @@ type dnsControl struct {
     selector          labels.Selector
     namespaceSelector labels.Selector
 
+    // epLock is used to lock reads of epLister and epController while they are being replaced
+    // with the api.Endpoints Lister/Controller on k8s systems that don't use discovery.EndpointSlices
+    epLock sync.RWMutex
+
     svcController cache.Controller
     podController cache.Controller
     epController  cache.Controller
@@ -83,7 +87,6 @@ type dnsControl struct {
 type dnsControlOpts struct {
     initPodCache       bool
     initEndpointsCache bool
-    useEndpointSlices  bool
     ignoreEmptyService bool
 
     // Label handling.
@@ -132,32 +135,18 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
     }
 
     if opts.initEndpointsCache {
-        var (
-            apiObj    runtime.Object
-            listWatch cache.ListWatch
-            to        object.ToFunc
-            latency   *object.EndpointLatencyRecorder
-        )
-        if opts.useEndpointSlices {
-            apiObj = &discovery.EndpointSlice{}
-            listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
-            listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
-            to = object.EndpointSliceToEndpoints
-            latency = dns.EndpointSliceLatencyRecorder()
-        } else {
-            apiObj = &api.Endpoints{}
-            listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
-            listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
-            to = object.ToEndpoints
-            latency = dns.EndpointsLatencyRecorder()
-        }
+        dns.epLock.Lock()
         dns.epLister, dns.epController = object.NewIndexerInformer(
-            &listWatch,
-            apiObj,
+            &cache.ListWatch{
+                ListFunc:  endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+                WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+            },
+            &discovery.EndpointSlice{},
             cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
             cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
-            object.DefaultProcessor(to, latency),
+            object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
         )
+        dns.epLock.Unlock()
     }
 
     dns.nsLister, dns.nsController = cache.NewInformer(
@@ -172,6 +161,25 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
     return &dns
 }
 
+// WatchEndpoints will set the endpoint Lister and Controller to watch object.Endpoints
+// instead of the default discovery.EndpointSlice. This is used in older k8s clusters where
+// discovery.EndpointSlice is not fully supported.
+// This can be removed when all supported k8s versions fully support EndpointSlice.
+func (dns *dnsControl) WatchEndpoints(ctx context.Context) {
+    dns.epLock.Lock()
+    dns.epLister, dns.epController = object.NewIndexerInformer(
+        &cache.ListWatch{
+            ListFunc:  endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+            WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+        },
+        &api.Endpoints{},
+        cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
+        cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
+        object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder()),
+    )
+    dns.epLock.Unlock()
+}
+
 func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
     return &object.EndpointLatencyRecorder{
         ServiceFunc: func(o meta.Object) []*object.Service {
@@ -351,7 +359,11 @@ func (dns *dnsControl) Stop() error {
 func (dns *dnsControl) Run() {
     go dns.svcController.Run(dns.stopCh)
     if dns.epController != nil {
-        go dns.epController.Run(dns.stopCh)
+        go func() {
+            dns.epLock.RLock()
+            dns.epController.Run(dns.stopCh)
+            dns.epLock.RUnlock()
+        }()
     }
     if dns.podController != nil {
         go dns.podController.Run(dns.stopCh)
@@ -365,7 +377,9 @@ func (dns *dnsControl) HasSynced() bool {
     a := dns.svcController.HasSynced()
     b := true
     if dns.epController != nil {
+        dns.epLock.RLock()
         b = dns.epController.HasSynced()
+        dns.epLock.RUnlock()
     }
     c := true
     if dns.podController != nil {
@@ -388,6 +402,8 @@ func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
 }
 
 func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
+    dns.epLock.RLock()
+    defer dns.epLock.RUnlock()
     os := dns.epLister.List()
     for _, o := range os {
         ep, ok := o.(*object.Endpoints)
@@ -446,6 +462,8 @@ func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) {
 }
 
 func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
+    dns.epLock.RLock()
+    defer dns.epLock.RUnlock()
     os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
     if err != nil {
         return nil
@@ -461,6 +479,8 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
 }
 
 func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
+    dns.epLock.RLock()
+    defer dns.epLock.RUnlock()
     os, err := dns.epLister.ByIndex(epIPIndex, ip)
     if err != nil {
         return nil

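The `epLock` field and `WatchEndpoints` above exist so the endpoint lister/controller pair can be replaced after construction. A stripped-down sketch of that guard, using client-go types but otherwise hypothetical names:

```go
package informerswap

import (
	"sync"

	"k8s.io/client-go/tools/cache"
)

// swappableInformer is a simplified stand-in for dnsControl's epLister and
// epController: reads and the one-time replacement are serialized by an RWMutex.
type swappableInformer struct {
	mu         sync.RWMutex
	lister     cache.Indexer
	controller cache.Controller
}

// swap installs a new lister/controller pair, e.g. falling back from the
// EndpointSlice informer to an Endpoints informer on an older cluster.
func (s *swappableInformer) swap(l cache.Indexer, c cache.Controller) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.lister, s.controller = l, c
}

// list reads through whichever lister is currently installed.
func (s *swappableInformer) list() []interface{} {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.lister.List()
}

// hasSynced asks whichever controller is currently installed.
func (s *swappableInformer) hasSynced() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.controller.HasSynced()
}
```

Note that in the commit itself `Run` holds the read lock for as long as the endpoint controller runs, so the `WatchEndpoints` swap can only happen before the controller is started; the start goroutine in kubernetes.go performs the support check before calling `Run` for exactly that reason.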
plugin/kubernetes/kubernetes.go

@@ -8,6 +8,7 @@ import (
     "net"
     "strconv"
     "strings"
+    "time"
 
     "github.com/coredns/coredns/plugin"
     "github.com/coredns/coredns/plugin/etcd/msg"
@@ -213,22 +214,22 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
 }
 
 // InitKubeCache initializes a new Kubernetes cache.
-func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
+func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, onShut func() error, err error) {
     config, err := k.getClientConfig()
     if err != nil {
-        return err
+        return nil, nil, err
     }
 
     kubeClient, err := kubernetes.NewForConfig(config)
     if err != nil {
-        return fmt.Errorf("failed to create kubernetes notification controller: %q", err)
+        return nil, nil, fmt.Errorf("failed to create kubernetes notification controller: %q", err)
     }
 
     if k.opts.labelSelector != nil {
         var selector labels.Selector
         selector, err = meta.LabelSelectorAsSelector(k.opts.labelSelector)
         if err != nil {
-            return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.labelSelector, err)
+            return nil, nil, fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.labelSelector, err)
         }
         k.opts.selector = selector
     }
@@ -237,7 +238,7 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
         var selector labels.Selector
         selector, err = meta.LabelSelectorAsSelector(k.opts.namespaceLabelSelector)
         if err != nil {
-            return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err)
+            return nil, nil, fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err)
         }
         k.opts.namespaceSelector = selector
     }
@@ -246,25 +247,79 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
     k.opts.zones = k.Zones
     k.opts.endpointNameMode = k.endpointNameMode
 
-    // Enable use of endpoint slices if the API supports the discovery v1 beta1 api
-    if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
-        k.opts.useEndpointSlices = true
-    }
-    // Disable use of endpoint slices for k8s versions 1.18 and earlier. Endpoint slices were
-    // introduced in 1.17 but EndpointSliceMirroring was not added until 1.19.
-    // if err != nil, we continue with the above default which is to use endpoint slices.
-    if sv, err := kubeClient.ServerVersion(); err == nil {
-        major, _ := strconv.Atoi(sv.Major)
-        minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+"))
-        if k.opts.useEndpointSlices && major <= 1 && minor <= 18 {
-            log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
-            k.opts.useEndpointSlices = false
-        }
-    }
-
     k.APIConn = newdnsController(ctx, kubeClient, k.opts)
 
-    return err
+    initEndpointWatch := k.opts.initEndpointsCache
+
+    onStart = func() error {
+        go func() {
+            if initEndpointWatch {
+                // Revert to watching Endpoints for incompatible K8s.
+                // This can be remove when all supported k8s versions support endpointslices.
+                if ok := k.endpointSliceSupported(kubeClient); !ok {
+                    k.APIConn.(*dnsControl).WatchEndpoints(ctx)
+                }
+            }
+            k.APIConn.Run()
+        }()
+
+        timeout := time.After(5 * time.Second)
+        ticker := time.NewTicker(100 * time.Millisecond)
+        defer ticker.Stop()
+        for {
+            select {
+            case <-ticker.C:
+                if k.APIConn.HasSynced() {
+                    return nil
+                }
+            case <-timeout:
+                log.Warning("starting server with unsynced Kubernetes API")
+                return nil
+            }
+        }
+    }
+
+    onShut = func() error {
+        return k.APIConn.Stop()
+    }
+
+    return onStart, onShut, err
+}
+
+// endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints)
+// based on the supportability of endpointslices in the API and server version. It will return true when endpointslices
+// should be watched, and false when endpoints should be watched.
+// If the API supports discovery v1 beta1, and the server versions >= 1.19, endpointslices are watched.
+// This function should be removed, along with non-slice endpoint watch code, when support for k8s < 1.19 is dropped.
+func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bool {
+    useEndpointSlices := false
+    ticker := time.NewTicker(100 * time.Millisecond)
+    defer ticker.Stop()
+    for {
+        select {
+        case <-ticker.C:
+            sv, err := kubeClient.ServerVersion()
+            if err != nil {
+                continue
+            }
+            // Enable use of endpoint slices if the API supports the discovery v1 beta1 api
+            if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
+                useEndpointSlices = true
+            }
+            // Disable use of endpoint slices for k8s versions 1.18 and earlier. The Endpointslices API was enabled
+            // by default in 1.17 but Service -> Pod proxy continued to use Endpoints by default until 1.19.
+            // DNS results should be built from the same source data that the proxy uses. This decision assumes
+            // k8s EndpointSliceProxying featuregate is at the default (i.e. only enabled for k8s >= 1.19).
+            major, _ := strconv.Atoi(sv.Major)
+            minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+"))
+            if useEndpointSlices && major <= 1 && minor <= 18 {
+                log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
+                useEndpointSlices = false
+            }
+            return useEndpointSlices
+        }
+    }
 }
 
 // Records looks up services in kubernetes.

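The change that gives the commit its title is that this server-version/EndpointSlice check now runs in a retry loop on the start goroutine instead of once during setup, so an API server that is briefly unreachable at startup no longer locks the plugin into the wrong watch type. A sketch of that retry shape; the context cancellation is an addition made for the example (the original loops until the version query succeeds):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryProbe retries probe on a 100ms ticker until it returns without error,
// mirroring the retry loop in endpointSliceSupported: a temporarily
// unreachable API server no longer decides the watch type prematurely.
func retryProbe(ctx context.Context, probe func() (bool, error)) (bool, error) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			ok, err := probe()
			if err != nil {
				continue // API not reachable yet; try again on the next tick
			}
			return ok, nil
		case <-ctx.Done():
			return false, ctx.Err()
		}
	}
}

func main() {
	attempts := 0
	probe := func() (bool, error) {
		attempts++
		if attempts < 3 {
			return false, errors.New("apiserver not ready")
		}
		return true, nil // e.g. "EndpointSlices are supported"
	}
	ok, err := retryProbe(context.Background(), probe)
	fmt.Println(ok, err, attempts) // true <nil> 3
}
```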
plugin/kubernetes/setup.go

@@ -7,7 +7,6 @@ import (
     "os"
     "strconv"
     "strings"
-    "time"
 
     "github.com/coredns/caddy"
     "github.com/coredns/coredns/core/dnsserver"
@@ -39,12 +38,16 @@ func setup(c *caddy.Controller) error {
         return plugin.Error(pluginName, err)
     }
 
-    err = k.InitKubeCache(context.Background())
+    onStart, onShut, err := k.InitKubeCache(context.Background())
     if err != nil {
         return plugin.Error(pluginName, err)
     }
-
-    k.RegisterKubeCache(c)
+    if onStart != nil {
+        c.OnStartup(onStart)
+    }
+    if onShut != nil {
+        c.OnShutdown(onShut)
+    }
 
     dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
         k.Next = next
@@ -60,30 +63,6 @@ func setup(c *caddy.Controller) error {
     return nil
 }
 
-// RegisterKubeCache registers KubeCache start and stop functions with Caddy
-func (k *Kubernetes) RegisterKubeCache(c *caddy.Controller) {
-    c.OnStartup(func() error {
-        go k.APIConn.Run()
-
-        timeout := time.After(5 * time.Second)
-        ticker := time.NewTicker(100 * time.Millisecond)
-        for {
-            select {
-            case <-ticker.C:
-                if k.APIConn.HasSynced() {
-                    return nil
-                }
-            case <-timeout:
-                return nil
-            }
-        }
-    })
-
-    c.OnShutdown(func() error {
-        return k.APIConn.Stop()
-    })
-}
-
 func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
     var (
         k8s *Kubernetes
@@ -113,7 +92,6 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) {
     opts := dnsControlOpts{
         initEndpointsCache: true,
-        useEndpointSlices:  false,
         ignoreEmptyService: false,
     }
 
     k8s.opts = opts