plugin/kubernetes: add namespace watch (#1533)

* add namespace watch

* start ns watch, and add sync check
This commit is contained in:
Chris O'Haver 2018-02-15 14:21:54 -05:00 committed by John Belamaric
parent 16504234e5
commit dfd72e440f

View file

@ -60,10 +60,12 @@ type dnsControl struct {
svcController cache.Controller svcController cache.Controller
podController cache.Controller podController cache.Controller
epController cache.Controller epController cache.Controller
nsController cache.Controller
svcLister cache.Indexer svcLister cache.Indexer
podLister cache.Indexer podLister cache.Indexer
epLister cache.Indexer epLister cache.Indexer
nsLister storeToNamespaceLister
// stopLock is used to enforce only a single call to Stop is active. // stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and // Needed because we allow stopping through an http endpoint and
@ -88,6 +90,7 @@ func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dn
selector: opts.selector, selector: opts.selector,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
dns.svcLister, dns.svcController = cache.NewIndexerInformer( dns.svcLister, dns.svcController = cache.NewIndexerInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: serviceListFunc(dns.client, namespace, dns.selector), ListFunc: serviceListFunc(dns.client, namespace, dns.selector),
@ -109,6 +112,7 @@ func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dn
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{podIPIndex: podIPIndexFunc}) cache.Indexers{podIPIndex: podIPIndexFunc})
} }
dns.epLister, dns.epController = cache.NewIndexerInformer( dns.epLister, dns.epController = cache.NewIndexerInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: endpointsListFunc(dns.client, namespace, dns.selector), ListFunc: endpointsListFunc(dns.client, namespace, dns.selector),
@ -119,9 +123,21 @@ func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dn
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}) cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc})
dns.nsLister.Store, dns.nsController = cache.NewInformer(
&cache.ListWatch{
ListFunc: namespaceListFunc(dns.client, dns.selector),
WatchFunc: namespaceWatchFunc(dns.client, dns.selector),
},
&api.Namespace{}, opts.resyncPeriod, cache.ResourceEventHandlerFuncs{})
return &dns return &dns
} }
// storeToNamespaceLister makes a Store that lists Namespaces.
type storeToNamespaceLister struct {
cache.Store
}
func podIPIndexFunc(obj interface{}) ([]string, error) { func podIPIndexFunc(obj interface{}) ([]string, error) {
p, ok := obj.(*api.Pod) p, ok := obj.(*api.Pod)
if !ok { if !ok {
@ -246,6 +262,32 @@ func endpointsWatchFunc(c *kubernetes.Clientset, ns string, s labels.Selector) f
} }
} }
func namespaceListFunc(c *kubernetes.Clientset, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
}
listV1, err := c.CoreV1().Namespaces().List(opts)
if err != nil {
return nil, err
}
return listV1, err
}
}
func namespaceWatchFunc(c *kubernetes.Clientset, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
w, err := c.CoreV1().Namespaces().Watch(options)
if err != nil {
return nil, err
}
return w, nil
}
}
// Stop stops the controller. // Stop stops the controller.
func (dns *dnsControl) Stop() error { func (dns *dnsControl) Stop() error {
dns.stopLock.Lock() dns.stopLock.Lock()
@ -269,6 +311,7 @@ func (dns *dnsControl) Run() {
if dns.podController != nil { if dns.podController != nil {
go dns.podController.Run(dns.stopCh) go dns.podController.Run(dns.stopCh)
} }
go dns.nsController.Run(dns.stopCh)
<-dns.stopCh <-dns.stopCh
} }
@ -280,7 +323,8 @@ func (dns *dnsControl) HasSynced() bool {
if dns.podController != nil { if dns.podController != nil {
c = dns.podController.HasSynced() c = dns.podController.HasSynced()
} }
return a && b && c d := dns.nsController.HasSynced()
return a && b && c && d
} }
func (dns *dnsControl) ServiceList() (svcs []*api.Service) { func (dns *dnsControl) ServiceList() (svcs []*api.Service) {
@ -410,14 +454,19 @@ func (dns *dnsControl) GetNodeByName(name string) (*api.Node, error) {
} }
// GetNamespaceByName returns the namespace by name. If nothing is found an // GetNamespaceByName returns the namespace by name. If nothing is found an
// error is returned. This query causes a roundtrip to the k8s API server, so // error is returned.
// use sparingly.
func (dns *dnsControl) GetNamespaceByName(name string) (*api.Namespace, error) { func (dns *dnsControl) GetNamespaceByName(name string) (*api.Namespace, error) {
v1ns, err := dns.client.CoreV1().Namespaces().Get(name, meta.GetOptions{}) os := dns.nsLister.List()
if err != nil { for _, o := range os {
return &api.Namespace{}, err ns, ok := o.(*api.Namespace)
if !ok {
continue
}
if name == ns.ObjectMeta.Name {
return ns, nil
}
} }
return v1ns, nil return nil, fmt.Errorf("namespace not found")
} }
func (dns *dnsControl) Modified() int64 { func (dns *dnsControl) Modified() int64 {