plugin/kubernetes: Add noendpoints option (#1536)

* add noendpoints option

* go fmt
This commit is contained in:
Chris O'Haver 2018-02-16 11:05:52 -05:00 committed by John Belamaric
parent 2cad04ec10
commit 9719a47c1b
4 changed files with 96 additions and 15 deletions

View file

@ -86,6 +86,8 @@ kubernetes [ZONES...] {
to a file structured like resolv.conf.
* `ttl` allows you to set a custom TTL for responses. The default (and allowed minimum) is to use
5 seconds, the maximum is capped at 3600 seconds.
* `noendpoints` will turn off the serving of endpoint records by disabling the watch on endpoints.
All endpoint queries and headless service queries will result in an NXDOMAIN.
* `fallthrough` **[ZONES...]** If a query for a record in the zones for which the plugin is authoritative
results in NXDOMAIN, normally that is what the response will be. However, if you specify this option,
the query will instead be passed on down the plugin chain, which can include another plugin to handle

View file

@ -76,8 +76,9 @@ type dnsControl struct {
}
type dnsControlOpts struct {
initPodCache bool
resyncPeriod time.Duration
initPodCache bool
initEndpointsCache bool
resyncPeriod time.Duration
// Label handling.
labelSelector *meta.LabelSelector
selector labels.Selector
@ -113,15 +114,17 @@ func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dn
cache.Indexers{podIPIndex: podIPIndexFunc})
}
dns.epLister, dns.epController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointsListFunc(dns.client, namespace, dns.selector),
WatchFunc: endpointsWatchFunc(dns.client, namespace, dns.selector),
},
&api.Endpoints{},
opts.resyncPeriod,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc})
if opts.initEndpointsCache {
dns.epLister, dns.epController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointsListFunc(dns.client, namespace, dns.selector),
WatchFunc: endpointsWatchFunc(dns.client, namespace, dns.selector),
},
&api.Endpoints{},
opts.resyncPeriod,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc})
}
dns.nsLister.Store, dns.nsController = cache.NewInformer(
&cache.ListWatch{
@ -307,7 +310,9 @@ func (dns *dnsControl) Stop() error {
// Run starts the controller.
func (dns *dnsControl) Run() {
go dns.svcController.Run(dns.stopCh)
go dns.epController.Run(dns.stopCh)
if dns.epController != nil {
go dns.epController.Run(dns.stopCh)
}
if dns.podController != nil {
go dns.podController.Run(dns.stopCh)
}
@ -318,7 +323,10 @@ func (dns *dnsControl) Run() {
// HasSynced calls on all controllers.
func (dns *dnsControl) HasSynced() bool {
a := dns.svcController.HasSynced()
b := dns.epController.HasSynced()
b := true
if dns.epController != nil {
b = dns.epController.HasSynced()
}
c := true
if dns.podController != nil {
c = dns.podController.HasSynced()
@ -431,6 +439,9 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*api.Endpoints) {
}
func (dns *dnsControl) EndpointsList() (eps []*api.Endpoints) {
if dns.epLister == nil {
return nil
}
os := dns.epLister.List()
for _, o := range os {
ep, ok := o.(*api.Endpoints)

View file

@ -92,7 +92,8 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) {
k8s.autoPathSearch = searchFromResolvConf()
opts := dnsControlOpts{
resyncPeriod: defaultResyncPeriod,
initEndpointsCache: true,
resyncPeriod: defaultResyncPeriod,
}
k8s.opts = opts
@ -221,6 +222,11 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) {
return nil, c.Errf("transfer from is not supported with this plugin")
}
k8s.TransferTo = tos
case "noendpoints":
if len(c.RemainingArgs()) != 0 {
return nil, c.ArgErr()
}
k8s.opts.initEndpointsCache = false
default:
return nil, c.Errf("unknown property '%s'", c.Val())
}

View file

@ -491,7 +491,7 @@ kubernetes cluster.local`,
}
}
func TestKubernetesEndpointsParse(t *testing.T) {
func TestKubernetesParseEndpointPodNames(t *testing.T) {
tests := []struct {
input string // Corefile data as string
shouldErr bool // true if test case is exected to produce an error.
@ -553,3 +553,65 @@ func TestKubernetesEndpointsParse(t *testing.T) {
}
}
}
func TestKubernetesParseNoEndpoints(t *testing.T) {
tests := []struct {
input string // Corefile data as string
shouldErr bool // true if test case is exected to produce an error.
expectedErrContent string // substring from the expected error. Empty for positive cases.
expectedEndpointsInit bool
}{
// valid
{
`kubernetes coredns.local {
noendpoints
}`,
false,
"",
false,
},
// invalid
{
`kubernetes coredns.local {
noendpoints ixnay on the endpointsay
}`,
true,
"rong argument count or unexpected",
true,
},
// not set
{
`kubernetes coredns.local {
}`,
false,
"",
true,
},
}
for i, test := range tests {
c := caddy.NewTestController("dns", test.input)
k8sController, err := kubernetesParse(c)
if test.shouldErr && err == nil {
t.Errorf("Test %d: Expected error, but did not find error for input '%s'. Error was: '%v'", i, test.input, err)
}
if err != nil {
if !test.shouldErr {
t.Errorf("Test %d: Expected no error but found one for input %s. Error was: %v", i, test.input, err)
continue
}
if !strings.Contains(err.Error(), test.expectedErrContent) {
t.Errorf("Test %d: Expected error to contain: %v, found error: %v, input: %s", i, test.expectedErrContent, err, test.input)
}
continue
}
foundEndpointsInit := k8sController.opts.initEndpointsCache
if foundEndpointsInit != test.expectedEndpointsInit {
t.Errorf("Test %d: Expected kubernetes controller to be initialized with endpoints watch '%v'. Instead found endpoints watch '%v' for input '%s'", i, test.expectedEndpointsInit, foundEndpointsInit, test.input)
}
}
}