diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index 2964f80bb..01cce28f2 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -141,11 +141,14 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { return func(obj interface{}) error { for _, d := range obj.(cache.Deltas) { - - apiEndpoints, obj := object.ToEndpoints(d.Object) - switch d.Type { case cache.Sync, cache.Added, cache.Updated: + apiEndpoints, ok := d.Object.(*api.Endpoints) + if !ok { + return errors.New("got non-endpoint add/update") + } + obj := object.ToEndpoints(apiEndpoints) + if old, exists, err := clientState.Get(obj); err == nil && exists { if err := clientState.Update(obj); err != nil { return err @@ -163,17 +166,37 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts h.OnAdd(d.Object) dns.updateModifed() recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints) + if !opts.skipAPIObjectsCleanup { + *apiEndpoints = api.Endpoints{} + } } case cache.Deleted: + apiEndpoints, ok := d.Object.(*api.Endpoints) + if !ok { + // Assume that the object must be a cache.DeletedFinalStateUnknown. + // This is essentially an indicator that the Endpoint was deleted, without a containing a + // up-to date copy of the Endpoints object. We need to use cache.DeletedFinalStateUnknown + // object so it can be properly deleted by store.Delete() below, which knows how to handle it. + tombstone, ok := d.Object.(cache.DeletedFinalStateUnknown) + if !ok { + return errors.New("expected tombstone") + } + apiEndpoints, ok = tombstone.Obj.(*api.Endpoints) + if !ok { + return errors.New("got non-endpoint tombstone") + } + } + obj := object.ToEndpoints(apiEndpoints) + if err := clientState.Delete(obj); err != nil { return err } h.OnDelete(d.Object) dns.updateModifed() recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints) - } - if !opts.skipAPIObjectsCleanup { - *apiEndpoints = api.Endpoints{} + if !opts.skipAPIObjectsCleanup { + *apiEndpoints = api.Endpoints{} + } } } return nil diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index c7d6b7323..2a7d69acf 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -44,12 +44,7 @@ type EndpointPort struct { func EndpointsKey(name, namespace string) string { return name + "." + namespace } // ToEndpoints converts an api.Endpoints to a *Endpoints. -func ToEndpoints(obj interface{}) (*api.Endpoints, *Endpoints) { - end, ok := obj.(*api.Endpoints) - if !ok { - return nil, nil - } - +func ToEndpoints(end *api.Endpoints) *Endpoints { e := &Endpoints{ Version: end.GetResourceVersion(), Name: end.GetName(), @@ -93,7 +88,7 @@ func ToEndpoints(obj interface{}) (*api.Endpoints, *Endpoints) { } } - return end, e + return e } // CopyWithoutSubsets copies e, without the subsets.