middleware/kubernetes: Implement current federation beta (#723)

* federation initial commit

* UTs/bugfixes

* federation bits

* polish, cover UT gaps

* add TODO

* go fmt & todo note

* remove unrelated change

* pr changes

* start node watcher

* get real node name

* remove unused case
This commit is contained in:
Chris O'Haver 2017-06-14 09:38:00 -04:00 committed by John Belamaric
parent 8e86fa6f23
commit 930c54ef62
9 changed files with 420 additions and 17 deletions

View file

@ -39,6 +39,7 @@ type Kubernetes struct {
APIConn dnsController
ResyncPeriod time.Duration
Namespaces []string
Federations []Federation
LabelSelector *unversionedapi.LabelSelector
Selector *labels.Selector
PodMode string
@ -78,9 +79,18 @@ type pod struct {
}
type recordRequest struct {
port, protocol, endpoint, service, namespace, typeName, zone string
port string
protocol string
endpoint string
service string
namespace string
typeName string
zone string
federation string
}
var localPodIP net.IP
var errNoItems = errors.New("no items found")
var errNsNotExposed = errors.New("namespace is not exposed")
var errInvalidRequest = errors.New("invalid query name")
@ -236,16 +246,20 @@ func (k *Kubernetes) InitKubeCache() (err error) {
log.Printf("[INFO] Kubernetes middleware configured with the label selector '%s'. Only kubernetes objects matching this label selector will be exposed.", unversionedapi.FormatLabelSelector(k.LabelSelector))
}
k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector, k.PodMode == PodModeVerified)
opts := dnsControlOpts{
initPodCache: k.PodMode == PodModeVerified,
}
k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector, opts)
return err
}
func (k *Kubernetes) parseRequest(lowerCasedName string, qtype uint16) (r recordRequest, err error) {
// 3 Possible cases
// SRV Request: _port._protocol.service.namespace.type.zone
// A Request (endpoint): endpoint.service.namespace.type.zone
// A Request (service): service.namespace.type.zone
// SRV Request: _port._protocol.service.namespace.[federation.]type.zone
// A Request (endpoint): endpoint.service.namespace.[federation.]type.zone
// A Request (service): service.namespace.[federation.]type.zone
// separate zone from rest of lowerCasedName
var segs []string
for _, z := range k.Zones {
@ -261,6 +275,8 @@ func (k *Kubernetes) parseRequest(lowerCasedName string, qtype uint16) (r record
return r, errZoneNotFound
}
r.federation, segs = k.stripFederation(segs)
if qtype == dns.TypeNS {
return r, nil
}
@ -339,11 +355,17 @@ func (k *Kubernetes) Records(r recordRequest) ([]msg.Service, error) {
return nil, err
}
if len(services) == 0 && len(pods) == 0 {
// Did not find item in k8s
// Did not find item in k8s, try federated
if r.federation != "" {
fedCNAME := k.federationCNAMERecord(r)
if fedCNAME.Key != "" {
return []msg.Service{fedCNAME}, nil
}
}
return nil, errNoItems
}
records := k.getRecordsForK8sItems(services, pods, r.zone)
records := k.getRecordsForK8sItems(services, pods, r)
return records, nil
}
@ -360,27 +382,37 @@ func endpointHostname(addr api.EndpointAddress) string {
return ""
}
func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, zone string) (records []msg.Service) {
zonePath := msg.Path(zone, "coredns")
func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, r recordRequest) (records []msg.Service) {
zonePath := msg.Path(r.zone, "coredns")
for _, svc := range services {
if svc.addr == api.ClusterIPNone {
// This is a headless service, create records for each endpoint
for _, ep := range svc.endpoints {
s := msg.Service{
Key: strings.Join([]string{zonePath, "svc", svc.namespace, svc.name, endpointHostname(ep.addr)}, "/"),
Host: ep.addr.IP,
Port: int(ep.port.Port),
}
if r.federation != "" {
s.Key = strings.Join([]string{zonePath, "svc", r.federation, svc.namespace, svc.name, endpointHostname(ep.addr)}, "/")
} else {
s.Key = strings.Join([]string{zonePath, "svc", svc.namespace, svc.name, endpointHostname(ep.addr)}, "/")
}
records = append(records, s)
}
} else {
// Create records for each exposed port...
for _, p := range svc.ports {
s := msg.Service{
Key: strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/"),
Host: svc.addr,
Port: int(p.Port)}
if r.federation != "" {
s.Key = strings.Join([]string{zonePath, "svc", r.federation, svc.namespace, svc.name}, "/")
} else {
s.Key = strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/")
}
records = append(records, s)
}
// If the addr is not an IP (i.e. an external service), add the record ...
@ -388,6 +420,11 @@ func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, zone
Key: strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/"),
Host: svc.addr}
if t, _ := s.HostType(); t == dns.TypeCNAME {
if r.federation != "" {
s.Key = strings.Join([]string{zonePath, "svc", r.federation, svc.namespace, svc.name}, "/")
} else {
s.Key = strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/")
}
records = append(records, s)
}
@ -575,3 +612,21 @@ func (k *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service {
func symbolContainsWildcard(symbol string) bool {
return (symbol == "*" || symbol == "any")
}
func (k *Kubernetes) localPodIP() net.IP {
if localPodIP != nil {
return localPodIP
}
addrs, _ := k.interfaceAddrs.interfaceAddrs()
for _, addr := range addrs {
ip, _, _ := net.ParseCIDR(addr.String())
ip = ip.To4()
if ip == nil || ip.IsLoopback() {
continue
}
localPodIP = ip
return localPodIP
}
return nil
}