Measure and expose DNS programming latency from Kubernetes plugin. (#3171)

For now metric is measure only for headless services. Informer has been slighlty
refactored, so the code can measure latency without storing extra fields on
Endpoint struct.

Signed-off-by: Janek Łukaszewicz <janluk@google.com>

Suggestions from code review

Co-Authored-By: Chris O'Haver <cohaver@infoblox.com>
This commit is contained in:
janluk 2019-10-04 17:48:43 +02:00 committed by Miek Gieben
parent 03ea2ae955
commit d7cdb992b4
11 changed files with 330 additions and 37 deletions

View file

@ -88,8 +88,9 @@ type dnsControlOpts struct {
namespaceLabelSelector *meta.LabelSelector
namespaceSelector labels.Selector
zones []string
endpointNameMode bool
zones []string
endpointNameMode bool
skipAPIObjectsCleanup bool
}
// newDNSController creates a controller for CoreDNS.
@ -111,7 +112,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
&api.Service{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc},
object.ToService,
object.DefaultProcessor(object.ToService(opts.skipAPIObjectsCleanup)),
)
if opts.initPodCache {
@ -123,7 +124,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
&api.Pod{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{podIPIndex: podIPIndexFunc},
object.ToPod,
object.DefaultProcessor(object.ToPod(opts.skipAPIObjectsCleanup)),
)
}
@ -134,9 +135,50 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector),
},
&api.Endpoints{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.ResourceEventHandlerFuncs{},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
object.ToEndpoints)
func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
return func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {
apiEndpoints, obj := object.ToEndpoints(d.Object)
switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
h.OnUpdate(old, obj)
// endpoint updates can come frequently, make sure it's a change we care about
if !endpointsEquivalent(old.(*object.Endpoints), obj) {
dns.updateModifed()
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
}
} else {
if err := clientState.Add(obj); err != nil {
return err
}
h.OnAdd(d.Object)
dns.updateModifed()
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
}
case cache.Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
h.OnDelete(d.Object)
dns.updateModifed()
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
}
if !opts.skipAPIObjectsCleanup {
*apiEndpoints = api.Endpoints{}
}
}
return nil
}
})
}
dns.nsLister, dns.nsController = cache.NewInformer(
@ -423,17 +465,6 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) {
switch ob := obj.(type) {
case *object.Service:
dns.updateModifed()
case *object.Endpoints:
if newObj == nil || oldObj == nil {
dns.updateModifed()
return
}
p := oldObj.(*object.Endpoints)
// endpoint updates can come frequently, make sure it's a change we care about
if endpointsEquivalent(p, ob) {
return
}
dns.updateModifed()
case *object.Pod:
dns.updateModifed()
default:
@ -441,6 +472,10 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) {
}
}
func (dns *dnsControl) getServices(endpoints *object.Endpoints) []*object.Service {
return dns.SvcIndex(object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace()))
}
// subsetsEquivalent checks if two endpoint subsets are significantly equivalent
// I.e. that they have the same ready addresses, host names, ports (including protocol
// and service names for SRV)
@ -483,6 +518,9 @@ func subsetsEquivalent(sa, sb object.EndpointSubset) bool {
// endpointsEquivalent checks if the update to an endpoint is something
// that matters to us or if they are effectively equivalent.
func endpointsEquivalent(a, b *object.Endpoints) bool {
if a == nil || b == nil {
return false
}
if len(a.Subsets) != len(b.Subsets) {
return false