plugin/kubernetes: Handle endpoint tombstones (#3887)

* check for nil

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* handle tombstone

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* move casting to caller. add comments.

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* more sanding

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* more scrubbing

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* move object unwraping to switch cases

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* oops remove debug

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

* remove cruft

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
This commit is contained in:
Chris O'Haver 2020-05-14 15:39:40 -04:00 committed by GitHub
parent f4cb9a1ba3
commit bb7ee5010e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 13 deletions

View file

@ -141,11 +141,14 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
return func(obj interface{}) error { return func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) { for _, d := range obj.(cache.Deltas) {
apiEndpoints, obj := object.ToEndpoints(d.Object)
switch d.Type { switch d.Type {
case cache.Sync, cache.Added, cache.Updated: case cache.Sync, cache.Added, cache.Updated:
apiEndpoints, ok := d.Object.(*api.Endpoints)
if !ok {
return errors.New("got non-endpoint add/update")
}
obj := object.ToEndpoints(apiEndpoints)
if old, exists, err := clientState.Get(obj); err == nil && exists { if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil { if err := clientState.Update(obj); err != nil {
return err return err
@ -163,19 +166,39 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
h.OnAdd(d.Object) h.OnAdd(d.Object)
dns.updateModifed() dns.updateModifed()
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints) recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
if !opts.skipAPIObjectsCleanup {
*apiEndpoints = api.Endpoints{}
}
} }
case cache.Deleted: case cache.Deleted:
apiEndpoints, ok := d.Object.(*api.Endpoints)
if !ok {
// Assume that the object must be a cache.DeletedFinalStateUnknown.
// This is essentially an indicator that the Endpoint was deleted, without a containing a
// up-to date copy of the Endpoints object. We need to use cache.DeletedFinalStateUnknown
// object so it can be properly deleted by store.Delete() below, which knows how to handle it.
tombstone, ok := d.Object.(cache.DeletedFinalStateUnknown)
if !ok {
return errors.New("expected tombstone")
}
apiEndpoints, ok = tombstone.Obj.(*api.Endpoints)
if !ok {
return errors.New("got non-endpoint tombstone")
}
}
obj := object.ToEndpoints(apiEndpoints)
if err := clientState.Delete(obj); err != nil { if err := clientState.Delete(obj); err != nil {
return err return err
} }
h.OnDelete(d.Object) h.OnDelete(d.Object)
dns.updateModifed() dns.updateModifed()
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints) recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
}
if !opts.skipAPIObjectsCleanup { if !opts.skipAPIObjectsCleanup {
*apiEndpoints = api.Endpoints{} *apiEndpoints = api.Endpoints{}
} }
} }
}
return nil return nil
} }
}) })

View file

@ -44,12 +44,7 @@ type EndpointPort struct {
func EndpointsKey(name, namespace string) string { return name + "." + namespace } func EndpointsKey(name, namespace string) string { return name + "." + namespace }
// ToEndpoints converts an api.Endpoints to a *Endpoints. // ToEndpoints converts an api.Endpoints to a *Endpoints.
func ToEndpoints(obj interface{}) (*api.Endpoints, *Endpoints) { func ToEndpoints(end *api.Endpoints) *Endpoints {
end, ok := obj.(*api.Endpoints)
if !ok {
return nil, nil
}
e := &Endpoints{ e := &Endpoints{
Version: end.GetResourceVersion(), Version: end.GetResourceVersion(),
Name: end.GetName(), Name: end.GetName(),
@ -93,7 +88,7 @@ func ToEndpoints(obj interface{}) (*api.Endpoints, *Endpoints) {
} }
} }
return end, e return e
} }
// CopyWithoutSubsets copies e, without the subsets. // CopyWithoutSubsets copies e, without the subsets.