Add pod cache and verified pod responses (#483)
* Add pod cache and verified pod responses * add ip indexing for pod cache
This commit is contained in:
parent
51a34d934d
commit
adfd7d5b19
4 changed files with 123 additions and 6 deletions
|
@ -1,6 +1,7 @@
|
||||||
package kubernetes
|
package kubernetes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -24,6 +25,8 @@ type storeToNamespaceLister struct {
|
||||||
cache.Store
|
cache.Store
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const podIPIndex = "PodIP"
|
||||||
|
|
||||||
// List lists all Namespaces in the store.
|
// List lists all Namespaces in the store.
|
||||||
func (s *storeToNamespaceLister) List() (ns api.NamespaceList, err error) {
|
func (s *storeToNamespaceLister) List() (ns api.NamespaceList, err error) {
|
||||||
for _, m := range s.Store.List() {
|
for _, m := range s.Store.List() {
|
||||||
|
@ -38,10 +41,12 @@ type dnsController struct {
|
||||||
selector *labels.Selector
|
selector *labels.Selector
|
||||||
|
|
||||||
svcController *cache.Controller
|
svcController *cache.Controller
|
||||||
|
podController *cache.Controller
|
||||||
nsController *cache.Controller
|
nsController *cache.Controller
|
||||||
epController *cache.Controller
|
epController *cache.Controller
|
||||||
|
|
||||||
svcLister cache.StoreToServiceLister
|
svcLister cache.StoreToServiceLister
|
||||||
|
podLister cache.StoreToPodLister
|
||||||
nsLister storeToNamespaceLister
|
nsLister storeToNamespaceLister
|
||||||
epLister cache.StoreToEndpointsLister
|
epLister cache.StoreToEndpointsLister
|
||||||
|
|
||||||
|
@ -54,7 +59,7 @@ type dnsController struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// newDNSController creates a controller for CoreDNS.
|
// newDNSController creates a controller for CoreDNS.
|
||||||
func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Duration, lselector *labels.Selector) *dnsController {
|
func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Duration, lselector *labels.Selector, initPodCache bool) *dnsController {
|
||||||
dns := dnsController{
|
dns := dnsController{
|
||||||
client: kubeClient,
|
client: kubeClient,
|
||||||
selector: lselector,
|
selector: lselector,
|
||||||
|
@ -71,6 +76,18 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati
|
||||||
cache.ResourceEventHandlerFuncs{},
|
cache.ResourceEventHandlerFuncs{},
|
||||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||||
|
|
||||||
|
if initPodCache {
|
||||||
|
dns.podLister.Indexer, dns.podController = cache.NewIndexerInformer(
|
||||||
|
&cache.ListWatch{
|
||||||
|
ListFunc: podListFunc(dns.client, namespace, dns.selector),
|
||||||
|
WatchFunc: podWatchFunc(dns.client, namespace, dns.selector),
|
||||||
|
},
|
||||||
|
&api.Pod{}, // TODO replace with a lighter-weight custom struct
|
||||||
|
resyncPeriod,
|
||||||
|
cache.ResourceEventHandlerFuncs{},
|
||||||
|
cache.Indexers{podIPIndex: podIPIndexFunc})
|
||||||
|
}
|
||||||
|
|
||||||
dns.nsLister.Store, dns.nsController = cache.NewInformer(
|
dns.nsLister.Store, dns.nsController = cache.NewInformer(
|
||||||
&cache.ListWatch{
|
&cache.ListWatch{
|
||||||
ListFunc: namespaceListFunc(dns.client, dns.selector),
|
ListFunc: namespaceListFunc(dns.client, dns.selector),
|
||||||
|
@ -88,6 +105,14 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati
|
||||||
return &dns
|
return &dns
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func podIPIndexFunc(obj interface{}) ([]string, error) {
|
||||||
|
p, ok := obj.(*api.Pod)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("obj was not an *api.Pod")
|
||||||
|
}
|
||||||
|
return []string{p.Status.PodIP}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
|
func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
|
||||||
return func(opts api.ListOptions) (runtime.Object, error) {
|
return func(opts api.ListOptions) (runtime.Object, error) {
|
||||||
if s != nil {
|
if s != nil {
|
||||||
|
@ -107,6 +132,26 @@ func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) fun
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func podListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
|
||||||
|
return func(opts api.ListOptions) (runtime.Object, error) {
|
||||||
|
if s != nil {
|
||||||
|
opts.LabelSelector = *s
|
||||||
|
}
|
||||||
|
listV1, err := c.Core().Pods(ns).List(opts)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var listAPI api.PodList
|
||||||
|
err = v1.Convert_v1_PodList_To_api_PodList(listV1, &listAPI, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &listAPI, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) {
|
func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) {
|
||||||
if in.Type == watch.Error {
|
if in.Type == watch.Error {
|
||||||
return in, true
|
return in, true
|
||||||
|
@ -121,6 +166,14 @@ func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) {
|
||||||
return in, true
|
return in, true
|
||||||
}
|
}
|
||||||
return watch.Event{Type: in.Type, Object: &apiObj}, true
|
return watch.Event{Type: in.Type, Object: &apiObj}, true
|
||||||
|
case *v1.Pod:
|
||||||
|
var apiObj api.Pod
|
||||||
|
err := v1.Convert_v1_Pod_To_api_Pod(v1Obj, &apiObj, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ERROR] Could not convert v1.Pod: %s", err)
|
||||||
|
return in, true
|
||||||
|
}
|
||||||
|
return watch.Event{Type: in.Type, Object: &apiObj}, true
|
||||||
case *v1.Namespace:
|
case *v1.Namespace:
|
||||||
var apiObj api.Namespace
|
var apiObj api.Namespace
|
||||||
err := v1.Convert_v1_Namespace_To_api_Namespace(v1Obj, &apiObj, nil)
|
err := v1.Convert_v1_Namespace_To_api_Namespace(v1Obj, &apiObj, nil)
|
||||||
|
@ -156,6 +209,20 @@ func serviceWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) fu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func podWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) {
|
||||||
|
return func(options api.ListOptions) (watch.Interface, error) {
|
||||||
|
if s != nil {
|
||||||
|
options.LabelSelector = *s
|
||||||
|
}
|
||||||
|
w, err := c.Core().Pods(ns).Watch(options)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return watch.Filter(w, v1ToAPIFilter), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func namespaceListFunc(c *kubernetes.Clientset, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
|
func namespaceListFunc(c *kubernetes.Clientset, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
|
||||||
return func(opts api.ListOptions) (runtime.Object, error) {
|
return func(opts api.ListOptions) (runtime.Object, error) {
|
||||||
if s != nil {
|
if s != nil {
|
||||||
|
@ -244,6 +311,9 @@ func (dns *dnsController) Run() {
|
||||||
go dns.svcController.Run(dns.stopCh)
|
go dns.svcController.Run(dns.stopCh)
|
||||||
go dns.nsController.Run(dns.stopCh)
|
go dns.nsController.Run(dns.stopCh)
|
||||||
go dns.epController.Run(dns.stopCh)
|
go dns.epController.Run(dns.stopCh)
|
||||||
|
if dns.podController != nil {
|
||||||
|
go dns.podController.Run(dns.stopCh)
|
||||||
|
}
|
||||||
<-dns.stopCh
|
<-dns.stopCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,7 @@ type Kubernetes struct {
|
||||||
|
|
||||||
const (
|
const (
|
||||||
PodModeDisabled = "disabled" // default. pod requests are ignored
|
PodModeDisabled = "disabled" // default. pod requests are ignored
|
||||||
|
PodModeVerified = "verified" // Pod requests are answered only if they exist
|
||||||
PodModeInsecure = "insecure" // ALL pod requests are answered without verfying they exist
|
PodModeInsecure = "insecure" // ALL pod requests are answered without verfying they exist
|
||||||
DnsSchemaVersion = "1.0.0" // https://github.com/kubernetes/dns/blob/master/docs/specification.md
|
DnsSchemaVersion = "1.0.0" // https://github.com/kubernetes/dns/blob/master/docs/specification.md
|
||||||
)
|
)
|
||||||
|
@ -197,7 +198,7 @@ func (k *Kubernetes) InitKubeCache() error {
|
||||||
log.Printf("[INFO] Kubernetes middleware configured with the label selector '%s'. Only kubernetes objects matching this label selector will be exposed.", unversionedapi.FormatLabelSelector(k.LabelSelector))
|
log.Printf("[INFO] Kubernetes middleware configured with the label selector '%s'. Only kubernetes objects matching this label selector will be exposed.", unversionedapi.FormatLabelSelector(k.LabelSelector))
|
||||||
}
|
}
|
||||||
|
|
||||||
k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector)
|
k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector, k.PodMode == PodModeVerified)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -384,9 +385,30 @@ func (k *Kubernetes) findPods(namespace, podname string) (pods []pod, err error)
|
||||||
return pods, nil
|
return pods, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: implement cache verified pod responses
|
// PodModeVerified
|
||||||
return pods, nil
|
objList, err := k.APIConn.podLister.Indexer.ByIndex(podIPIndex, ip)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
nsWildcard := symbolContainsWildcard(namespace)
|
||||||
|
for _, o := range objList {
|
||||||
|
p, ok := o.(*api.Pod)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("expected type *api.Pod")
|
||||||
|
}
|
||||||
|
// If namespace has a wildcard, filter results against Corefile namespace list.
|
||||||
|
if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(p.Namespace, k.Namespaces)) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// check for matching ip and namespace
|
||||||
|
if ip == p.Status.PodIP && symbolMatches(namespace, p.Namespace, nsWildcard) {
|
||||||
|
s := pod{name: podname, namespace: namespace, addr: ip}
|
||||||
|
pods = append(pods, s)
|
||||||
|
return pods, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return pods, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get retrieves matching data from the cache.
|
// Get retrieves matching data from the cache.
|
||||||
|
|
|
@ -88,10 +88,10 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
|
||||||
args := c.RemainingArgs()
|
args := c.RemainingArgs()
|
||||||
if len(args) == 1 {
|
if len(args) == 1 {
|
||||||
switch args[0] {
|
switch args[0] {
|
||||||
case PodModeDisabled, PodModeInsecure:
|
case PodModeDisabled, PodModeInsecure, PodModeVerified:
|
||||||
k8s.PodMode = args[0]
|
k8s.PodMode = args[0]
|
||||||
default:
|
default:
|
||||||
return nil, errors.New("pods must be one of: disabled, insecure")
|
return nil, errors.New("pods must be one of: disabled, verified, insecure")
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -242,6 +242,19 @@ var dnsTestCasesPodsInsecure = []test.Case{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var dnsTestCasesPodsVerified = []test.Case{
|
||||||
|
{
|
||||||
|
Qname: "10-20-0-101.test-1.pod.cluster.local.", Qtype: dns.TypeA,
|
||||||
|
Rcode: dns.RcodeNameError,
|
||||||
|
Answer: []dns.RR{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Qname: "10-20-0-101.test-X.pod.cluster.local.", Qtype: dns.TypeA,
|
||||||
|
Rcode: dns.RcodeNameError,
|
||||||
|
Answer: []dns.RR{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
func createTestServer(t *testing.T, corefile string) (*caddy.Instance, string) {
|
func createTestServer(t *testing.T, corefile string) (*caddy.Instance, string) {
|
||||||
server, err := CoreDNSServer(corefile)
|
server, err := CoreDNSServer(corefile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -315,3 +328,15 @@ func TestKubernetesIntegrationPodsInsecure(t *testing.T) {
|
||||||
`
|
`
|
||||||
doIntegrationTests(t, corefile, dnsTestCasesPodsInsecure)
|
doIntegrationTests(t, corefile, dnsTestCasesPodsInsecure)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestKubernetesIntegrationPodsVerified(t *testing.T) {
|
||||||
|
corefile :=
|
||||||
|
`.:0 {
|
||||||
|
kubernetes cluster.local 0.0.10.in-addr.arpa {
|
||||||
|
endpoint http://localhost:8080
|
||||||
|
namespaces test-1
|
||||||
|
pods verified
|
||||||
|
}
|
||||||
|
`
|
||||||
|
doIntegrationTests(t, corefile, dnsTestCasesPodsVerified)
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue