From a3aeb3d5034be71a352f874cfe7d7d31c218059d Mon Sep 17 00:00:00 2001 From: Chris O'Haver Date: Fri, 15 May 2020 12:47:29 -0400 Subject: [PATCH] plugin/kubernetes: handle tombstones in default processor (#3890) * handle deletion tombstones in default processor Signed-off-by: Chris O'Haver * fix terminating pod exclusion Signed-off-by: Chris O'Haver --- plugin/kubernetes/object/informer.go | 19 ++++++++++++++--- plugin/kubernetes/object/object.go | 2 +- plugin/kubernetes/object/pod.go | 32 +++++++++++++++++----------- plugin/kubernetes/object/service.go | 17 ++++++++------- 4 files changed, 45 insertions(+), 25 deletions(-) diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go index bd4d05d30..e0d7f180c 100644 --- a/plugin/kubernetes/object/informer.go +++ b/plugin/kubernetes/object/informer.go @@ -25,11 +25,12 @@ func DefaultProcessor(convert ToFunc) ProcessorBuilder { return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { return func(obj interface{}) error { for _, d := range obj.(cache.Deltas) { - - obj := convert(d.Object) - switch d.Type { case cache.Sync, cache.Added, cache.Updated: + obj, err := convert(d.Object) + if err != nil { + return err + } if old, exists, err := clientState.Get(obj); err == nil && exists { if err := clientState.Update(obj); err != nil { return err @@ -42,6 +43,18 @@ func DefaultProcessor(convert ToFunc) ProcessorBuilder { h.OnAdd(obj) } case cache.Deleted: + var obj interface{} + var err error + tombstone, ok := d.Object.(cache.DeletedFinalStateUnknown) + if ok { + obj, err = convert(tombstone.Obj) + } else { + obj, err = convert(d.Object) + } + if err != nil && err != errPodTerminating { + return err + } + if err := clientState.Delete(obj); err != nil { return err } diff --git a/plugin/kubernetes/object/object.go b/plugin/kubernetes/object/object.go index d08960b11..f591f6d0a 100644 --- a/plugin/kubernetes/object/object.go +++ b/plugin/kubernetes/object/object.go @@ -23,7 +23,7 @@ import ( ) // ToFunc converts one empty interface to another. -type ToFunc func(interface{}) interface{} +type ToFunc func(interface{}) (interface{}, error) // ProcessorBuilder returns function to process cache events. type ProcessorBuilder func(cache.Indexer, cache.ResourceEventHandler) cache.ProcessFunc diff --git a/plugin/kubernetes/object/pod.go b/plugin/kubernetes/object/pod.go index 9fc9b5726..04cbe1ad2 100644 --- a/plugin/kubernetes/object/pod.go +++ b/plugin/kubernetes/object/pod.go @@ -1,6 +1,9 @@ package object import ( + "errors" + "fmt" + api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -16,30 +19,33 @@ type Pod struct { *Empty } +var errPodTerminating = errors.New("pod terminating") + // ToPod returns a function that converts an api.Pod to a *Pod. func ToPod(skipCleanup bool) ToFunc { - return func(obj interface{}) interface{} { - return toPod(skipCleanup, obj) + return func(obj interface{}) (interface{}, error) { + apiPod, ok := obj.(*api.Pod) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } + pod := toPod(skipCleanup, apiPod) + t := apiPod.ObjectMeta.DeletionTimestamp + if t != nil && !(*t).Time.IsZero() { + // if the pod is in the process of termination, return an error so it can be ignored + // during add/update event processing + return pod, errPodTerminating + } + return pod, nil } } -func toPod(skipCleanup bool, obj interface{}) interface{} { - pod, ok := obj.(*api.Pod) - if !ok { - return nil - } - +func toPod(skipCleanup bool, pod *api.Pod) *Pod { p := &Pod{ Version: pod.GetResourceVersion(), PodIP: pod.Status.PodIP, Namespace: pod.GetNamespace(), Name: pod.GetName(), } - // don't add pods that are being deleted. - t := pod.ObjectMeta.DeletionTimestamp - if t != nil && !(*t).Time.IsZero() { - return nil - } if !skipCleanup { *pod = api.Pod{} diff --git a/plugin/kubernetes/object/service.go b/plugin/kubernetes/object/service.go index 295715e2d..3dc061528 100644 --- a/plugin/kubernetes/object/service.go +++ b/plugin/kubernetes/object/service.go @@ -1,6 +1,8 @@ package object import ( + "fmt" + api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -28,17 +30,16 @@ func ServiceKey(name, namespace string) string { return name + "." + namespace } // ToService returns a function that converts an api.Service to a *Service. func ToService(skipCleanup bool) ToFunc { - return func(obj interface{}) interface{} { - return toService(skipCleanup, obj) + return func(obj interface{}) (interface{}, error) { + svc, ok := obj.(*api.Service) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } + return toService(skipCleanup, svc), nil } } -func toService(skipCleanup bool, obj interface{}) interface{} { - svc, ok := obj.(*api.Service) - if !ok { - return nil - } - +func toService(skipCleanup bool, svc *api.Service) *Service { s := &Service{ Version: svc.GetResourceVersion(), Name: svc.GetName(),