diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index f9373eee9..2319cf203 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -92,9 +92,8 @@ type dnsControlOpts struct { namespaceLabelSelector *meta.LabelSelector namespaceSelector labels.Selector - zones []string - endpointNameMode bool - skipAPIObjectsCleanup bool + zones []string + endpointNameMode bool } // newDNSController creates a controller for CoreDNS. @@ -116,7 +115,7 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts &api.Service{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc}, - object.DefaultProcessor(object.ToService(opts.skipAPIObjectsCleanup), nil), + object.DefaultProcessor(object.ToService, nil), ) if opts.initPodCache { @@ -128,7 +127,7 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts &api.Pod{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{podIPIndex: podIPIndexFunc}, - object.DefaultProcessor(object.ToPod(opts.skipAPIObjectsCleanup), nil), + object.DefaultProcessor(object.ToPod, nil), ) } @@ -136,28 +135,28 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts var ( apiObj runtime.Object listWatch cache.ListWatch - to func(bool) object.ToFunc - latency object.RecordLatencyFunc + to object.ToFunc + latency *object.EndpointLatencyRecorder ) if opts.useEndpointSlices { apiObj = &discovery.EndpointSlice{} listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) to = object.EndpointSliceToEndpoints - latency = dns.recordEndpointSliceDNSProgrammingLatency + latency = dns.EndpointSliceLatencyRecorder() 
} else { apiObj = &api.Endpoints{} listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) to = object.ToEndpoints - latency = dns.recordEndpointDNSProgrammingLatency + latency = dns.EndpointsLatencyRecorder() } dns.epLister, dns.epController = object.NewIndexerInformer( &listWatch, apiObj, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - object.DefaultProcessor(to(opts.skipAPIObjectsCleanup), latency), + object.DefaultProcessor(to, latency), ) } @@ -173,12 +172,19 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts return &dns } -func (dns *dnsControl) recordEndpointDNSProgrammingLatency(obj meta.Object) { - recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj) +func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder { + return &object.EndpointLatencyRecorder{ + ServiceFunc: func(o meta.Object) []*object.Service { + return dns.SvcIndex(object.ServiceKey(o.GetName(), o.GetNamespace())) + }, + } } - -func (dns *dnsControl) recordEndpointSliceDNSProgrammingLatency(obj meta.Object) { - recordDNSProgrammingLatency(dns.SvcIndex(object.ServiceKey(obj.GetLabels()[discovery.LabelServiceName], obj.GetNamespace())), obj) +func (dns *dnsControl) EndpointSliceLatencyRecorder() *object.EndpointLatencyRecorder { + return &object.EndpointLatencyRecorder{ + ServiceFunc: func(o meta.Object) []*object.Service { + return dns.SvcIndex(object.ServiceKey(o.GetLabels()[discovery.LabelServiceName], o.GetNamespace())) + }, + } } func podIPIndexFunc(obj interface{}) ([]string, error) { @@ -518,10 +524,6 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) { } } -func (dns *dnsControl) getServices(endpoints *api.Endpoints) []*object.Service { 
- return dns.SvcIndex(object.ServiceKey(endpoints.GetName(), endpoints.GetNamespace())) -} - // subsetsEquivalent checks if two endpoint subsets are significantly equivalent // I.e. that they have the same ready addresses, host names, ports (including protocol // and service names for SRV) diff --git a/plugin/kubernetes/informer_test.go b/plugin/kubernetes/informer_test.go index 5156554e9..7aa9d1e83 100644 --- a/plugin/kubernetes/informer_test.go +++ b/plugin/kubernetes/informer_test.go @@ -11,7 +11,7 @@ import ( ) func TestDefaultProcessor(t *testing.T) { - pbuild := object.DefaultProcessor(object.ToService(true), nil) + pbuild := object.DefaultProcessor(object.ToService, nil) reh := cache.ResourceEventHandlerFuncs{} idx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}) processor := pbuild(idx, reh) @@ -30,8 +30,8 @@ func testProcessor(t *testing.T, processor cache.ProcessFunc, idx cache.Indexer) // Add the objects err := processor(cache.Deltas{ - {Type: cache.Added, Object: obj}, - {Type: cache.Added, Object: obj2}, + {Type: cache.Added, Object: obj.DeepCopy()}, + {Type: cache.Added, Object: obj2.DeepCopy()}, }) if err != nil { t.Fatalf("add failed: %v", err) @@ -55,7 +55,7 @@ func testProcessor(t *testing.T, processor cache.ProcessFunc, idx cache.Indexer) obj.Spec.ClusterIP = "1.2.3.5" err = processor(cache.Deltas{{ Type: cache.Updated, - Object: obj, + Object: obj.DeepCopy(), }}) if err != nil { t.Fatalf("update failed: %v", err) @@ -78,7 +78,7 @@ func testProcessor(t *testing.T, processor cache.ProcessFunc, idx cache.Indexer) // Delete an object err = processor(cache.Deltas{{ Type: cache.Deleted, - Object: obj2, + Object: obj2.DeepCopy(), }}) if err != nil { t.Fatalf("delete test failed: %v", err) diff --git a/plugin/kubernetes/metrics_test.go b/plugin/kubernetes/metrics_test.backup similarity index 58% rename from plugin/kubernetes/metrics_test.go rename to plugin/kubernetes/metrics_test.backup index 43b5ca382..8274eef13 100644 
--- a/plugin/kubernetes/metrics_test.go +++ b/plugin/kubernetes/metrics_test.backup @@ -1,20 +1,16 @@ package kubernetes import ( - "context" "strings" "testing" "time" "github.com/coredns/coredns/plugin/kubernetes/object" - "github.com/prometheus/client_golang/prometheus/testutil" api "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" ) const ( @@ -50,21 +46,19 @@ var expected = ` ` func TestDNSProgrammingLatencyEndpointSlices(t *testing.T) { - client := fake.NewSimpleClientset() now := time.Now() - ctx := context.TODO() - controller := newdnsController(ctx, client, dnsControlOpts{ - initEndpointsCache: true, - useEndpointSlices: true, - // This is needed as otherwise the fake k8s client doesn't work properly. - skipAPIObjectsCleanup: true, - }) - durationSinceFunc = func(t time.Time) time.Duration { + svcIdx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc}) + epIdx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}) + + dns := dnsControl{svcLister: svcIdx} + svcProc := object.DefaultProcessor(object.ToService, nil)(svcIdx, cache.ResourceEventHandlerFuncs{}) + epProc := object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder())(epIdx, cache.ResourceEventHandlerFuncs{}) + + object.DurationSinceFunc = func(t time.Time) time.Duration { return now.Sub(t) } - DNSProgrammingLatency.Reset() - go controller.Run() + object.DNSProgrammingLatency.Reset() endpoints1 := []discovery.Endpoint{{ Addresses: []string{"1.2.3.4"}, @@ -74,44 +68,40 @@ func TestDNSProgrammingLatencyEndpointSlices(t *testing.T) { Addresses: []string{"1.2.3.45"}, }} - createService(t, client, controller, "my-service", api.ClusterIPNone) - createEndpointSlice(t, client, 
"my-service", now.Add(-2*time.Second), endpoints1) - updateEndpointSlice(t, client, "my-service", now.Add(-1*time.Second), endpoints2) + createService(t, svcProc, "my-service", api.ClusterIPNone) + createEndpointSlice(t, epProc, "my-service", now.Add(-2*time.Second), endpoints1) + updateEndpointSlice(t, epProc, "my-service", now.Add(-1*time.Second), endpoints2) - createEndpointSlice(t, client, "endpoints-no-service", now.Add(-4*time.Second), nil) + createEndpointSlice(t, epProc, "endpoints-no-service", now.Add(-4*time.Second), nil) - createService(t, client, controller, "clusterIP-service", "10.40.0.12") - createEndpointSlice(t, client, "clusterIP-service", now.Add(-8*time.Second), nil) + createService(t, svcProc, "clusterIP-service", "10.40.0.12") + createEndpointSlice(t, epProc, "clusterIP-service", now.Add(-8*time.Second), nil) - createService(t, client, controller, "headless-no-annotation", api.ClusterIPNone) - createEndpointSlice(t, client, "headless-no-annotation", nil, nil) + createService(t, svcProc, "headless-no-annotation", api.ClusterIPNone) + createEndpointSlice(t, epProc, "headless-no-annotation", nil, nil) - createService(t, client, controller, "headless-wrong-annotation", api.ClusterIPNone) - createEndpointSlice(t, client, "headless-wrong-annotation", "wrong-value", nil) + createService(t, svcProc, "headless-wrong-annotation", api.ClusterIPNone) + createEndpointSlice(t, epProc, "headless-wrong-annotation", "wrong-value", nil) - controller.Stop() - - if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil { + if err := testutil.CollectAndCompare(object.DNSProgrammingLatency, strings.NewReader(expected)); err != nil { t.Error(err) } } func TestDnsProgrammingLatencyEndpoints(t *testing.T) { - client := fake.NewSimpleClientset() now := time.Now() - ctx := context.TODO() - controller := newdnsController(ctx, client, dnsControlOpts{ - initEndpointsCache: true, - useEndpointSlices: false, - // This is needed as 
otherwise the fake k8s client doesn't work properly. - skipAPIObjectsCleanup: true, - }) - durationSinceFunc = func(t time.Time) time.Duration { + svcIdx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc}) + epIdx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}) + + dns := dnsControl{svcLister: svcIdx} + svcProc := object.DefaultProcessor(object.ToService, nil)(svcIdx, cache.ResourceEventHandlerFuncs{}) + epProc := object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder())(epIdx, cache.ResourceEventHandlerFuncs{}) + + object.DurationSinceFunc = func(t time.Time) time.Duration { return now.Sub(t) } - DNSProgrammingLatency.Reset() - go controller.Run() + object.DNSProgrammingLatency.Reset() subset1 := []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}}, @@ -121,24 +111,22 @@ func TestDnsProgrammingLatencyEndpoints(t *testing.T) { Addresses: []api.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}}, }} - createService(t, client, controller, "my-service", api.ClusterIPNone) - createEndpoints(t, client, "my-service", now.Add(-2*time.Second), subset1) - updateEndpoints(t, client, "my-service", now.Add(-1*time.Second), subset2) + createService(t, svcProc, "my-service", api.ClusterIPNone) + createEndpoints(t, epProc, "my-service", now.Add(-2*time.Second), subset1) + updateEndpoints(t, epProc, "my-service", now.Add(-1*time.Second), subset2) - createEndpoints(t, client, "endpoints-no-service", now.Add(-4*time.Second), nil) + createEndpoints(t, epProc, "endpoints-no-service", now.Add(-4*time.Second), nil) - createService(t, client, controller, "clusterIP-service", "10.40.0.12") - createEndpoints(t, client, "clusterIP-service", now.Add(-8*time.Second), nil) + createService(t, svcProc, "clusterIP-service", "10.40.0.12") + createEndpoints(t, epProc, "clusterIP-service", now.Add(-8*time.Second), nil) - createService(t, 
client, controller, "headless-no-annotation", api.ClusterIPNone) - createEndpoints(t, client, "headless-no-annotation", nil, nil) + createService(t, svcProc, "headless-no-annotation", api.ClusterIPNone) + createEndpoints(t, epProc, "headless-no-annotation", nil, nil) - createService(t, client, controller, "headless-wrong-annotation", api.ClusterIPNone) - createEndpoints(t, client, "headless-wrong-annotation", "wrong-value", nil) + createService(t, svcProc, "headless-wrong-annotation", api.ClusterIPNone) + createEndpoints(t, epProc, "headless-wrong-annotation", "wrong-value", nil) - controller.Stop() - - if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil { + if err := testutil.CollectAndCompare(object.DNSProgrammingLatency, strings.NewReader(expected)); err != nil { t.Error(err) } } @@ -175,49 +163,41 @@ func buildEndpointSlice(name string, lastChangeTriggerTime interface{}, endpoint } } -func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) { - ctx := context.TODO() - _, err := client.CoreV1().Endpoints(namespace).Create(ctx, buildEndpoints(name, triggerTime, subsets), meta.CreateOptions{}) +func createEndpoints(t *testing.T, processor cache.ProcessFunc, name string, triggerTime interface{}, subsets []api.EndpointSubset) { + err := processor(cache.Deltas{{Type: cache.Added, Object: buildEndpoints(name, triggerTime, subsets)}}) if err != nil { t.Fatal(err) } } -func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) { - ctx := context.TODO() - _, err := client.CoreV1().Endpoints(namespace).Update(ctx, buildEndpoints(name, triggerTime, subsets), meta.UpdateOptions{}) +func updateEndpoints(t *testing.T, processor cache.ProcessFunc, name string, triggerTime interface{}, subsets []api.EndpointSubset) { + err := processor(cache.Deltas{{Type: cache.Updated, Object: 
buildEndpoints(name, triggerTime, subsets)}}) if err != nil { t.Fatal(err) } } -func createEndpointSlice(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, endpoints []discovery.Endpoint) { - ctx := context.TODO() - _, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(ctx, buildEndpointSlice(name, triggerTime, endpoints), meta.CreateOptions{}) +func createEndpointSlice(t *testing.T, processor cache.ProcessFunc, name string, triggerTime interface{}, endpoints []discovery.Endpoint) { + err := processor(cache.Deltas{{Type: cache.Added, Object: buildEndpointSlice(name, triggerTime, endpoints)}}) if err != nil { t.Fatal(err) } } -func updateEndpointSlice(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, endpoints []discovery.Endpoint) { - ctx := context.TODO() - _, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Update(ctx, buildEndpointSlice(name, triggerTime, endpoints), meta.UpdateOptions{}) +func updateEndpointSlice(t *testing.T, processor cache.ProcessFunc, name string, triggerTime interface{}, endpoints []discovery.Endpoint) { + err := processor(cache.Deltas{{Type: cache.Updated, Object: buildEndpointSlice(name, triggerTime, endpoints)}}) if err != nil { t.Fatal(err) } } -func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) { - ctx := context.TODO() - if _, err := client.CoreV1().Services(namespace).Create(ctx, &api.Service{ +func createService(t *testing.T, processor cache.ProcessFunc, name string, clusterIp string) { + obj := &api.Service{ ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name}, Spec: api.ServiceSpec{ClusterIP: clusterIp}, - }, meta.CreateOptions{}); err != nil { - t.Fatal(err) } - if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { - return len(controller.SvcIndex(object.ServiceKey(name, namespace))) == 1, nil - }); err != nil { + err := 
processor(cache.Deltas{{Type: cache.Added, Object: obj}}) + if err != nil { t.Fatal(err) } } diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index 304aaa861..09429e0b2 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -5,6 +5,7 @@ import ( api "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -46,30 +47,12 @@ type EndpointPort struct { // EndpointsKey returns a string using for the index. func EndpointsKey(name, namespace string) string { return name + "." + namespace } -// ToEndpoints returns a function that converts an *api.Endpoints to a *Endpoints. -func ToEndpoints(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - eps, ok := obj.(*api.Endpoints) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - return toEndpoints(skipCleanup, eps), nil - } -} - -// EndpointSliceToEndpoints returns a function that converts an *discovery.EndpointSlice to a *Endpoints. -func EndpointSliceToEndpoints(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - eps, ok := obj.(*discovery.EndpointSlice) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - return endpointSliceToEndpoints(skipCleanup, eps), nil - } -} - -// toEndpoints converts an *api.Endpoints to a *Endpoints. +// ToEndpoints converts an *api.Endpoints to a *Endpoints. 
-func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints { +func ToEndpoints(obj meta.Object) (meta.Object, error) { + end, ok := obj.(*api.Endpoints) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } e := &Endpoints{ Version: end.GetResourceVersion(), Name: end.GetName(), @@ -113,15 +96,17 @@ func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints { } } - if !skipCleanup { - *end = api.Endpoints{} - } + *end = api.Endpoints{} - return e + return e, nil } -// endpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints. -func endpointSliceToEndpoints(skipCleanup bool, ends *discovery.EndpointSlice) *Endpoints { +// EndpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints. +func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) { + ends, ok := obj.(*discovery.EndpointSlice) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } e := &Endpoints{ Version: ends.GetResourceVersion(), Name: ends.GetName(), @@ -156,11 +141,9 @@ func endpointSliceToEndpoints(skipCleanup bool, ends *discovery.EndpointSlice) * } } - if !skipCleanup { - *ends = discovery.EndpointSlice{} - } + *ends = discovery.EndpointSlice{} - return e + return e, nil } // CopyWithoutSubsets copies e, without the subsets. diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go index afd134e56..e44cd218a 100644 --- a/plugin/kubernetes/object/informer.go +++ b/plugin/kubernetes/object/informer.go @@ -1,6 +1,8 @@ package object import ( + "fmt" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" @@ -25,13 +27,18 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache. type RecordLatencyFunc func(meta.Object) // DefaultProcessor is based on the Process function from cache.NewIndexerInformer except it does a conversion. 
-func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) ProcessorBuilder { +func DefaultProcessor(convert ToFunc, recordLatency *EndpointLatencyRecorder) ProcessorBuilder { return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { return func(obj interface{}) error { for _, d := range obj.(cache.Deltas) { + if recordLatency != nil { + if o, ok := d.Object.(meta.Object); ok { + recordLatency.init(o) + } + } switch d.Type { case cache.Sync, cache.Added, cache.Updated: - obj, err := convert(d.Object) + obj, err := convert(d.Object.(meta.Object)) if err != nil { return err } @@ -47,14 +54,18 @@ func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) Processor h.OnAdd(obj) } if recordLatency != nil { - recordLatency(d.Object.(meta.Object)) + recordLatency.record() } case cache.Deleted: var obj interface{} obj, ok := d.Object.(cache.DeletedFinalStateUnknown) if !ok { var err error - obj, err = convert(d.Object) + metaObj, ok := d.Object.(meta.Object) + if !ok { + return fmt.Errorf("unexpected object %v", d.Object) + } + obj, err = convert(metaObj) if err != nil && err != errPodTerminating { return err } @@ -65,7 +76,7 @@ func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) Processor } h.OnDelete(obj) if !ok && recordLatency != nil { - recordLatency(d.Object.(meta.Object)) + recordLatency.record() } } } diff --git a/plugin/kubernetes/metrics.go b/plugin/kubernetes/object/metrics.go similarity index 70% rename from plugin/kubernetes/metrics.go rename to plugin/kubernetes/object/metrics.go index 8adeb6940..929925cf1 100644 --- a/plugin/kubernetes/metrics.go +++ b/plugin/kubernetes/object/metrics.go @@ -1,10 +1,11 @@ -package kubernetes +package object import ( "time" "github.com/coredns/coredns/plugin" - "github.com/coredns/coredns/plugin/kubernetes/object" + "github.com/coredns/coredns/plugin/pkg/log" + "github.com/prometheus/client_golang/prometheus" 
"github.com/prometheus/client_golang/prometheus/promauto" api "k8s.io/api/core/v1" @@ -25,42 +26,50 @@ var ( // * headless_without_selector DNSProgrammingLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: plugin.Namespace, - Subsystem: pluginName, + Subsystem: "kubernetes", Name: "dns_programming_duration_seconds", // From 1 millisecond to ~17 minutes. Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), Help: "Histogram of the time (in seconds) it took to program a dns instance.", }, []string{"service_kind"}) - // durationSinceFunc returns the duration elapsed since the given time. + // DurationSinceFunc returns the duration elapsed since the given time. // Added as a global variable to allow injection for testing. - durationSinceFunc = time.Since + DurationSinceFunc = time.Since ) -func recordDNSProgrammingLatency(svcs []*object.Service, endpoints meta.Object) { - // getLastChangeTriggerTime is the time.Time value of the EndpointsLastChangeTriggerTime - // annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set - var lastChangeTriggerTime time.Time - stringVal, ok := endpoints.GetAnnotations()[api.EndpointsLastChangeTriggerTime] +// EndpointLatencyRecorder records latency metric for endpoint objects +type EndpointLatencyRecorder struct { + TT time.Time + ServiceFunc func(meta.Object) []*Service + Services []*Service +} + +func (l *EndpointLatencyRecorder) init(o meta.Object) { + l.Services = l.ServiceFunc(o) + l.TT = time.Time{} + stringVal, ok := o.GetAnnotations()[api.EndpointsLastChangeTriggerTime] if ok { - ts, err := time.Parse(time.RFC3339Nano, stringVal) + tt, err := time.Parse(time.RFC3339Nano, stringVal) if err != nil { log.Warningf("DnsProgrammingLatency cannot be calculated for Endpoints '%s/%s'; invalid %q annotation RFC3339 value of %q", - endpoints.GetNamespace(), endpoints.GetName(), api.EndpointsLastChangeTriggerTime, stringVal) - // In case of error val = time.Zero, which is ignored in 
the upstream code. + o.GetNamespace(), o.GetName(), api.EndpointsLastChangeTriggerTime, stringVal) + // In case of error val = time.Zero, which is ignored downstream. } - lastChangeTriggerTime = ts + l.TT = tt } +} +func (l *EndpointLatencyRecorder) record() { // isHeadless indicates whether the endpoints object belongs to a headless // service (i.e. clusterIp = None). Note that this can be a false negatives if the service // informer is lagging, i.e. we may not see a recently created service. Given that the services // don't change very often (comparing to much more frequent endpoints changes), cases when this method // will return wrong answer should be relatively rare. Because of that we intentionally accept this // flaw to keep the solution simple. - isHeadless := len(svcs) == 1 && svcs[0].ClusterIP == api.ClusterIPNone + isHeadless := len(l.Services) == 1 && l.Services[0].ClusterIP == api.ClusterIPNone - if endpoints == nil || !isHeadless || lastChangeTriggerTime.IsZero() { + if !isHeadless || l.TT.IsZero() { return } @@ -69,5 +78,5 @@ func recordDNSProgrammingLatency(svcs []*object.Service, endpoints meta.Object) // LastChangeTriggerTime annotation is set). It means that the corresponding service is a // "headless service with selector". DNSProgrammingLatency.WithLabelValues("headless_with_selector"). - Observe(durationSinceFunc(lastChangeTriggerTime).Seconds()) + Observe(DurationSinceFunc(l.TT).Seconds()) } diff --git a/plugin/kubernetes/object/object.go b/plugin/kubernetes/object/object.go index f591f6d0a..7111833e6 100644 --- a/plugin/kubernetes/object/object.go +++ b/plugin/kubernetes/object/object.go @@ -22,8 +22,8 @@ import ( "k8s.io/client-go/tools/cache" ) -// ToFunc converts one empty interface to another. -type ToFunc func(interface{}) (interface{}, error) +// ToFunc converts one v1.Object to another v1.Object. +type ToFunc func(v1.Object) (v1.Object, error) // ProcessorBuilder returns function to process cache events. 
type ProcessorBuilder func(cache.Indexer, cache.ResourceEventHandler) cache.ProcessFunc diff --git a/plugin/kubernetes/object/pod.go b/plugin/kubernetes/object/pod.go index 04cbe1ad2..9b9d5641c 100644 --- a/plugin/kubernetes/object/pod.go +++ b/plugin/kubernetes/object/pod.go @@ -5,6 +5,7 @@ import ( "fmt" api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -21,37 +22,28 @@ type Pod struct { var errPodTerminating = errors.New("pod terminating") -// ToPod returns a function that converts an api.Pod to a *Pod. -func ToPod(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - apiPod, ok := obj.(*api.Pod) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - pod := toPod(skipCleanup, apiPod) - t := apiPod.ObjectMeta.DeletionTimestamp - if t != nil && !(*t).Time.IsZero() { - // if the pod is in the process of termination, return an error so it can be ignored - // during add/update event processing - return pod, errPodTerminating - } - return pod, nil +// ToPod converts an api.Pod to a *Pod. 
+func ToPod(obj meta.Object) (meta.Object, error) { + apiPod, ok := obj.(*api.Pod) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) } -} - -func toPod(skipCleanup bool, pod *api.Pod) *Pod { - p := &Pod{ - Version: pod.GetResourceVersion(), - PodIP: pod.Status.PodIP, - Namespace: pod.GetNamespace(), - Name: pod.GetName(), + pod := &Pod{ + Version: apiPod.GetResourceVersion(), + PodIP: apiPod.Status.PodIP, + Namespace: apiPod.GetNamespace(), + Name: apiPod.GetName(), + } + t := apiPod.ObjectMeta.DeletionTimestamp + if t != nil && !(*t).Time.IsZero() { + // if the pod is in the process of termination, return an error so it can be ignored + // during add/update event processing + return pod, errPodTerminating } - if !skipCleanup { - *pod = api.Pod{} - } + *apiPod = api.Pod{} - return p + return pod, nil } var _ runtime.Object = &Pod{} diff --git a/plugin/kubernetes/object/service.go b/plugin/kubernetes/object/service.go index de84cf941..be1404ea0 100644 --- a/plugin/kubernetes/object/service.go +++ b/plugin/kubernetes/object/service.go @@ -4,6 +4,7 @@ import ( "fmt" api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -28,18 +29,12 @@ type Service struct { // ServiceKey returns a string using for the index. func ServiceKey(name, namespace string) string { return name + "." + namespace } -// ToService returns a function that converts an api.Service to a *Service. -func ToService(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - svc, ok := obj.(*api.Service) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - return toService(skipCleanup, svc), nil +// ToService converts an api.Service to a *Service. 
+func ToService(obj meta.Object) (meta.Object, error) { + svc, ok := obj.(*api.Service) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) } -} - -func toService(skipCleanup bool, svc *api.Service) *Service { s := &Service{ Version: svc.GetResourceVersion(), Name: svc.GetName(), @@ -70,11 +65,9 @@ func toService(skipCleanup bool, svc *api.Service) *Service { } - if !skipCleanup { - *svc = api.Service{} - } + *svc = api.Service{} - return s + return s, nil } var _ runtime.Object = &Service{}