diff --git a/.gitignore b/.gitignore index 89971ccd0..ec03a44a4 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,5 @@ coverage.txt .classpath .project .settings/** +build/ +release/ diff --git a/plugin/kubernetes/README.md b/plugin/kubernetes/README.md index c844bc0c2..ccbd681d9 100644 --- a/plugin/kubernetes/README.md +++ b/plugin/kubernetes/README.md @@ -229,3 +229,19 @@ plugin is also enabled: * kubernetes/service: the service name in the query * kubernetes/client-namespace: the client pod's namespace, if `pods verified` mode is enabled * kubernetes/client-pod-name: the client pod's name, if `pods verified` mode is enabled + +## Metrics + +The *kubernetes* plugin exports the following *Prometheus* metrics. +* `coredns_kubernetes_dns_programming_latency_seconds{service_kind}` - exports the +[DNS programming latency SLI](https://github.com/kubernetes/community/blob/master/sig-scalability/slos/dns_programming_latency.md). + The metric has the `service_kind` label that identifies the kind of the + [kubernetes service](https://kubernetes.io/docs/concepts/services-networking/service). + It may take one of three values: + * `cluster_ip` + * `headless_with_selector` + * `headless_without_selector` + +## Bugs + + * Only the `headless_with_selector` service kind is recorded so far; support for the other service kinds still needs to be added. diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index f5a3e7ffd..98f82341b 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -88,8 +88,9 @@ type dnsControlOpts struct { namespaceLabelSelector *meta.LabelSelector namespaceSelector labels.Selector - zones []string - endpointNameMode bool + zones []string + endpointNameMode bool + skipAPIObjectsCleanup bool } // newDNSController creates a controller for CoreDNS. 
@@ -111,7 +112,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns &api.Service{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc}, - object.ToService, + object.DefaultProcessor(object.ToService(opts.skipAPIObjectsCleanup)), ) if opts.initPodCache { @@ -123,7 +124,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns &api.Pod{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{podIPIndex: podIPIndexFunc}, - object.ToPod, + object.DefaultProcessor(object.ToPod(opts.skipAPIObjectsCleanup)), ) } @@ -134,9 +135,50 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector), }, &api.Endpoints{}, - cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, + cache.ResourceEventHandlerFuncs{}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - object.ToEndpoints) + func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { + return func(obj interface{}) error { + for _, d := range obj.(cache.Deltas) { + + apiEndpoints, obj := object.ToEndpoints(d.Object) + + switch d.Type { + case cache.Sync, cache.Added, cache.Updated: + if old, exists, err := clientState.Get(obj); err == nil && exists { + if err := clientState.Update(obj); err != nil { + return err + } + h.OnUpdate(old, obj) + // endpoint updates can come frequently, make sure it's a change we care about + if !endpointsEquivalent(old.(*object.Endpoints), obj) { + dns.updateModifed() + recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints) + } + } else { + if err := clientState.Add(obj); err != nil { + return err + } + h.OnAdd(d.Object) + 
dns.updateModifed() + recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints) + } + case cache.Deleted: + if err := clientState.Delete(obj); err != nil { + return err + } + h.OnDelete(d.Object) + dns.updateModifed() + recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints) + } + if !opts.skipAPIObjectsCleanup { + *apiEndpoints = api.Endpoints{} + } + } + return nil + } + }) + } dns.nsLister, dns.nsController = cache.NewInformer( @@ -423,17 +465,6 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) { switch ob := obj.(type) { case *object.Service: dns.updateModifed() - case *object.Endpoints: - if newObj == nil || oldObj == nil { - dns.updateModifed() - return - } - p := oldObj.(*object.Endpoints) - // endpoint updates can come frequently, make sure it's a change we care about - if endpointsEquivalent(p, ob) { - return - } - dns.updateModifed() case *object.Pod: dns.updateModifed() default: @@ -441,6 +472,10 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) { } } +func (dns *dnsControl) getServices(endpoints *object.Endpoints) []*object.Service { + return dns.SvcIndex(object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace())) +} + // subsetsEquivalent checks if two endpoint subsets are significantly equivalent // I.e. that they have the same ready addresses, host names, ports (including protocol // and service names for SRV) @@ -483,6 +518,9 @@ func subsetsEquivalent(sa, sb object.EndpointSubset) bool { // endpointsEquivalent checks if the update to an endpoint is something // that matters to us or if they are effectively equivalent. 
func endpointsEquivalent(a, b *object.Endpoints) bool { + if a == nil || b == nil { + return false + } if len(a.Subsets) != len(b.Subsets) { return false diff --git a/plugin/kubernetes/metrics.go b/plugin/kubernetes/metrics.go new file mode 100644 index 000000000..e85b48120 --- /dev/null +++ b/plugin/kubernetes/metrics.go @@ -0,0 +1,76 @@ +package kubernetes + +import ( + "time" + + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/kubernetes/object" + "github.com/prometheus/client_golang/prometheus" + api "k8s.io/api/core/v1" +) + +const ( + subsystem = "kubernetes" +) + +var ( + // DnsProgrammingLatency is defined as the time it took to program a DNS instance - from the time + // a service or pod has changed to the time the change was propagated and was available to be + // served by a DNS server. + // The definition of this SLI can be found at https://github.com/kubernetes/community/blob/master/sig-scalability/slos/dns_programming_latency.md + // Note that the metric is partially based on the time exported by the endpoints controller on + // the master machine. The measurement may be inaccurate if there is a clock drift between the + // node and master machine. + // The service_kind label can be one of: + // * cluster_ip + // * headless_with_selector + // * headless_without_selector + DnsProgrammingLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: plugin.Namespace, + Subsystem: subsystem, + Name: "dns_programming_latency_seconds", + // From 1 millisecond to ~8.7 minutes (0.001 * 2^19 = 524.288s). + Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), + Help: "Histogram of the time (in seconds) it took to program a dns instance.", + }, []string{"service_kind"}) + + // durationSinceFunc returns the duration elapsed since the given time. + // Added as a global variable to allow injection for testing. 
+ durationSinceFunc = time.Since +) + +// recordDNSProgrammingLatency observes, under the "headless_with_selector" label, the time elapsed since the endpoints' +// EndpointsLastChangeTriggerTime annotation. It records nothing unless svcs holds exactly one headless service. +func recordDNSProgrammingLatency(svcs []*object.Service, endpoints *api.Endpoints) { + // A nil endpoints object has no annotations to read; return before it is dereferenced. + if endpoints == nil { + return + } + // lastChangeTriggerTime is the parsed EndpointsLastChangeTriggerTime annotation, or the "zero" time if it wasn't set. + var lastChangeTriggerTime time.Time + if stringVal, ok := endpoints.Annotations[api.EndpointsLastChangeTriggerTime]; ok { + ts, err := time.Parse(time.RFC3339Nano, stringVal) + if err != nil { + log.Warningf("DnsProgrammingLatency cannot be calculated for Endpoints '%s/%s'; invalid %q annotation RFC3339 value of %q", + endpoints.GetNamespace(), endpoints.GetName(), api.EndpointsLastChangeTriggerTime, stringVal) + // On a parse error ts is the zero time, so the IsZero check below skips the observation. + } + lastChangeTriggerTime = ts + } + + // isHeadless indicates whether the endpoints object belongs to a headless service (i.e. clusterIp = None). + // Note that this can be a false negative if the service informer is lagging, i.e. we may not see a recently + // created service. Given that the services don't change very often (comparing to much more frequent endpoints + // changes), cases when this check returns the wrong answer should be relatively rare. Because of that we + // intentionally accept this flaw to keep the solution simple. + isHeadless := len(svcs) == 1 && svcs[0].ClusterIP == api.ClusterIPNone + if !isHeadless || lastChangeTriggerTime.IsZero() { + return + } + + // If we're here it means that the Endpoints object is for a headless service and that the Endpoints + // object was created by the endpoints-controller (because the LastChangeTriggerTime annotation is set). + // It means that the corresponding service is a "headless service with selector". + DnsProgrammingLatency.WithLabelValues("headless_with_selector"). 
+ Observe(durationSinceFunc(lastChangeTriggerTime).Seconds()) +} diff --git a/plugin/kubernetes/metrics_test.go b/plugin/kubernetes/metrics_test.go new file mode 100644 index 000000000..b9d3a9509 --- /dev/null +++ b/plugin/kubernetes/metrics_test.go @@ -0,0 +1,132 @@ +package kubernetes + +import ( + "strings" + "testing" + "time" + + "github.com/coredns/coredns/plugin/kubernetes/object" + + "github.com/prometheus/client_golang/prometheus/testutil" + api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) + +const ( + namespace = "testns" +) + +func TestDnsProgrammingLatency(t *testing.T) { + client := fake.NewSimpleClientset() + now := time.Now() + controller := newdnsController(client, dnsControlOpts{ + initEndpointsCache: true, + // This is needed as otherwise the fake k8s client doesn't work properly. + skipAPIObjectsCleanup: true, + }) + durationSinceFunc = func(t time.Time) time.Duration { + return now.Sub(t) + } + DnsProgrammingLatency.Reset() + go controller.Run() + + subset1 := []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}}, + }} + + subset2 := []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}}, + }} + + createService(t, client, controller, "my-service", api.ClusterIPNone) + createEndpoints(t, client, "my-service", now.Add(-2*time.Second), subset1) + updateEndpoints(t, client, "my-service", now.Add(-1*time.Second), subset2) + + createEndpoints(t, client, "endpoints-no-service", now.Add(-4*time.Second), nil) + + createService(t, client, controller, "clusterIP-service", "10.40.0.12") + createEndpoints(t, client, "clusterIP-service", now.Add(-8*time.Second), nil) + + createService(t, client, controller, "headless-no-annotation", api.ClusterIPNone) + createEndpoints(t, client, "headless-no-annotation", nil, nil) + + createService(t, client, 
controller, "headless-wrong-annotation", api.ClusterIPNone) + createEndpoints(t, client, "headless-wrong-annotation", "wrong-value", nil) + + controller.Stop() + expected := ` + # HELP coredns_kubernetes_dns_programming_latency_seconds Histogram of the time (in seconds) it took to program a dns instance. + # TYPE coredns_kubernetes_dns_programming_latency_seconds histogram + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.001"} 0 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.002"} 0 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.004"} 0 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.008"} 0 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.016"} 0 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.032"} 0 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.064"} 0 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.128"} 0 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.256"} 0 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.512"} 0 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="1.024"} 1 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="2.048"} 2 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="4.096"} 2 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="8.192"} 2 + 
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="16.384"} 2 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="32.768"} 2 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="65.536"} 2 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="131.072"} 2 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="262.144"} 2 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="524.288"} 2 + coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="+Inf"} 2 + coredns_kubernetes_dns_programming_latency_seconds_sum{service_kind="headless_with_selector"} 3 + coredns_kubernetes_dns_programming_latency_seconds_count{service_kind="headless_with_selector"} 2 + ` + if err := testutil.CollectAndCompare(DnsProgrammingLatency, strings.NewReader(expected)); err != nil { + t.Error(err) + } +} + +func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []api.EndpointSubset) *api.Endpoints { + annotations := make(map[string]string) + switch v := lastChangeTriggerTime.(type) { + case string: + annotations[api.EndpointsLastChangeTriggerTime] = v + case time.Time: + annotations[api.EndpointsLastChangeTriggerTime] = v.Format(time.RFC3339Nano) + } + return &api.Endpoints{ + ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name, Annotations: annotations}, + Subsets: subsets, + } +} + +func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) { + _, err := client.CoreV1().Endpoints(namespace).Create(buildEndpoints(name, triggerTime, subsets)) + if err != nil { + t.Fatal(err) + } +} + +func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, 
triggerTime interface{}, subsets []api.EndpointSubset) { + _, err := client.CoreV1().Endpoints(namespace).Update(buildEndpoints(name, triggerTime, subsets)) + if err != nil { + t.Fatal(err) + } +} + +func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) { + if _, err := client.CoreV1().Services(namespace).Create(&api.Service{ + ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name}, + Spec: api.ServiceSpec{ClusterIP: clusterIp}, + }); err != nil { + t.Fatal(err) + } + if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + return len(controller.SvcIndex(object.ServiceKey(name, namespace))) == 1, nil + }); err != nil { + t.Fatal(err) + } +} diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index b2c77fc10..c7d6b7323 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -44,10 +44,10 @@ type EndpointPort struct { func EndpointsKey(name, namespace string) string { return name + "." + namespace } // ToEndpoints converts an api.Endpoints to a *Endpoints. -func ToEndpoints(obj interface{}) interface{} { +func ToEndpoints(obj interface{}) (*api.Endpoints, *Endpoints) { end, ok := obj.(*api.Endpoints) if !ok { - return nil + return nil, nil } e := &Endpoints{ @@ -93,9 +93,7 @@ func ToEndpoints(obj interface{}) interface{} { } } - *end = api.Endpoints{} - - return e + return end, e } // CopyWithoutSubsets copies e, without the subsets. diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go index 919fce12d..bd4d05d30 100644 --- a/plugin/kubernetes/object/informer.go +++ b/plugin/kubernetes/object/informer.go @@ -5,19 +5,25 @@ import ( "k8s.io/client-go/tools/cache" ) -// NewIndexerInformer is a copy of the cache.NewIndexInformer function, but allows Process to have a conversion function (ToFunc). 
-func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.ResourceEventHandler, indexers cache.Indexers, convert ToFunc) (cache.Indexer, cache.Controller) { +// NewIndexerInformer is a copy of the cache.NewIndexerInformer function, but allows custom process function +func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.ResourceEventHandler, indexers cache.Indexers, builder ProcessorBuilder) (cache.Indexer, cache.Controller) { clientState := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, indexers) - fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, clientState) - cfg := &cache.Config{ - Queue: fifo, + Queue: cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, clientState), ListerWatcher: lw, ObjectType: objType, FullResyncPeriod: defaultResyncPeriod, RetryOnError: false, - Process: func(obj interface{}) error { + Process: builder(clientState, h), + } + return clientState, cache.New(cfg) +} + +// DefaultProcessor is a copy of Process function from cache.NewIndexerInformer except it does a conversion. +func DefaultProcessor(convert ToFunc) ProcessorBuilder { + return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { + return func(obj interface{}) error { for _, d := range obj.(cache.Deltas) { obj := convert(d.Object) @@ -43,9 +49,8 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache. 
} } return nil - }, + } } - return clientState, cache.New(cfg) } const defaultResyncPeriod = 0 diff --git a/plugin/kubernetes/object/object.go b/plugin/kubernetes/object/object.go index 6b1c7d839..132b5be6c 100644 --- a/plugin/kubernetes/object/object.go +++ b/plugin/kubernetes/object/object.go @@ -16,14 +16,18 @@ package object import ( - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" ) // ToFunc converts one empty interface to another. type ToFunc func(interface{}) interface{} +// ProcessorBuilder returns function to process cache events. +type ProcessorBuilder func(cache.Indexer, cache.ResourceEventHandler) cache.ProcessFunc + // Empty is an empty struct. type Empty struct{} diff --git a/plugin/kubernetes/object/pod.go b/plugin/kubernetes/object/pod.go index 072d8d56d..9fc9b5726 100644 --- a/plugin/kubernetes/object/pod.go +++ b/plugin/kubernetes/object/pod.go @@ -16,8 +16,14 @@ type Pod struct { *Empty } -// ToPod converts an api.Pod to a *Pod. -func ToPod(obj interface{}) interface{} { +// ToPod returns a function that converts an api.Pod to a *Pod. +func ToPod(skipCleanup bool) ToFunc { + return func(obj interface{}) interface{} { + return toPod(skipCleanup, obj) + } +} + +func toPod(skipCleanup bool, obj interface{}) interface{} { pod, ok := obj.(*api.Pod) if !ok { return nil @@ -35,7 +41,9 @@ func ToPod(obj interface{}) interface{} { return nil } - *pod = api.Pod{} + if !skipCleanup { + *pod = api.Pod{} + } return p } diff --git a/plugin/kubernetes/object/service.go b/plugin/kubernetes/object/service.go index a41100ab9..295715e2d 100644 --- a/plugin/kubernetes/object/service.go +++ b/plugin/kubernetes/object/service.go @@ -26,8 +26,14 @@ type Service struct { // ServiceKey return a string using for the index. func ServiceKey(name, namespace string) string { return name + "." 
+ namespace } -// ToService converts an api.Service to a *Service. -func ToService(obj interface{}) interface{} { +// ToService returns a function that converts an api.Service to a *Service. +func ToService(skipCleanup bool) ToFunc { + return func(obj interface{}) interface{} { + return toService(skipCleanup, obj) + } +} + +func toService(skipCleanup bool, obj interface{}) interface{} { svc, ok := obj.(*api.Service) if !ok { return nil @@ -58,7 +64,9 @@ func ToService(obj interface{}) interface{} { s.ExternalIPs[li+i] = lb.IP } - *svc = api.Service{} + if !skipCleanup { + *svc = api.Service{} + } return s } diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go index 63d85fee9..b35d653e1 100644 --- a/plugin/kubernetes/setup.go +++ b/plugin/kubernetes/setup.go @@ -10,6 +10,7 @@ import ( "github.com/coredns/coredns/core/dnsserver" "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/metrics" "github.com/coredns/coredns/plugin/pkg/dnsutil" clog "github.com/coredns/coredns/plugin/pkg/log" "github.com/coredns/coredns/plugin/pkg/parse" @@ -51,6 +52,11 @@ func setup(c *caddy.Controller) error { k.RegisterKubeCache(c) + c.OnStartup(func() error { + metrics.MustRegister(c, DnsProgrammingLatency) + return nil + }) + dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler { k.Next = next return k