mw/kubernetes: resync to opts (#957)
* mw/kubernetes: resync to opts Only used to initialize the cache that already has a dnsControlopts, so remove it from the main kubernetes struct. * Fix test * mw/kubernetes: LabelSelector to options as well Labels select is also only used for init. Don't carry it in the main kubernetes struct. * remove this test: can't happen Caddyfile parser will only call setup when it sees kubernetes. * erge gone wrong
This commit is contained in:
parent
6a4e69eb9f
commit
12db6618c8
5 changed files with 52 additions and 63 deletions
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"k8s.io/client-go/1.5/kubernetes"
|
"k8s.io/client-go/1.5/kubernetes"
|
||||||
"k8s.io/client-go/1.5/pkg/api"
|
"k8s.io/client-go/1.5/pkg/api"
|
||||||
|
unversionedapi "k8s.io/client-go/1.5/pkg/api/unversioned"
|
||||||
"k8s.io/client-go/1.5/pkg/api/v1"
|
"k8s.io/client-go/1.5/pkg/api/v1"
|
||||||
"k8s.io/client-go/1.5/pkg/labels"
|
"k8s.io/client-go/1.5/pkg/labels"
|
||||||
"k8s.io/client-go/1.5/pkg/runtime"
|
"k8s.io/client-go/1.5/pkg/runtime"
|
||||||
|
@ -71,13 +72,17 @@ type dnsControl struct {
|
||||||
|
|
||||||
type dnsControlOpts struct {
|
type dnsControlOpts struct {
|
||||||
initPodCache bool
|
initPodCache bool
|
||||||
|
resyncPeriod time.Duration
|
||||||
|
// Label handling.
|
||||||
|
labelSelector *unversionedapi.LabelSelector
|
||||||
|
selector *labels.Selector
|
||||||
}
|
}
|
||||||
|
|
||||||
// newDNSController creates a controller for CoreDNS.
|
// newDNSController creates a controller for CoreDNS.
|
||||||
func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Duration, lselector *labels.Selector, opts dnsControlOpts) *dnsControl {
|
func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dnsControl {
|
||||||
dns := dnsControl{
|
dns := dnsControl{
|
||||||
client: kubeClient,
|
client: kubeClient,
|
||||||
selector: lselector,
|
selector: opts.selector,
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +92,7 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati
|
||||||
WatchFunc: serviceWatchFunc(dns.client, namespace, dns.selector),
|
WatchFunc: serviceWatchFunc(dns.client, namespace, dns.selector),
|
||||||
},
|
},
|
||||||
&api.Service{},
|
&api.Service{},
|
||||||
resyncPeriod,
|
opts.resyncPeriod,
|
||||||
cache.ResourceEventHandlerFuncs{},
|
cache.ResourceEventHandlerFuncs{},
|
||||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||||
|
|
||||||
|
@ -98,7 +103,7 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati
|
||||||
WatchFunc: podWatchFunc(dns.client, namespace, dns.selector),
|
WatchFunc: podWatchFunc(dns.client, namespace, dns.selector),
|
||||||
},
|
},
|
||||||
&api.Pod{}, // TODO replace with a lighter-weight custom struct
|
&api.Pod{}, // TODO replace with a lighter-weight custom struct
|
||||||
resyncPeriod,
|
opts.resyncPeriod,
|
||||||
cache.ResourceEventHandlerFuncs{},
|
cache.ResourceEventHandlerFuncs{},
|
||||||
cache.Indexers{podIPIndex: podIPIndexFunc})
|
cache.Indexers{podIPIndex: podIPIndexFunc})
|
||||||
}
|
}
|
||||||
|
@ -108,14 +113,18 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati
|
||||||
ListFunc: namespaceListFunc(dns.client, dns.selector),
|
ListFunc: namespaceListFunc(dns.client, dns.selector),
|
||||||
WatchFunc: namespaceWatchFunc(dns.client, dns.selector),
|
WatchFunc: namespaceWatchFunc(dns.client, dns.selector),
|
||||||
},
|
},
|
||||||
&api.Namespace{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
|
&api.Namespace{},
|
||||||
|
opts.resyncPeriod,
|
||||||
|
cache.ResourceEventHandlerFuncs{})
|
||||||
|
|
||||||
dns.epLister.Store, dns.epController = cache.NewInformer(
|
dns.epLister.Store, dns.epController = cache.NewInformer(
|
||||||
&cache.ListWatch{
|
&cache.ListWatch{
|
||||||
ListFunc: endpointsListFunc(dns.client, namespace, dns.selector),
|
ListFunc: endpointsListFunc(dns.client, namespace, dns.selector),
|
||||||
WatchFunc: endpointsWatchFunc(dns.client, namespace, dns.selector),
|
WatchFunc: endpointsWatchFunc(dns.client, namespace, dns.selector),
|
||||||
},
|
},
|
||||||
&api.Endpoints{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
|
&api.Endpoints{},
|
||||||
|
opts.resyncPeriod,
|
||||||
|
cache.ResourceEventHandlerFuncs{})
|
||||||
|
|
||||||
return &dns
|
return &dns
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ package kubernetes
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -38,10 +37,7 @@ type Kubernetes struct {
|
||||||
APIClientCert string
|
APIClientCert string
|
||||||
APIClientKey string
|
APIClientKey string
|
||||||
APIConn dnsController
|
APIConn dnsController
|
||||||
ResyncPeriod time.Duration
|
|
||||||
Namespaces map[string]bool
|
Namespaces map[string]bool
|
||||||
LabelSelector *unversionedapi.LabelSelector
|
|
||||||
Selector *labels.Selector
|
|
||||||
PodMode string
|
PodMode string
|
||||||
Fallthrough bool
|
Fallthrough bool
|
||||||
|
|
||||||
|
@ -59,7 +55,6 @@ func New(zones []string) *Kubernetes {
|
||||||
k.interfaceAddrsFunc = func() net.IP { return net.ParseIP("127.0.0.1") }
|
k.interfaceAddrsFunc = func() net.IP { return net.ParseIP("127.0.0.1") }
|
||||||
k.PodMode = PodModeDisabled
|
k.PodMode = PodModeDisabled
|
||||||
k.Proxy = proxy.Proxy{}
|
k.Proxy = proxy.Proxy{}
|
||||||
k.ResyncPeriod = defaultResyncPeriod
|
|
||||||
|
|
||||||
return k
|
return k
|
||||||
}
|
}
|
||||||
|
@ -260,8 +255,8 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
|
||||||
return clientConfig.ClientConfig()
|
return clientConfig.ClientConfig()
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitKubeCache initializes a new Kubernetes cache.
|
// initKubeCache initializes a new Kubernetes cache.
|
||||||
func (k *Kubernetes) InitKubeCache() (err error) {
|
func (k *Kubernetes) initKubeCache(opts dnsControlOpts) (err error) {
|
||||||
|
|
||||||
config, err := k.getClientConfig()
|
config, err := k.getClientConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -273,23 +268,18 @@ func (k *Kubernetes) InitKubeCache() (err error) {
|
||||||
return fmt.Errorf("failed to create kubernetes notification controller: %q", err)
|
return fmt.Errorf("failed to create kubernetes notification controller: %q", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if k.LabelSelector != nil {
|
if opts.labelSelector != nil {
|
||||||
var selector labels.Selector
|
var selector labels.Selector
|
||||||
selector, err = unversionedapi.LabelSelectorAsSelector(k.LabelSelector)
|
selector, err = unversionedapi.LabelSelectorAsSelector(opts.labelSelector)
|
||||||
k.Selector = &selector
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.LabelSelector, err)
|
return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", opts.labelSelector, err)
|
||||||
}
|
}
|
||||||
|
opts.selector = &selector
|
||||||
}
|
}
|
||||||
|
|
||||||
if k.LabelSelector != nil {
|
opts.initPodCache = k.PodMode == PodModeVerified
|
||||||
log.Printf("[INFO] Kubernetes has label selector '%s'. Only objects matching this label selector will be exposed.", unversionedapi.FormatLabelSelector(k.LabelSelector))
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := dnsControlOpts{
|
k.APIConn = newdnsController(kubeClient, opts)
|
||||||
initPodCache: k.PodMode == PodModeVerified,
|
|
||||||
}
|
|
||||||
k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector, opts)
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,12 +24,12 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func setup(c *caddy.Controller) error {
|
func setup(c *caddy.Controller) error {
|
||||||
kubernetes, err := kubernetesParse(c)
|
kubernetes, initOpts, err := kubernetesParse(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return middleware.Error("kubernetes", err)
|
return middleware.Error("kubernetes", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = kubernetes.InitKubeCache()
|
err = kubernetes.initKubeCache(initOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return middleware.Error("kubernetes", err)
|
return middleware.Error("kubernetes", err)
|
||||||
}
|
}
|
||||||
|
@ -58,11 +58,15 @@ func setup(c *caddy.Controller) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
|
func kubernetesParse(c *caddy.Controller) (*Kubernetes, dnsControlOpts, error) {
|
||||||
k8s := New([]string{""})
|
k8s := New([]string{""})
|
||||||
k8s.interfaceAddrsFunc = localPodIP
|
k8s.interfaceAddrsFunc = localPodIP
|
||||||
k8s.autoPathSearch = searchFromResolvConf()
|
k8s.autoPathSearch = searchFromResolvConf()
|
||||||
|
|
||||||
|
opts := dnsControlOpts{
|
||||||
|
resyncPeriod: defaultResyncPeriod,
|
||||||
|
}
|
||||||
|
|
||||||
for c.Next() {
|
for c.Next() {
|
||||||
zones := c.RemainingArgs()
|
zones := c.RemainingArgs()
|
||||||
|
|
||||||
|
@ -88,7 +92,7 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if k8s.primaryZoneIndex == -1 {
|
if k8s.primaryZoneIndex == -1 {
|
||||||
return nil, errors.New("non-reverse zone name must be used")
|
return nil, opts, errors.New("non-reverse zone name must be used")
|
||||||
}
|
}
|
||||||
|
|
||||||
for c.NextBlock() {
|
for c.NextBlock() {
|
||||||
|
@ -100,11 +104,11 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
|
||||||
case PodModeDisabled, PodModeInsecure, PodModeVerified:
|
case PodModeDisabled, PodModeInsecure, PodModeVerified:
|
||||||
k8s.PodMode = args[0]
|
k8s.PodMode = args[0]
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("wrong value for pods: %s, must be one of: disabled, verified, insecure", args[0])
|
return nil, opts, fmt.Errorf("wrong value for pods: %s, must be one of: disabled, verified, insecure", args[0])
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, c.ArgErr()
|
return nil, opts, c.ArgErr()
|
||||||
case "namespaces":
|
case "namespaces":
|
||||||
args := c.RemainingArgs()
|
args := c.RemainingArgs()
|
||||||
if len(args) > 0 {
|
if len(args) > 0 {
|
||||||
|
@ -113,7 +117,7 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, c.ArgErr()
|
return nil, opts, c.ArgErr()
|
||||||
case "endpoint":
|
case "endpoint":
|
||||||
args := c.RemainingArgs()
|
args := c.RemainingArgs()
|
||||||
if len(args) > 0 {
|
if len(args) > 0 {
|
||||||
|
@ -122,61 +126,60 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, c.ArgErr()
|
return nil, opts, c.ArgErr()
|
||||||
case "tls": // cert key cacertfile
|
case "tls": // cert key cacertfile
|
||||||
args := c.RemainingArgs()
|
args := c.RemainingArgs()
|
||||||
if len(args) == 3 {
|
if len(args) == 3 {
|
||||||
k8s.APIClientCert, k8s.APIClientKey, k8s.APICertAuth = args[0], args[1], args[2]
|
k8s.APIClientCert, k8s.APIClientKey, k8s.APICertAuth = args[0], args[1], args[2]
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, c.ArgErr()
|
return nil, opts, c.ArgErr()
|
||||||
case "resyncperiod":
|
case "resyncperiod":
|
||||||
args := c.RemainingArgs()
|
args := c.RemainingArgs()
|
||||||
if len(args) > 0 {
|
if len(args) > 0 {
|
||||||
rp, err := time.ParseDuration(args[0])
|
rp, err := time.ParseDuration(args[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to parse resync duration value: '%v': %v", args[0], err)
|
return nil, opts, fmt.Errorf("unable to parse resync duration value: '%v': %v", args[0], err)
|
||||||
}
|
}
|
||||||
k8s.ResyncPeriod = rp
|
opts.resyncPeriod = rp
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, c.ArgErr()
|
return nil, opts, c.ArgErr()
|
||||||
case "labels":
|
case "labels":
|
||||||
args := c.RemainingArgs()
|
args := c.RemainingArgs()
|
||||||
if len(args) > 0 {
|
if len(args) > 0 {
|
||||||
labelSelectorString := strings.Join(args, " ")
|
labelSelectorString := strings.Join(args, " ")
|
||||||
ls, err := unversionedapi.ParseToLabelSelector(labelSelectorString)
|
ls, err := unversionedapi.ParseToLabelSelector(labelSelectorString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to parse label selector value: '%v': %v", labelSelectorString, err)
|
return nil, opts, fmt.Errorf("unable to parse label selector value: '%v': %v", labelSelectorString, err)
|
||||||
}
|
}
|
||||||
k8s.LabelSelector = ls
|
opts.labelSelector = ls
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, c.ArgErr()
|
return nil, opts, c.ArgErr()
|
||||||
case "fallthrough":
|
case "fallthrough":
|
||||||
args := c.RemainingArgs()
|
args := c.RemainingArgs()
|
||||||
if len(args) == 0 {
|
if len(args) == 0 {
|
||||||
k8s.Fallthrough = true
|
k8s.Fallthrough = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, c.ArgErr()
|
return nil, opts, c.ArgErr()
|
||||||
case "upstream":
|
case "upstream":
|
||||||
args := c.RemainingArgs()
|
args := c.RemainingArgs()
|
||||||
if len(args) == 0 {
|
if len(args) == 0 {
|
||||||
return nil, c.ArgErr()
|
return nil, opts, c.ArgErr()
|
||||||
}
|
}
|
||||||
ups, err := dnsutil.ParseHostPortOrFile(args...)
|
ups, err := dnsutil.ParseHostPortOrFile(args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, opts, err
|
||||||
}
|
}
|
||||||
k8s.Proxy = proxy.NewLookup(ups)
|
k8s.Proxy = proxy.NewLookup(ups)
|
||||||
default:
|
default:
|
||||||
return nil, c.Errf("unknown property '%s'", c.Val())
|
return nil, opts, c.Errf("unknown property '%s'", c.Val())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return k8s, nil
|
|
||||||
}
|
}
|
||||||
return nil, errors.New("kubernetes setup called without keyword 'kubernetes' in Corefile")
|
return k8s, opts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func searchFromResolvConf() []string {
|
func searchFromResolvConf() []string {
|
||||||
|
|
|
@ -17,7 +17,7 @@ func TestKubernetesParseReverseZone(t *testing.T) {
|
||||||
|
|
||||||
for i, tc := range tests {
|
for i, tc := range tests {
|
||||||
c := caddy.NewTestController("dns", tc.input)
|
c := caddy.NewTestController("dns", tc.input)
|
||||||
k, err := kubernetesParse(c)
|
k, _, err := kubernetesParse(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Test %d: Expected no error, got %q", i, err)
|
t.Fatalf("Test %d: Expected no error, got %q", i, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -176,19 +176,6 @@ func TestKubernetesParse(t *testing.T) {
|
||||||
true,
|
true,
|
||||||
nil,
|
nil,
|
||||||
},
|
},
|
||||||
// negative
|
|
||||||
{
|
|
||||||
"",
|
|
||||||
true,
|
|
||||||
"kubernetes setup called without keyword 'kubernetes' in Corefile",
|
|
||||||
-1,
|
|
||||||
-1,
|
|
||||||
defaultResyncPeriod,
|
|
||||||
"",
|
|
||||||
PodModeDisabled,
|
|
||||||
false,
|
|
||||||
nil,
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
`kubernetes coredns.local {
|
`kubernetes coredns.local {
|
||||||
endpoint
|
endpoint
|
||||||
|
@ -396,7 +383,7 @@ func TestKubernetesParse(t *testing.T) {
|
||||||
|
|
||||||
for i, test := range tests {
|
for i, test := range tests {
|
||||||
c := caddy.NewTestController("dns", test.input)
|
c := caddy.NewTestController("dns", test.input)
|
||||||
k8sController, err := kubernetesParse(c)
|
k8sController, opts, err := kubernetesParse(c)
|
||||||
|
|
||||||
if test.shouldErr && err == nil {
|
if test.shouldErr && err == nil {
|
||||||
t.Errorf("Test %d: Expected error, but did not find error for input '%s'. Error was: '%v'", i, test.input, err)
|
t.Errorf("Test %d: Expected error, but did not find error for input '%s'. Error was: '%v'", i, test.input, err)
|
||||||
|
@ -436,14 +423,14 @@ func TestKubernetesParse(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResyncPeriod
|
// ResyncPeriod
|
||||||
foundResyncPeriod := k8sController.ResyncPeriod
|
foundResyncPeriod := opts.resyncPeriod
|
||||||
if foundResyncPeriod != test.expectedResyncPeriod {
|
if foundResyncPeriod != test.expectedResyncPeriod {
|
||||||
t.Errorf("Test %d: Expected kubernetes controller to be initialized with resync period '%s'. Instead found period '%s' for input '%s'", i, test.expectedResyncPeriod, foundResyncPeriod, test.input)
|
t.Errorf("Test %d: Expected kubernetes controller to be initialized with resync period '%s'. Instead found period '%s' for input '%s'", i, test.expectedResyncPeriod, foundResyncPeriod, test.input)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Labels
|
// Labels
|
||||||
if k8sController.LabelSelector != nil {
|
if opts.labelSelector != nil {
|
||||||
foundLabelSelectorString := unversioned.FormatLabelSelector(k8sController.LabelSelector)
|
foundLabelSelectorString := unversioned.FormatLabelSelector(opts.labelSelector)
|
||||||
if foundLabelSelectorString != test.expectedLabelSelector {
|
if foundLabelSelectorString != test.expectedLabelSelector {
|
||||||
t.Errorf("Test %d: Expected kubernetes controller to be initialized with label selector '%s'. Instead found selector '%s' for input '%s'", i, test.expectedLabelSelector, foundLabelSelectorString, test.input)
|
t.Errorf("Test %d: Expected kubernetes controller to be initialized with label selector '%s'. Instead found selector '%s' for input '%s'", i, test.expectedLabelSelector, foundLabelSelectorString, test.input)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue