diff --git a/plugin/kubernetes/README.md b/plugin/kubernetes/README.md
index 4557300c2..655e24a74 100644
--- a/plugin/kubernetes/README.md
+++ b/plugin/kubernetes/README.md
@@ -104,6 +104,20 @@ kubernetes [ZONES...] {
 
 Enabling zone transfer is done by using the *transfer* plugin.
 
+## Startup
+
+When CoreDNS starts with the *kubernetes* plugin enabled, it will delay serving DNS for up to 5 seconds
+until it can connect to the Kubernetes API and synchronize all object watches. If this cannot happen within
+5 seconds, then CoreDNS will start serving DNS while the *kubernetes* plugin continues to try to connect
+and synchronize all object watches. CoreDNS will answer SERVFAIL to any request made for a Kubernetes record
+that has not yet been synchronized.
+
+## Monitoring Kubernetes Endpoints
+
+By default, the *kubernetes* plugin watches Endpoints via the `discovery.EndpointSlices` API. However, the
+`api.Endpoints` API is used instead if the Kubernetes version does not support the `EndpointSliceProxying`
+feature gate by default (i.e. Kubernetes version < 1.19).
+
 ## Ready
 
 This plugin reports readiness to the ready plugin. This will happen after it has synced to the
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index d10d9f313..890785d71 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@ -59,6 +59,10 @@ type dnsControl struct {
 	selector          labels.Selector
 	namespaceSelector labels.Selector
 
+	// epLock is used to lock reads of epLister and epController while they are being replaced
+	// with the api.Endpoints Lister/Controller on k8s systems that don't use discovery.EndpointSlices.
+	epLock sync.RWMutex
+
 	svcController cache.Controller
 	podController cache.Controller
 	epController  cache.Controller
@@ -83,7 +87,6 @@ type dnsControl struct {
 type dnsControlOpts struct {
 	initPodCache       bool
 	initEndpointsCache bool
-	useEndpointSlices  bool
 	ignoreEmptyService bool
 
 	// Label handling.
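The `epLock` field added above exists because the endpoint Lister/Controller pair is no longer fixed at construction time: the `WatchEndpoints` method added further down may replace it once at startup while reader methods run on other goroutines. Below is a minimal, self-contained sketch of that pattern — not CoreDNS code, with hypothetical names standing in for the plugin's fields:

```go
// Sketch of the guarded-swap pattern: an RWMutex protects a field that one
// goroutine may replace once at startup while other goroutines read it.
package main

import (
	"fmt"
	"sync"
)

type watcher struct {
	mu     sync.RWMutex
	lister string // stands in for epLister/epController
}

// swap replaces the guarded field under the write lock, as WatchEndpoints does.
func (w *watcher) swap(l string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.lister = l
}

// list reads the guarded field under the read lock, as
// EndpointsList/EpIndex/EpIndexReverse do.
func (w *watcher) list() string {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return w.lister
}

func main() {
	w := &watcher{lister: "EndpointSlice"}
	w.swap("Endpoints") // e.g. falling back on a pre-1.19 cluster
	fmt.Println(w.list())
}
```

Multiple readers can hold the read lock concurrently, so the steady-state lookup paths only contend during the one-time swap at startup.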
@@ -132,32 +135,18 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
 	}
 
 	if opts.initEndpointsCache {
-		var (
-			apiObj    runtime.Object
-			listWatch cache.ListWatch
-			to        object.ToFunc
-			latency   *object.EndpointLatencyRecorder
-		)
-		if opts.useEndpointSlices {
-			apiObj = &discovery.EndpointSlice{}
-			listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
-			listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
-			to = object.EndpointSliceToEndpoints
-			latency = dns.EndpointSliceLatencyRecorder()
-		} else {
-			apiObj = &api.Endpoints{}
-			listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
-			listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
-			to = object.ToEndpoints
-			latency = dns.EndpointsLatencyRecorder()
-		}
+		dns.epLock.Lock()
 		dns.epLister, dns.epController = object.NewIndexerInformer(
-			&listWatch,
-			apiObj,
+			&cache.ListWatch{
+				ListFunc:  endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+				WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+			},
+			&discovery.EndpointSlice{},
 			cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
 			cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
-			object.DefaultProcessor(to, latency),
+			object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
 		)
+		dns.epLock.Unlock()
 	}
 
 	dns.nsLister, dns.nsController = cache.NewInformer(
@@ -172,6 +161,25 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
 	return &dns
 }
 
+// WatchEndpoints will set the endpoint Lister and Controller to watch api.Endpoints
+// instead of the default discovery.EndpointSlice. This is used in older k8s clusters where
+// discovery.EndpointSlice is not fully supported.
+// This can be removed when all supported k8s versions fully support EndpointSlice.
+func (dns *dnsControl) WatchEndpoints(ctx context.Context) {
+	dns.epLock.Lock()
+	dns.epLister, dns.epController = object.NewIndexerInformer(
+		&cache.ListWatch{
+			ListFunc:  endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+			WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+		},
+		&api.Endpoints{},
+		cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
+		cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
+		object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder()),
+	)
+	dns.epLock.Unlock()
+}
+
 func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
 	return &object.EndpointLatencyRecorder{
 		ServiceFunc: func(o meta.Object) []*object.Service {
@@ -351,7 +359,11 @@ func (dns *dnsControl) Stop() error {
 func (dns *dnsControl) Run() {
 	go dns.svcController.Run(dns.stopCh)
 	if dns.epController != nil {
-		go dns.epController.Run(dns.stopCh)
+		go func() {
+			dns.epLock.RLock()
+			dns.epController.Run(dns.stopCh)
+			dns.epLock.RUnlock()
+		}()
 	}
 	if dns.podController != nil {
 		go dns.podController.Run(dns.stopCh)
@@ -365,7 +377,9 @@ func (dns *dnsControl) HasSynced() bool {
 	a := dns.svcController.HasSynced()
 	b := true
 	if dns.epController != nil {
+		dns.epLock.RLock()
 		b = dns.epController.HasSynced()
+		dns.epLock.RUnlock()
 	}
 	c := true
 	if dns.podController != nil {
@@ -388,6 +402,8 @@ func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
 }
 
 func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
+	dns.epLock.RLock()
+	defer dns.epLock.RUnlock()
 	os := dns.epLister.List()
 	for _, o := range os {
 		ep, ok := o.(*object.Endpoints)
@@ -446,6 +462,8 @@ func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) {
 }
 
 func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
+	dns.epLock.RLock()
+	defer dns.epLock.RUnlock()
 	os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
 	if err != nil {
 		return nil
@@ -461,6 +479,8 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
 }
 
 func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
+	dns.epLock.RLock()
+	defer dns.epLock.RUnlock()
 	os, err := dns.epLister.ByIndex(epIPIndex, ip)
 	if err != nil {
 		return nil
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go
index 7b3d822c2..bf1f01664 100644
--- a/plugin/kubernetes/kubernetes.go
+++ b/plugin/kubernetes/kubernetes.go
@@ -8,6 +8,7 @@ import (
 	"net"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/coredns/coredns/plugin"
 	"github.com/coredns/coredns/plugin/etcd/msg"
@@ -213,22 +214,22 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
 }
 
 // InitKubeCache initializes a new Kubernetes cache.
-func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
+func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, onShut func() error, err error) {
 	config, err := k.getClientConfig()
 	if err != nil {
-		return err
+		return nil, nil, err
 	}
 
 	kubeClient, err := kubernetes.NewForConfig(config)
 	if err != nil {
-		return fmt.Errorf("failed to create kubernetes notification controller: %q", err)
+		return nil, nil, fmt.Errorf("failed to create kubernetes notification controller: %q", err)
 	}
 
 	if k.opts.labelSelector != nil {
 		var selector labels.Selector
 		selector, err = meta.LabelSelectorAsSelector(k.opts.labelSelector)
 		if err != nil {
-			return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.labelSelector, err)
+			return nil, nil, fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.labelSelector, err)
 		}
 		k.opts.selector = selector
 	}
@@ -237,7 +238,7 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
 		var selector labels.Selector
 		selector, err = meta.LabelSelectorAsSelector(k.opts.namespaceLabelSelector)
 		if err != nil {
-			return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err)
+			return nil, nil, fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err)
 		}
 		k.opts.namespaceSelector = selector
 	}
@@ -246,25 +247,79 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
 	k.opts.zones = k.Zones
 	k.opts.endpointNameMode = k.endpointNameMode
 
-	// Enable use of endpoint slices if the API supports the discovery v1 beta1 api
-	if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
-		k.opts.useEndpointSlices = true
-	}
-	// Disable use of endpoint slices for k8s versions 1.18 and earlier. Endpoint slices were
-	// introduced in 1.17 but EndpointSliceMirroring was not added until 1.19.
-	// if err != nil, we continue with the above default which is to use endpoint slices.
-	if sv, err := kubeClient.ServerVersion(); err == nil {
-		major, _ := strconv.Atoi(sv.Major)
-		minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+"))
-		if k.opts.useEndpointSlices && major <= 1 && minor <= 18 {
-			log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
-			k.opts.useEndpointSlices = false
-		}
-	}
 	k.APIConn = newdnsController(ctx, kubeClient, k.opts)
 
-	return err
+	initEndpointWatch := k.opts.initEndpointsCache
+
+	onStart = func() error {
+		go func() {
+			if initEndpointWatch {
+				// Revert to watching Endpoints for incompatible K8s.
+				// This can be removed when all supported k8s versions support endpointslices.
+				if ok := k.endpointSliceSupported(kubeClient); !ok {
+					k.APIConn.(*dnsControl).WatchEndpoints(ctx)
+				}
+			}
+			k.APIConn.Run()
+		}()
+
+		timeout := time.After(5 * time.Second)
+		ticker := time.NewTicker(100 * time.Millisecond)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				if k.APIConn.HasSynced() {
+					return nil
+				}
+			case <-timeout:
+				log.Warning("starting server with unsynced Kubernetes API")
+				return nil
+			}
+		}
+	}
+
+	onShut = func() error {
+		return k.APIConn.Stop()
+	}
+
+	return onStart, onShut, err
+}
+
+// endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints)
+// based on whether the API and server version support endpointslices. It will return true when endpointslices
+// should be watched, and false when endpoints should be watched.
+// If the API supports the discovery v1beta1 API, and the server version is >= 1.19, endpointslices are watched.
+// This function should be removed, along with non-slice endpoint watch code, when support for k8s < 1.19 is dropped.
+func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bool {
+	useEndpointSlices := false
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			sv, err := kubeClient.ServerVersion()
+			if err != nil {
+				continue
+			}
+			// Enable use of endpoint slices if the API supports the discovery v1beta1 API.
+			if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
+				useEndpointSlices = true
+			}
+			// Disable use of endpoint slices for k8s versions 1.18 and earlier. The EndpointSlice API was enabled
+			// by default in 1.17, but the Service -> Pod proxy continued to use Endpoints by default until 1.19.
+			// DNS results should be built from the same source data that the proxy uses. This decision assumes
+			// the k8s EndpointSliceProxying feature gate is at its default (i.e. only enabled for k8s >= 1.19).
+			major, _ := strconv.Atoi(sv.Major)
+			minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+"))
+			if useEndpointSlices && major <= 1 && minor <= 18 {
+				log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
+				useEndpointSlices = false
+			}
+			return useEndpointSlices
+		}
+	}
 }
 
 // Records looks up services in kubernetes.
diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go
index 89ec439fb..8b9bd2c42 100644
--- a/plugin/kubernetes/setup.go
+++ b/plugin/kubernetes/setup.go
@@ -7,7 +7,6 @@ import (
 	"os"
 	"strconv"
 	"strings"
-	"time"
 
 	"github.com/coredns/caddy"
 	"github.com/coredns/coredns/core/dnsserver"
@@ -39,12 +38,16 @@ func setup(c *caddy.Controller) error {
 		return plugin.Error(pluginName, err)
 	}
 
-	err = k.InitKubeCache(context.Background())
+	onStart, onShut, err := k.InitKubeCache(context.Background())
 	if err != nil {
 		return plugin.Error(pluginName, err)
 	}
-
-	k.RegisterKubeCache(c)
+	if onStart != nil {
+		c.OnStartup(onStart)
+	}
+	if onShut != nil {
+		c.OnShutdown(onShut)
+	}
 
 	dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
 		k.Next = next
@@ -60,30 +63,6 @@ func setup(c *caddy.Controller) error {
 	return nil
 }
 
-// RegisterKubeCache registers KubeCache start and stop functions with Caddy
-func (k *Kubernetes) RegisterKubeCache(c *caddy.Controller) {
-	c.OnStartup(func() error {
-		go k.APIConn.Run()
-
-		timeout := time.After(5 * time.Second)
-		ticker := time.NewTicker(100 * time.Millisecond)
-		for {
-			select {
-			case <-ticker.C:
-				if k.APIConn.HasSynced() {
-					return nil
-				}
-			case <-timeout:
-				return nil
-			}
-		}
-	})
-
-	c.OnShutdown(func() error {
-		return k.APIConn.Stop()
-	})
-}
-
 func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
 	var (
 		k8s *Kubernetes
@@ -113,7 +92,6 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) {
 
 	opts := dnsControlOpts{
 		initEndpointsCache: true,
-		useEndpointSlices:  false,
 		ignoreEmptyService: false,
 	}
 	k8s.opts = opts
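The decision made in `endpointSliceSupported` reduces to a small pure function of the server's `Major`/`Minor` version strings. A standalone sketch of just that gate — `sliceSupported` is a hypothetical helper, not part of this change — including the `+` suffix that some distributions append to the minor version (e.g. `"18+"` on GKE), which is why the code trims it before parsing:

```go
// Sketch of the version gate: EndpointSliceProxying is enabled by default only
// for k8s >= 1.19, so DNS answers are built from EndpointSlices only from 1.19 on.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

func sliceSupported(major, minor string) bool {
	maj, _ := strconv.Atoi(major)
	// Trim a trailing "+" (e.g. "18+") before parsing the minor version.
	mnr, _ := strconv.Atoi(strings.TrimRight(minor, "+"))
	return maj > 1 || (maj == 1 && mnr >= 19)
}

func main() {
	fmt.Println(sliceSupported("1", "18+")) // false: fall back to watching Endpoints
	fmt.Println(sliceSupported("1", "19"))  // true: keep watching EndpointSlices
}
```

On a pre-1.19 cluster this evaluates to false, so `onStart` calls `WatchEndpoints` and the informer is swapped to `api.Endpoints` before `Run` starts; from 1.19 on, the default `discovery.EndpointSlice` watch set up in `newdnsController` is kept.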