Add A lookup for headless services (#451)
This commit is contained in:
parent
8faa8354b4
commit
56d3b47d11
4 changed files with 136 additions and 6 deletions
|
@ -91,6 +91,26 @@ spec:
|
||||||
name: c-port
|
name: c-port
|
||||||
protocol: UDP
|
protocol: UDP
|
||||||
---
|
---
|
||||||
|
apiVersion: extensions/v1beta1
|
||||||
|
kind: Deployment
|
||||||
|
metadata:
|
||||||
|
name: de-d1
|
||||||
|
namespace: test-1
|
||||||
|
spec:
|
||||||
|
replicas: 2
|
||||||
|
template:
|
||||||
|
metadata:
|
||||||
|
labels:
|
||||||
|
app: app-d
|
||||||
|
spec:
|
||||||
|
containers:
|
||||||
|
- name: app-d-c
|
||||||
|
image: gcr.io/google_containers/pause-amd64:3.0
|
||||||
|
ports:
|
||||||
|
- containerPort: 1234
|
||||||
|
name: c-port
|
||||||
|
protocol: UDP
|
||||||
|
---
|
||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
kind: Service
|
kind: Service
|
||||||
metadata:
|
metadata:
|
||||||
|
@ -149,3 +169,17 @@ spec:
|
||||||
- name: c-port
|
- name: c-port
|
||||||
port: 1234
|
port: 1234
|
||||||
protocol: UDP
|
protocol: UDP
|
||||||
|
---
|
||||||
|
apiVersion: v1
|
||||||
|
kind: Service
|
||||||
|
metadata:
|
||||||
|
name: headless-svc
|
||||||
|
namespace: test-1
|
||||||
|
spec:
|
||||||
|
selector:
|
||||||
|
app: app-d
|
||||||
|
clusterIP: None
|
||||||
|
ports:
|
||||||
|
- name: c-port
|
||||||
|
port: 1234
|
||||||
|
protocol: UDP
|
||||||
|
|
|
@ -39,9 +39,11 @@ type dnsController struct {
|
||||||
|
|
||||||
svcController *cache.Controller
|
svcController *cache.Controller
|
||||||
nsController *cache.Controller
|
nsController *cache.Controller
|
||||||
|
epController *cache.Controller
|
||||||
|
|
||||||
svcLister cache.StoreToServiceLister
|
svcLister cache.StoreToServiceLister
|
||||||
nsLister storeToNamespaceLister
|
nsLister storeToNamespaceLister
|
||||||
|
epLister cache.StoreToEndpointsLister
|
||||||
|
|
||||||
// stopLock is used to enforce only a single call to Stop is active.
|
// stopLock is used to enforce only a single call to Stop is active.
|
||||||
// Needed because we allow stopping through an http endpoint and
|
// Needed because we allow stopping through an http endpoint and
|
||||||
|
@ -76,6 +78,13 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati
|
||||||
},
|
},
|
||||||
&api.Namespace{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
|
&api.Namespace{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
|
||||||
|
|
||||||
|
dns.epLister.Store, dns.epController = cache.NewInformer(
|
||||||
|
&cache.ListWatch{
|
||||||
|
ListFunc: endpointsListFunc(dns.client, namespace, dns.selector),
|
||||||
|
WatchFunc: endpointsWatchFunc(dns.client, namespace, dns.selector),
|
||||||
|
},
|
||||||
|
&api.Endpoints{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
|
||||||
|
|
||||||
return &dns
|
return &dns
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,6 +94,7 @@ func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) fun
|
||||||
opts.LabelSelector = *s
|
opts.LabelSelector = *s
|
||||||
}
|
}
|
||||||
listV1, err := c.Core().Services(ns).List(opts)
|
listV1, err := c.Core().Services(ns).List(opts)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -119,6 +129,14 @@ func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) {
|
||||||
return in, true
|
return in, true
|
||||||
}
|
}
|
||||||
return watch.Event{Type: in.Type, Object: &apiObj}, true
|
return watch.Event{Type: in.Type, Object: &apiObj}, true
|
||||||
|
case *v1.Endpoints:
|
||||||
|
var apiObj api.Endpoints
|
||||||
|
err := v1.Convert_v1_Endpoints_To_api_Endpoints(v1Obj, &apiObj, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ERROR] Could not convert v1.Endpoint: %s", err)
|
||||||
|
return in, true
|
||||||
|
}
|
||||||
|
return watch.Event{Type: in.Type, Object: &apiObj}, true
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("[WARN] Unhandled v1 type in event: %v", in)
|
log.Printf("[WARN] Unhandled v1 type in event: %v", in)
|
||||||
|
@ -169,6 +187,38 @@ func namespaceWatchFunc(c *kubernetes.Clientset, s *labels.Selector) func(option
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func endpointsListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
|
||||||
|
return func(opts api.ListOptions) (runtime.Object, error) {
|
||||||
|
if s != nil {
|
||||||
|
opts.LabelSelector = *s
|
||||||
|
}
|
||||||
|
listV1, err := c.Core().Endpoints(ns).List(opts)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var listAPI api.EndpointsList
|
||||||
|
err = v1.Convert_v1_EndpointsList_To_api_EndpointsList(listV1, &listAPI, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &listAPI, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func endpointsWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) {
|
||||||
|
return func(options api.ListOptions) (watch.Interface, error) {
|
||||||
|
if s != nil {
|
||||||
|
options.LabelSelector = *s
|
||||||
|
}
|
||||||
|
w, err := c.Core().Endpoints(ns).Watch(options)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return watch.Filter(w, v1ToAPIFilter), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (dns *dnsController) controllersInSync() bool {
|
func (dns *dnsController) controllersInSync() bool {
|
||||||
return dns.svcController.HasSynced()
|
return dns.svcController.HasSynced()
|
||||||
}
|
}
|
||||||
|
@ -193,6 +243,7 @@ func (dns *dnsController) Stop() error {
|
||||||
func (dns *dnsController) Run() {
|
func (dns *dnsController) Run() {
|
||||||
go dns.svcController.Run(dns.stopCh)
|
go dns.svcController.Run(dns.stopCh)
|
||||||
go dns.nsController.Run(dns.stopCh)
|
go dns.nsController.Run(dns.stopCh)
|
||||||
|
go dns.epController.Run(dns.stopCh)
|
||||||
<-dns.stopCh
|
<-dns.stopCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ func (k *Kubernetes) Services(state request.Request, exact bool, opt middleware.
|
||||||
}
|
}
|
||||||
|
|
||||||
// PrimaryZone will return the first non-reverse zone being handled by this middleware
|
// PrimaryZone will return the first non-reverse zone being handled by this middleware
|
||||||
func (k *Kubernetes) PrimaryZone() (string) {
|
func (k *Kubernetes) PrimaryZone() string {
|
||||||
return k.Zones[k.primaryZone]
|
return k.Zones[k.primaryZone]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,15 +228,38 @@ func (k *Kubernetes) getRecordsForServiceItems(serviceItems []*api.Service, zone
|
||||||
var records []msg.Service
|
var records []msg.Service
|
||||||
|
|
||||||
for _, item := range serviceItems {
|
for _, item := range serviceItems {
|
||||||
clusterIP := item.Spec.ClusterIP
|
|
||||||
|
|
||||||
// Create records for each exposed port...
|
|
||||||
key := k.NameTemplate.RecordNameFromNameValues(nametemplate.NameValues{TypeName: "svc", ServiceName: item.ObjectMeta.Name, Namespace: item.ObjectMeta.Namespace, Zone: zone})
|
key := k.NameTemplate.RecordNameFromNameValues(nametemplate.NameValues{TypeName: "svc", ServiceName: item.ObjectMeta.Name, Namespace: item.ObjectMeta.Namespace, Zone: zone})
|
||||||
|
|
||||||
key = strings.Replace(key, ".", "/", -1)
|
key = strings.Replace(key, ".", "/", -1)
|
||||||
|
|
||||||
for i, p := range item.Spec.Ports {
|
clusterIP := item.Spec.ClusterIP
|
||||||
s := msg.Service{Key: msg.Path(strconv.Itoa(i)+"."+key, "coredns"), Host: clusterIP, Port: int(p.Port)}
|
if clusterIP == api.ClusterIPNone {
|
||||||
records = append(records, s)
|
// This is a headless service, create records for each pod
|
||||||
|
epList, _ := k.APIConn.epLister.List()
|
||||||
|
for _, ep := range epList.Items {
|
||||||
|
if ep.ObjectMeta.Name == item.ObjectMeta.Name && ep.ObjectMeta.Namespace == item.ObjectMeta.Namespace {
|
||||||
|
for _, eps := range ep.Subsets {
|
||||||
|
for i, port := range eps.Ports {
|
||||||
|
for j, addr := range eps.Addresses {
|
||||||
|
refid := strconv.Itoa(j*1024 + i)
|
||||||
|
s := msg.Service{
|
||||||
|
Key: msg.Path(strings.ToLower(refid+"._"+port.Name+"._"+string(port.Protocol)+"."+key), "coredns"),
|
||||||
|
Host: addr.IP, Port: int(port.Port),
|
||||||
|
}
|
||||||
|
records = append(records, s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Create records for each exposed port...
|
||||||
|
|
||||||
|
for _, p := range item.Spec.Ports {
|
||||||
|
s := msg.Service{Key: msg.Path(strings.ToLower("_"+p.Name+"._"+string(p.Protocol)+"."+key), "coredns"), Host: clusterIP, Port: int(p.Port)}
|
||||||
|
records = append(records, s)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,6 +288,7 @@ func (k *Kubernetes) getServices(namespace string, nsWildcard bool, servicename
|
||||||
if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(item.Namespace, k.Namespaces)) {
|
if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(item.Namespace, k.Namespaces)) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
resultItems = append(resultItems, item)
|
resultItems = append(resultItems, item)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,6 +59,8 @@ var dnsTestCases = []test.Case{
|
||||||
test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"),
|
test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"),
|
||||||
test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"),
|
test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"),
|
||||||
test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"),
|
test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"),
|
||||||
|
test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"),
|
||||||
|
test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -68,6 +70,8 @@ var dnsTestCases = []test.Case{
|
||||||
test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"),
|
test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"),
|
||||||
test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"),
|
test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"),
|
||||||
test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"),
|
test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"),
|
||||||
|
test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"),
|
||||||
|
test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -87,6 +91,16 @@ var dnsTestCases = []test.Case{
|
||||||
test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"),
|
test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"),
|
||||||
test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"),
|
test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"),
|
||||||
test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"),
|
test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"),
|
||||||
|
test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"),
|
||||||
|
test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Qname: "headless-svc.test-1.svc.cluster.local.", Qtype: dns.TypeA,
|
||||||
|
Rcode: dns.RcodeSuccess,
|
||||||
|
Answer: []dns.RR{
|
||||||
|
test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"),
|
||||||
|
test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
//TODO: Fix below to all use test.SRV not test.A!
|
//TODO: Fix below to all use test.SRV not test.A!
|
||||||
|
@ -137,6 +151,8 @@ var dnsTestCases = []test.Case{
|
||||||
test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."),
|
test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."),
|
||||||
test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."),
|
test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."),
|
||||||
test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."),
|
test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."),
|
||||||
|
test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
|
||||||
|
test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -147,6 +163,8 @@ var dnsTestCases = []test.Case{
|
||||||
test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."),
|
test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."),
|
||||||
test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."),
|
test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."),
|
||||||
test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."),
|
test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."),
|
||||||
|
test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
|
||||||
|
test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -167,6 +185,8 @@ var dnsTestCases = []test.Case{
|
||||||
test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."),
|
test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."),
|
||||||
test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."),
|
test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."),
|
||||||
test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."),
|
test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."),
|
||||||
|
test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
|
||||||
|
test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -223,6 +243,7 @@ func TestKubernetesIntegration(t *testing.T) {
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
for _, tc := range dnsTestCases {
|
for _, tc := range dnsTestCases {
|
||||||
|
|
||||||
dnsClient := new(dns.Client)
|
dnsClient := new(dns.Client)
|
||||||
dnsMessage := new(dns.Msg)
|
dnsMessage := new(dns.Msg)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue