handle clusterIP endpoint queries (#730)

This commit is contained in:
Chris O'Haver 2017-06-14 10:29:41 -04:00 committed by John Belamaric
parent 930c54ef62
commit 5c10eba31c
3 changed files with 53 additions and 35 deletions

View file

@ -366,6 +366,7 @@ func (k *Kubernetes) Records(r recordRequest) ([]msg.Service, error) {
}
records := k.getRecordsForK8sItems(services, pods, r)
return records, nil
}
@ -386,8 +387,8 @@ func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, r rec
zonePath := msg.Path(r.zone, "coredns")
for _, svc := range services {
if svc.addr == api.ClusterIPNone {
// This is a headless service, create records for each endpoint
if svc.addr == api.ClusterIPNone || len(svc.endpoints) > 0 {
// This is a headless service or endpoints are present, create records for each endpoint
for _, ep := range svc.endpoints {
s := msg.Service{
Host: ep.addr.IP,
@ -522,47 +523,52 @@ func (k *Kubernetes) findServices(r recordRequest) ([]service, error) {
continue
}
s := service{name: svc.Name, namespace: svc.Namespace}
// External Service
// Endpoint query or headless service
if svc.Spec.ClusterIP == api.ClusterIPNone || r.endpoint != "" {
s.addr = svc.Spec.ClusterIP
endpointsList := k.APIConn.EndpointsList()
for _, ep := range endpointsList.Items {
if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace {
continue
}
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
for _, p := range eps.Ports {
ephostname := endpointHostname(addr)
if r.endpoint != "" && r.endpoint != ephostname {
continue
}
if !(symbolMatches(r.port, strings.ToLower(p.Name), portWildcard) && symbolMatches(r.protocol, strings.ToLower(string(p.Protocol)), protocolWildcard)) {
continue
}
s.endpoints = append(s.endpoints, endpoint{addr: addr, port: p})
}
}
}
}
if len(s.endpoints) > 0 {
resultItems = append(resultItems, s)
}
continue
}
// External service
if svc.Spec.ExternalName != "" {
s.addr = svc.Spec.ExternalName
resultItems = append(resultItems, s)
continue
}
// ClusterIP service
if svc.Spec.ClusterIP != api.ClusterIPNone {
s.addr = svc.Spec.ClusterIP
for _, p := range svc.Spec.Ports {
if !(symbolMatches(r.port, strings.ToLower(p.Name), portWildcard) && symbolMatches(r.protocol, strings.ToLower(string(p.Protocol)), protocolWildcard)) {
continue
}
s.ports = append(s.ports, p)
}
resultItems = append(resultItems, s)
continue
}
// Headless service
s.addr = svc.Spec.ClusterIP
endpointsList := k.APIConn.EndpointsList()
for _, ep := range endpointsList.Items {
if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace {
// ClusterIP service
s.addr = svc.Spec.ClusterIP
for _, p := range svc.Spec.Ports {
if !(symbolMatches(r.port, strings.ToLower(p.Name), portWildcard) && symbolMatches(r.protocol, strings.ToLower(string(p.Protocol)), protocolWildcard)) {
continue
}
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
for _, p := range eps.Ports {
ephostname := endpointHostname(addr)
if r.endpoint != "" && r.endpoint != ephostname {
continue
}
if !(symbolMatches(r.port, strings.ToLower(p.Name), portWildcard) && symbolMatches(r.protocol, strings.ToLower(string(p.Protocol)), protocolWildcard)) {
continue
}
s.endpoints = append(s.endpoints, endpoint{addr: addr, port: p})
}
}
}
s.ports = append(s.ports, p)
}
resultItems = append(resultItems, s)
}
return resultItems, nil