From ca5097ca138553d081c82db24d1cb2baa96bfca3 Mon Sep 17 00:00:00 2001 From: Chris O'Haver Date: Mon, 12 Feb 2018 14:27:16 -0500 Subject: [PATCH] reqd changes (#1522) --- plugin/kubernetes/autopath.go | 2 +- plugin/kubernetes/kubernetes.go | 17 +- plugin/kubernetes/setup.go | 323 +++++++++++++----------- plugin/kubernetes/setup_reverse_test.go | 2 +- plugin/kubernetes/setup_test.go | 10 +- plugin/kubernetes/setup_ttl_test.go | 2 +- 6 files changed, 187 insertions(+), 169 deletions(-) diff --git a/plugin/kubernetes/autopath.go b/plugin/kubernetes/autopath.go index 45fe26088..cf5a7f06a 100644 --- a/plugin/kubernetes/autopath.go +++ b/plugin/kubernetes/autopath.go @@ -10,7 +10,7 @@ import ( // AutoPath implements the AutoPathFunc call from the autopath plugin. // It returns a per-query search path or nil indicating no searchpathing should happen. func (k *Kubernetes) AutoPath(state request.Request) []string { - // Check if the query falls in a zone we are actually authoriative for and thus if we want autopath. + // Check if the query falls in a zone we are actually authoritative for and thus if we want autopath. zone := plugin.Zones(k.Zones).Matches(state.Name()) if zone == "" { return nil diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go index 6afb1d83f..fc648208d 100644 --- a/plugin/kubernetes/kubernetes.go +++ b/plugin/kubernetes/kubernetes.go @@ -43,6 +43,7 @@ type Kubernetes struct { endpointNameMode bool Fall fall.F ttl uint32 + opts dnsControlOpts primaryZoneIndex int interfaceAddrsFunc func() net.IP @@ -238,8 +239,8 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) { } -// initKubeCache initializes a new Kubernetes cache. -func (k *Kubernetes) initKubeCache(opts dnsControlOpts) (err error) { +// InitKubeCache initializes a new Kubernetes cache. +func (k *Kubernetes) InitKubeCache() (err error) { config, err := k.getClientConfig() if err != nil { @@ -251,18 +252,18 @@ func (k *Kubernetes) initKubeCache(opts dnsControlOpts) (err error) { return fmt.Errorf("failed to create kubernetes notification controller: %q", err) } - if opts.labelSelector != nil { + if k.opts.labelSelector != nil { var selector labels.Selector - selector, err = meta.LabelSelectorAsSelector(opts.labelSelector) + selector, err = meta.LabelSelectorAsSelector(k.opts.labelSelector) if err != nil { - return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", opts.labelSelector, err) + return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.labelSelector, err) } - opts.selector = selector + k.opts.selector = selector } - opts.initPodCache = k.podMode == podModeVerified + k.opts.initPodCache = k.podMode == podModeVerified - k.APIConn = newdnsController(kubeClient, opts) + k.APIConn = newdnsController(kubeClient, k.opts) return err } diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go index f79724dee..fda3e1701 100644 --- a/plugin/kubernetes/setup.go +++ b/plugin/kubernetes/setup.go @@ -26,25 +26,36 @@ func init() { } func setup(c *caddy.Controller) error { - kubernetes, initOpts, err := kubernetesParse(c) + k, err := kubernetesParse(c) if err != nil { return plugin.Error("kubernetes", err) } - err = kubernetes.initKubeCache(initOpts) + err = k.InitKubeCache() if err != nil { return plugin.Error("kubernetes", err) } - // Register KubeCache start and stop functions with Caddy + k.RegisterKubeCache(c) + + dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler { + k.Next = next + return k + }) + + return nil +} + +// RegisterKubeCache registers KubeCache start and stop functions with Caddy +func (k *Kubernetes) RegisterKubeCache(c *caddy.Controller) { c.OnStartup(func() error { - go kubernetes.APIConn.Run() - if kubernetes.APIProxy != nil { - kubernetes.APIProxy.Run() + go k.APIConn.Run() + if k.APIProxy != nil { + k.APIProxy.Run() } synced := false for synced == false { - synced = kubernetes.APIConn.HasSynced() + synced = k.APIConn.HasSynced() time.Sleep(100 * time.Millisecond) } @@ -52,21 +63,31 @@ func setup(c *caddy.Controller) error { }) c.OnShutdown(func() error { - if kubernetes.APIProxy != nil { - kubernetes.APIProxy.Stop() + if k.APIProxy != nil { + k.APIProxy.Stop() } - return kubernetes.APIConn.Stop() + return k.APIConn.Stop() }) - - dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler { - kubernetes.Next = next - return kubernetes - }) - - return nil } -func kubernetesParse(c *caddy.Controller) (*Kubernetes, dnsControlOpts, error) { +func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) { + var k8s *Kubernetes + var err error + for i := 1; c.Next(); i++ { + if i > 1 { + return nil, fmt.Errorf("only one kubernetes section allowed per server block") + } + k8s, err = ParseStanza(c) + if err != nil { + return k8s, err + } + } + return k8s, nil +} + +// ParseStanza parses a kubernetes stanza +func ParseStanza(c *caddy.Controller) (*Kubernetes, error) { + k8s := New([]string{""}) k8s.interfaceAddrsFunc = localPodIP k8s.autoPathSearch = searchFromResolvConf() @@ -74,145 +95,141 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, dnsControlOpts, error) { opts := dnsControlOpts{ resyncPeriod: defaultResyncPeriod, } + k8s.opts = opts - for i := 1; c.Next(); i++ { - if i > 1 { - return nil, opts, fmt.Errorf("only one kubernetes section allowed per server block") + zones := c.RemainingArgs() + + if len(zones) != 0 { + k8s.Zones = zones + for i := 0; i < len(k8s.Zones); i++ { + k8s.Zones[i] = plugin.Host(k8s.Zones[i]).Normalize() } - zones := c.RemainingArgs() - - if len(zones) != 0 { - k8s.Zones = zones - for i := 0; i < len(k8s.Zones); i++ { - k8s.Zones[i] = plugin.Host(k8s.Zones[i]).Normalize() - } - } else { - k8s.Zones = make([]string, len(c.ServerBlockKeys)) - for i := 0; i < len(c.ServerBlockKeys); i++ { - k8s.Zones[i] = plugin.Host(c.ServerBlockKeys[i]).Normalize() - } - } - - k8s.primaryZoneIndex = -1 - for i, z := range k8s.Zones { - if strings.HasSuffix(z, "in-addr.arpa.") || strings.HasSuffix(z, "ip6.arpa.") { - continue - } - k8s.primaryZoneIndex = i - break - } - - if k8s.primaryZoneIndex == -1 { - return nil, opts, errors.New("non-reverse zone name must be used") - } - - for c.NextBlock() { - switch c.Val() { - case "endpoint_pod_names": - args := c.RemainingArgs() - if len(args) > 0 { - return nil, opts, c.ArgErr() - } - k8s.endpointNameMode = true - continue - case "pods": - args := c.RemainingArgs() - if len(args) == 1 { - switch args[0] { - case podModeDisabled, podModeInsecure, podModeVerified: - k8s.podMode = args[0] - default: - return nil, opts, fmt.Errorf("wrong value for pods: %s, must be one of: disabled, verified, insecure", args[0]) - } - continue - } - return nil, opts, c.ArgErr() - case "namespaces": - args := c.RemainingArgs() - if len(args) > 0 { - for _, a := range args { - k8s.Namespaces[a] = true - } - continue - } - return nil, opts, c.ArgErr() - case "endpoint": - args := c.RemainingArgs() - if len(args) > 0 { - k8s.APIServerList = args - continue - } - return nil, opts, c.ArgErr() - case "tls": // cert key cacertfile - args := c.RemainingArgs() - if len(args) == 3 { - k8s.APIClientCert, k8s.APIClientKey, k8s.APICertAuth = args[0], args[1], args[2] - continue - } - return nil, opts, c.ArgErr() - case "resyncperiod": - args := c.RemainingArgs() - if len(args) > 0 { - rp, err := time.ParseDuration(args[0]) - if err != nil { - return nil, opts, fmt.Errorf("unable to parse resync duration value: '%v': %v", args[0], err) - } - opts.resyncPeriod = rp - continue - } - return nil, opts, c.ArgErr() - case "labels": - args := c.RemainingArgs() - if len(args) > 0 { - labelSelectorString := strings.Join(args, " ") - ls, err := meta.ParseToLabelSelector(labelSelectorString) - if err != nil { - return nil, opts, fmt.Errorf("unable to parse label selector value: '%v': %v", labelSelectorString, err) - } - opts.labelSelector = ls - continue - } - return nil, opts, c.ArgErr() - case "fallthrough": - k8s.Fall.SetZonesFromArgs(c.RemainingArgs()) - case "upstream": - args := c.RemainingArgs() - if len(args) == 0 { - return nil, opts, c.ArgErr() - } - ups, err := dnsutil.ParseHostPortOrFile(args...) - if err != nil { - return nil, opts, err - } - k8s.Proxy = proxy.NewLookup(ups) - case "ttl": - args := c.RemainingArgs() - if len(args) == 0 { - return nil, opts, c.ArgErr() - } - t, err := strconv.Atoi(args[0]) - if err != nil { - return nil, opts, err - } - if t < 5 || t > 3600 { - return nil, opts, c.Errf("ttl must be in range [5, 3600]: %d", t) - } - k8s.ttl = uint32(t) - case "transfer": - tos, froms, err := parse.Transfer(c, false) - if err != nil { - return nil, opts, err - } - if len(froms) != 0 { - return nil, opts, c.Errf("transfer from is not supported with this plugin") - } - k8s.TransferTo = tos - default: - return nil, opts, c.Errf("unknown property '%s'", c.Val()) - } + } else { + k8s.Zones = make([]string, len(c.ServerBlockKeys)) + for i := 0; i < len(c.ServerBlockKeys); i++ { + k8s.Zones[i] = plugin.Host(c.ServerBlockKeys[i]).Normalize() } } - return k8s, opts, nil + + k8s.primaryZoneIndex = -1 + for i, z := range k8s.Zones { + if strings.HasSuffix(z, "in-addr.arpa.") || strings.HasSuffix(z, "ip6.arpa.") { + continue + } + k8s.primaryZoneIndex = i + break + } + + if k8s.primaryZoneIndex == -1 { + return nil, errors.New("non-reverse zone name must be used") + } + + for c.NextBlock() { + switch c.Val() { + case "endpoint_pod_names": + args := c.RemainingArgs() + if len(args) > 0 { + return nil, c.ArgErr() + } + k8s.endpointNameMode = true + continue + case "pods": + args := c.RemainingArgs() + if len(args) == 1 { + switch args[0] { + case podModeDisabled, podModeInsecure, podModeVerified: + k8s.podMode = args[0] + default: + return nil, fmt.Errorf("wrong value for pods: %s, must be one of: disabled, verified, insecure", args[0]) + } + continue + } + return nil, c.ArgErr() + case "namespaces": + args := c.RemainingArgs() + if len(args) > 0 { + for _, a := range args { + k8s.Namespaces[a] = true + } + continue + } + return nil, c.ArgErr() + case "endpoint": + args := c.RemainingArgs() + if len(args) > 0 { + k8s.APIServerList = args + continue + } + return nil, c.ArgErr() + case "tls": // cert key cacertfile + args := c.RemainingArgs() + if len(args) == 3 { + k8s.APIClientCert, k8s.APIClientKey, k8s.APICertAuth = args[0], args[1], args[2] + continue + } + return nil, c.ArgErr() + case "resyncperiod": + args := c.RemainingArgs() + if len(args) > 0 { + rp, err := time.ParseDuration(args[0]) + if err != nil { + return nil, fmt.Errorf("unable to parse resync duration value: '%v': %v", args[0], err) + } + k8s.opts.resyncPeriod = rp + continue + } + return nil, c.ArgErr() + case "labels": + args := c.RemainingArgs() + if len(args) > 0 { + labelSelectorString := strings.Join(args, " ") + ls, err := meta.ParseToLabelSelector(labelSelectorString) + if err != nil { + return nil, fmt.Errorf("unable to parse label selector value: '%v': %v", labelSelectorString, err) + } + k8s.opts.labelSelector = ls + continue + } + return nil, c.ArgErr() + case "fallthrough": + k8s.Fall.SetZonesFromArgs(c.RemainingArgs()) + case "upstream": + args := c.RemainingArgs() + if len(args) == 0 { + return nil, c.ArgErr() + } + ups, err := dnsutil.ParseHostPortOrFile(args...) + if err != nil { + return nil, err + } + k8s.Proxy = proxy.NewLookup(ups) + case "ttl": + args := c.RemainingArgs() + if len(args) == 0 { + return nil, c.ArgErr() + } + t, err := strconv.Atoi(args[0]) + if err != nil { + return nil, err + } + if t < 5 || t > 3600 { + return nil, c.Errf("ttl must be in range [5, 3600]: %d", t) + } + k8s.ttl = uint32(t) + case "transfer": + tos, froms, err := parse.Transfer(c, false) + if err != nil { + return nil, err + } + if len(froms) != 0 { + return nil, c.Errf("transfer from is not supported with this plugin") + } + k8s.TransferTo = tos + default: + return nil, c.Errf("unknown property '%s'", c.Val()) + } + } + return k8s, nil } func searchFromResolvConf() []string { diff --git a/plugin/kubernetes/setup_reverse_test.go b/plugin/kubernetes/setup_reverse_test.go index 3ba92a9ec..a2c69d96d 100644 --- a/plugin/kubernetes/setup_reverse_test.go +++ b/plugin/kubernetes/setup_reverse_test.go @@ -18,7 +18,7 @@ func TestKubernetesParseReverseZone(t *testing.T) { for i, tc := range tests { c := caddy.NewTestController("dns", tc.input) - k, _, err := kubernetesParse(c) + k, err := kubernetesParse(c) if err != nil { t.Fatalf("Test %d: Expected no error, got %q", i, err) } diff --git a/plugin/kubernetes/setup_test.go b/plugin/kubernetes/setup_test.go index fed8513cf..4d9124332 100644 --- a/plugin/kubernetes/setup_test.go +++ b/plugin/kubernetes/setup_test.go @@ -400,7 +400,7 @@ kubernetes cluster.local`, for i, test := range tests { c := caddy.NewTestController("dns", test.input) - k8sController, opts, err := kubernetesParse(c) + k8sController, err := kubernetesParse(c) if test.shouldErr && err == nil { t.Errorf("Test %d: Expected error, but did not find error for input '%s'. Error was: '%v'", i, test.input, err) @@ -440,14 +440,14 @@ kubernetes cluster.local`, } // ResyncPeriod - foundResyncPeriod := opts.resyncPeriod + foundResyncPeriod := k8sController.opts.resyncPeriod if foundResyncPeriod != test.expectedResyncPeriod { t.Errorf("Test %d: Expected kubernetes controller to be initialized with resync period '%s'. Instead found period '%s' for input '%s'", i, test.expectedResyncPeriod, foundResyncPeriod, test.input) } // Labels - if opts.labelSelector != nil { - foundLabelSelectorString := meta.FormatLabelSelector(opts.labelSelector) + if k8sController.opts.labelSelector != nil { + foundLabelSelectorString := meta.FormatLabelSelector(k8sController.opts.labelSelector) if foundLabelSelectorString != test.expectedLabelSelector { t.Errorf("Test %d: Expected kubernetes controller to be initialized with label selector '%s'. Instead found selector '%s' for input '%s'", i, test.expectedLabelSelector, foundLabelSelectorString, test.input) } @@ -524,7 +524,7 @@ func TestKubernetesEndpointsParse(t *testing.T) { for i, test := range tests { c := caddy.NewTestController("dns", test.input) - k8sController, _, err := kubernetesParse(c) + k8sController, err := kubernetesParse(c) if test.shouldErr && err == nil { t.Errorf("Test %d: Expected error, but did not find error for input '%s'. Error was: '%v'", i, test.input, err) diff --git a/plugin/kubernetes/setup_ttl_test.go b/plugin/kubernetes/setup_ttl_test.go index d58f91576..e923c7846 100644 --- a/plugin/kubernetes/setup_ttl_test.go +++ b/plugin/kubernetes/setup_ttl_test.go @@ -26,7 +26,7 @@ func TestKubernetesParseTTL(t *testing.T) { for i, tc := range tests { c := caddy.NewTestController("dns", tc.input) - k, _, err := kubernetesParse(c) + k, err := kubernetesParse(c) if err != nil && !tc.shouldErr { t.Fatalf("Test %d: Expected no error, got %q", i, err) }