diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index 41416ae0a..233d6f548 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -111,7 +111,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns }, &api.Service{}, opts.resyncPeriod, - cache.ResourceEventHandlerFuncs{}, + cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc}, object.ToService, ) @@ -124,7 +124,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns }, &api.Pod{}, opts.resyncPeriod, - cache.ResourceEventHandlerFuncs{}, + cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{podIPIndex: podIPIndexFunc}, object.ToPod, ) @@ -138,7 +138,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns }, &api.Endpoints{}, opts.resyncPeriod, - cache.ResourceEventHandlerFuncs{}, + cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, object.ToEndpoints) } @@ -406,6 +406,100 @@ func (dns *dnsControl) GetNamespaceByName(name string) (*api.Namespace, error) { return nil, fmt.Errorf("namespace not found") } +func (dns *dnsControl) Add(obj interface{}) { dns.detectChanges(nil, obj) } +func (dns *dnsControl) Delete(obj interface{}) { dns.detectChanges(obj, nil) } +func (dns *dnsControl) Update(oldObj, newObj interface{}) { dns.detectChanges(oldObj, newObj) } + +// detectChanges detects changes in objects, and updates the modified timestamp +func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) { + // If both objects have the same resource version, they are identical. + if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) { + return + } + obj := newObj + if obj == nil { + obj = oldObj + } + switch ob := obj.(type) { + case *object.Service: + dns.updateModifed() + case *object.Endpoints: + if newObj == nil || oldObj == nil { + dns.updateModifed() + return + } + p := oldObj.(*object.Endpoints) + // endpoint updates can come frequently, make sure it's a change we care about + if endpointsEquivalent(p, ob) { + return + } + dns.updateModifed() + case *object.Pod: + dns.updateModifed() + default: + log.Warningf("Updates for %T not supported.", ob) + } +} + +// subsetsEquivalent checks if two endpoint subsets are significantly equivalent +// I.e. that they have the same ready addresses, host names, ports (including protocol +// and service names for SRV) +func subsetsEquivalent(sa, sb object.EndpointSubset) bool { + if len(sa.Addresses) != len(sb.Addresses) { + return false + } + if len(sa.Ports) != len(sb.Ports) { + return false + } + + // in Addresses and Ports, we should be able to rely on + // these being sorted and able to be compared + // they are supposed to be in a canonical format + for addr, aaddr := range sa.Addresses { + baddr := sb.Addresses[addr] + if aaddr.IP != baddr.IP { + return false + } + if aaddr.Hostname != baddr.Hostname { + return false + } + } + + for port, aport := range sa.Ports { + bport := sb.Ports[port] + if aport.Name != bport.Name { + return false + } + if aport.Port != bport.Port { + return false + } + if aport.Protocol != bport.Protocol { + return false + } + } + return true +} + +// endpointsEquivalent checks if the update to an endpoint is something +// that matters to us or if they are effectively equivalent. +func endpointsEquivalent(a, b *object.Endpoints) bool { + + if len(a.Subsets) != len(b.Subsets) { + return false + } + + // we should be able to rely on + // these being sorted and able to be compared + // they are supposed to be in a canonical format + for i, sa := range a.Subsets { + sb := b.Subsets[i] + if !subsetsEquivalent(sa, sb) { + return false + } + } + return true +} + func (dns *dnsControl) Modified() int64 { unix := atomic.LoadInt64(&dns.modified) return unix diff --git a/plugin/kubernetes/xfr_test.go b/plugin/kubernetes/xfr_test.go index 4336194bd..1ada4f7aa 100644 --- a/plugin/kubernetes/xfr_test.go +++ b/plugin/kubernetes/xfr_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/plugin/pkg/dnstest" "github.com/coredns/coredns/plugin/test" @@ -141,3 +142,88 @@ func difference(testRRs []dns.RR, gotRRs []dns.RR) []dns.RR { } return foundRRs } + +func TestEndpointsEquivalent(t *testing.T) { + epA := object.Endpoints{ + Subsets: []object.EndpointSubset{{ + Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}}, + }}, + } + epB := object.Endpoints{ + Subsets: []object.EndpointSubset{{ + Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}}, + }}, + } + epC := object.Endpoints{ + Subsets: []object.EndpointSubset{{ + Addresses: []object.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}}, + }}, + } + epD := object.Endpoints{ + Subsets: []object.EndpointSubset{{ + Addresses: []object.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}}, + }, + { + Addresses: []object.EndpointAddress{{IP: "1.2.2.2", Hostname: "foofoo"}}, + }}, + } + epE := object.Endpoints{ + Subsets: []object.EndpointSubset{{ + Addresses: []object.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}, {IP: "1.1.1.1"}}, + }}, + } + epF := object.Endpoints{ + Subsets: []object.EndpointSubset{{ + Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foofoo"}}, + }}, + } + epG := object.Endpoints{ + Subsets: []object.EndpointSubset{{ + Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}}, + Ports: []object.EndpointPort{{Name: "http", Port: 80, Protocol: "TCP"}}, + }}, + } + epH := object.Endpoints{ + Subsets: []object.EndpointSubset{{ + Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}}, + Ports: []object.EndpointPort{{Name: "newportname", Port: 80, Protocol: "TCP"}}, + }}, + } + epI := object.Endpoints{ + Subsets: []object.EndpointSubset{{ + Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}}, + Ports: []object.EndpointPort{{Name: "http", Port: 8080, Protocol: "TCP"}}, + }}, + } + epJ := object.Endpoints{ + Subsets: []object.EndpointSubset{{ + Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}}, + Ports: []object.EndpointPort{{Name: "http", Port: 80, Protocol: "UDP"}}, + }}, + } + + tests := []struct { + equiv bool + a *object.Endpoints + b *object.Endpoints + }{ + {true, &epA, &epB}, + {false, &epA, &epC}, + {false, &epA, &epD}, + {false, &epA, &epE}, + {false, &epA, &epF}, + {false, &epF, &epG}, + {false, &epG, &epH}, + {false, &epG, &epI}, + {false, &epG, &epJ}, + } + + for i, tc := range tests { + if tc.equiv && !endpointsEquivalent(tc.a, tc.b) { + t.Errorf("Test %d: expected endpoints to be equivalent and they are not.", i) + } + if !tc.equiv && endpointsEquivalent(tc.a, tc.b) { + t.Errorf("Test %d: expected endpoints to be seen as different but they were not.", i) + } + } +}