plugin/kubernetes: fix tombstone unwrapping (#3924)
* fix tombstone unwrapping Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
This commit is contained in:
parent
d35c8e9eda
commit
d902e85919
4 changed files with 163 additions and 83 deletions
|
@ -113,7 +113,7 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
|
|||
&api.Service{},
|
||||
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
|
||||
cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc},
|
||||
object.DefaultProcessor(object.ToService(opts.skipAPIObjectsCleanup)),
|
||||
object.DefaultProcessor(object.ToService(opts.skipAPIObjectsCleanup), nil),
|
||||
)
|
||||
|
||||
if opts.initPodCache {
|
||||
|
@ -125,7 +125,7 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
|
|||
&api.Pod{},
|
||||
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
|
||||
cache.Indexers{podIPIndex: podIPIndexFunc},
|
||||
object.DefaultProcessor(object.ToPod(opts.skipAPIObjectsCleanup)),
|
||||
object.DefaultProcessor(object.ToPod(opts.skipAPIObjectsCleanup), nil),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -136,73 +136,10 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
|
|||
WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
|
||||
},
|
||||
&api.Endpoints{},
|
||||
cache.ResourceEventHandlerFuncs{},
|
||||
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
|
||||
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
|
||||
func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
|
||||
return func(obj interface{}) error {
|
||||
for _, d := range obj.(cache.Deltas) {
|
||||
switch d.Type {
|
||||
case cache.Sync, cache.Added, cache.Updated:
|
||||
apiEndpoints, ok := d.Object.(*api.Endpoints)
|
||||
if !ok {
|
||||
return errors.New("got non-endpoint add/update")
|
||||
}
|
||||
obj := object.ToEndpoints(apiEndpoints)
|
||||
|
||||
if old, exists, err := clientState.Get(obj); err == nil && exists {
|
||||
if err := clientState.Update(obj); err != nil {
|
||||
return err
|
||||
}
|
||||
h.OnUpdate(old, obj)
|
||||
// endpoint updates can come frequently, make sure it's a change we care about
|
||||
if !endpointsEquivalent(old.(*object.Endpoints), obj) {
|
||||
dns.updateModifed()
|
||||
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
|
||||
}
|
||||
} else {
|
||||
if err := clientState.Add(obj); err != nil {
|
||||
return err
|
||||
}
|
||||
h.OnAdd(d.Object)
|
||||
dns.updateModifed()
|
||||
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
|
||||
if !opts.skipAPIObjectsCleanup {
|
||||
*apiEndpoints = api.Endpoints{}
|
||||
}
|
||||
}
|
||||
case cache.Deleted:
|
||||
apiEndpoints, ok := d.Object.(*api.Endpoints)
|
||||
if !ok {
|
||||
// Assume that the object must be a cache.DeletedFinalStateUnknown.
|
||||
// This is essentially an indicator that the Endpoint was deleted, without a containing a
|
||||
// up-to date copy of the Endpoints object. We need to use cache.DeletedFinalStateUnknown
|
||||
// object so it can be properly deleted by store.Delete() below, which knows how to handle it.
|
||||
tombstone, ok := d.Object.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
return errors.New("expected tombstone")
|
||||
}
|
||||
apiEndpoints, ok = tombstone.Obj.(*api.Endpoints)
|
||||
if !ok {
|
||||
return errors.New("got non-endpoint tombstone")
|
||||
}
|
||||
}
|
||||
obj := object.ToEndpoints(apiEndpoints)
|
||||
|
||||
if err := clientState.Delete(obj); err != nil {
|
||||
return err
|
||||
}
|
||||
h.OnDelete(d.Object)
|
||||
dns.updateModifed()
|
||||
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
|
||||
if !opts.skipAPIObjectsCleanup {
|
||||
*apiEndpoints = api.Endpoints{}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
})
|
||||
|
||||
object.DefaultProcessor(object.ToEndpoints(opts.skipAPIObjectsCleanup), dns.recordDNSProgrammingLatency),
|
||||
)
|
||||
}
|
||||
|
||||
dns.nsLister, dns.nsController = cache.NewInformer(
|
||||
|
@ -217,6 +154,10 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
|
|||
return &dns
|
||||
}
|
||||
|
||||
func (dns *dnsControl) recordDNSProgrammingLatency(obj meta.Object) {
|
||||
recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj.(*api.Endpoints))
|
||||
}
|
||||
|
||||
func podIPIndexFunc(obj interface{}) ([]string, error) {
|
||||
p, ok := obj.(*object.Pod)
|
||||
if !ok {
|
||||
|
@ -472,8 +413,8 @@ func (dns *dnsControl) GetNamespaceByName(name string) (*api.Namespace, error) {
|
|||
return nil, fmt.Errorf("namespace not found")
|
||||
}
|
||||
|
||||
func (dns *dnsControl) Add(obj interface{}) { dns.detectChanges(nil, obj) }
|
||||
func (dns *dnsControl) Delete(obj interface{}) { dns.detectChanges(obj, nil) }
|
||||
func (dns *dnsControl) Add(obj interface{}) { dns.updateModifed() }
|
||||
func (dns *dnsControl) Delete(obj interface{}) { dns.updateModifed() }
|
||||
func (dns *dnsControl) Update(oldObj, newObj interface{}) { dns.detectChanges(oldObj, newObj) }
|
||||
|
||||
// detectChanges detects changes in objects, and updates the modified timestamp
|
||||
|
@ -491,12 +432,16 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) {
|
|||
dns.updateModifed()
|
||||
case *object.Pod:
|
||||
dns.updateModifed()
|
||||
case *object.Endpoints:
|
||||
if !endpointsEquivalent(oldObj.(*object.Endpoints), newObj.(*object.Endpoints)) {
|
||||
dns.updateModifed()
|
||||
}
|
||||
default:
|
||||
log.Warningf("Updates for %T not supported.", ob)
|
||||
}
|
||||
}
|
||||
|
||||
func (dns *dnsControl) getServices(endpoints *object.Endpoints) []*object.Service {
|
||||
func (dns *dnsControl) getServices(endpoints *api.Endpoints) []*object.Service {
|
||||
return dns.SvcIndex(object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace()))
|
||||
}
|
||||
|
||||
|
|
111
plugin/kubernetes/informer_test.go
Normal file
111
plugin/kubernetes/informer_test.go
Normal file
|
@ -0,0 +1,111 @@
|
|||
package kubernetes
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/coredns/coredns/plugin/kubernetes/object"
|
||||
|
||||
api "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
func TestDefaultProcessor(t *testing.T) {
|
||||
pbuild := object.DefaultProcessor(object.ToService(true), nil)
|
||||
reh := cache.ResourceEventHandlerFuncs{}
|
||||
idx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{})
|
||||
processor := pbuild(idx, reh)
|
||||
testProcessor(t, processor, idx)
|
||||
}
|
||||
|
||||
func testProcessor(t *testing.T, processor cache.ProcessFunc, idx cache.Indexer) {
|
||||
obj := &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "service1", Namespace: "test1"},
|
||||
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{Port: 80}}},
|
||||
}
|
||||
obj2 := &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "service2", Namespace: "test1"},
|
||||
Spec: api.ServiceSpec{ClusterIP: "5.6.7.8", Ports: []api.ServicePort{{Port: 80}}},
|
||||
}
|
||||
|
||||
// Add the objects
|
||||
err := processor(cache.Deltas{
|
||||
{Type: cache.Added, Object: obj},
|
||||
{Type: cache.Added, Object: obj2},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("add failed: %v", err)
|
||||
}
|
||||
got, exists, err := idx.Get(obj)
|
||||
if err != nil {
|
||||
t.Fatalf("get added object failed: %v", err)
|
||||
}
|
||||
if !exists {
|
||||
t.Fatal("added object not found in index")
|
||||
}
|
||||
svc, ok := got.(*object.Service)
|
||||
if !ok {
|
||||
t.Fatal("object in index was incorrect type")
|
||||
}
|
||||
if svc.ClusterIP != obj.Spec.ClusterIP {
|
||||
t.Fatalf("expected %v, got %v", obj.Spec.ClusterIP, svc.ClusterIP)
|
||||
}
|
||||
|
||||
// Update an object
|
||||
obj.Spec.ClusterIP = "1.2.3.5"
|
||||
err = processor(cache.Deltas{{
|
||||
Type: cache.Updated,
|
||||
Object: obj,
|
||||
}})
|
||||
if err != nil {
|
||||
t.Fatalf("update failed: %v", err)
|
||||
}
|
||||
got, exists, err = idx.Get(obj)
|
||||
if err != nil {
|
||||
t.Fatalf("get updated object failed: %v", err)
|
||||
}
|
||||
if !exists {
|
||||
t.Fatal("updated object not found in index")
|
||||
}
|
||||
svc, ok = got.(*object.Service)
|
||||
if !ok {
|
||||
t.Fatal("object in index was incorrect type")
|
||||
}
|
||||
if svc.ClusterIP != obj.Spec.ClusterIP {
|
||||
t.Fatalf("expected %v, got %v", obj.Spec.ClusterIP, svc.ClusterIP)
|
||||
}
|
||||
|
||||
// Delete an object
|
||||
err = processor(cache.Deltas{{
|
||||
Type: cache.Deleted,
|
||||
Object: obj2,
|
||||
}})
|
||||
if err != nil {
|
||||
t.Fatalf("delete test failed: %v", err)
|
||||
}
|
||||
got, exists, err = idx.Get(obj2)
|
||||
if err != nil {
|
||||
t.Fatalf("get deleted object failed: %v", err)
|
||||
}
|
||||
if exists {
|
||||
t.Fatal("deleted object found in index")
|
||||
}
|
||||
|
||||
// Delete an object via tombstone
|
||||
key, _ := cache.MetaNamespaceKeyFunc(obj)
|
||||
tombstone := cache.DeletedFinalStateUnknown{Key: key, Obj: svc}
|
||||
err = processor(cache.Deltas{{
|
||||
Type: cache.Deleted,
|
||||
Object: tombstone,
|
||||
}})
|
||||
if err != nil {
|
||||
t.Fatalf("tombstone delete test failed: %v", err)
|
||||
}
|
||||
got, exists, err = idx.Get(svc)
|
||||
if err != nil {
|
||||
t.Fatalf("get tombstone deleted object failed: %v", err)
|
||||
}
|
||||
if exists {
|
||||
t.Fatal("tombstone deleted object found in index")
|
||||
}
|
||||
}
|
|
@ -1,6 +1,8 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
api "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
@ -43,8 +45,19 @@ type EndpointPort struct {
|
|||
// EndpointsKey return a string using for the index.
|
||||
func EndpointsKey(name, namespace string) string { return name + "." + namespace }
|
||||
|
||||
// ToEndpoints converts an api.Endpoints to a *Endpoints.
|
||||
func ToEndpoints(end *api.Endpoints) *Endpoints {
|
||||
// ToEndpoints returns a function that converts an *api.Endpoints to a *Endpoints.
|
||||
func ToEndpoints(skipCleanup bool) ToFunc {
|
||||
return func(obj interface{}) (interface{}, error) {
|
||||
eps, ok := obj.(*api.Endpoints)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected object %v", obj)
|
||||
}
|
||||
return toEndpoints(skipCleanup, eps), nil
|
||||
}
|
||||
}
|
||||
|
||||
// toEndpoints converts an *api.Endpoints to a *Endpoints.
|
||||
func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints {
|
||||
e := &Endpoints{
|
||||
Version: end.GetResourceVersion(),
|
||||
Name: end.GetName(),
|
||||
|
@ -88,6 +101,10 @@ func ToEndpoints(end *api.Endpoints) *Endpoints {
|
|||
}
|
||||
}
|
||||
|
||||
if !skipCleanup {
|
||||
*end = api.Endpoints{}
|
||||
}
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
@ -20,8 +21,10 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.
|
|||
return clientState, cache.New(cfg)
|
||||
}
|
||||
|
||||
// DefaultProcessor is a copy of Process function from cache.NewIndexerInformer except it does a conversion.
|
||||
func DefaultProcessor(convert ToFunc) ProcessorBuilder {
|
||||
type recordLatencyFunc func(meta.Object)
|
||||
|
||||
// DefaultProcessor is based on the Process function from cache.NewIndexerInformer except it does a conversion.
|
||||
func DefaultProcessor(convert ToFunc, recordLatency recordLatencyFunc) ProcessorBuilder {
|
||||
return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
|
||||
return func(obj interface{}) error {
|
||||
for _, d := range obj.(cache.Deltas) {
|
||||
|
@ -42,23 +45,27 @@ func DefaultProcessor(convert ToFunc) ProcessorBuilder {
|
|||
}
|
||||
h.OnAdd(obj)
|
||||
}
|
||||
if recordLatency != nil {
|
||||
recordLatency(d.Object.(meta.Object))
|
||||
}
|
||||
case cache.Deleted:
|
||||
var obj interface{}
|
||||
var err error
|
||||
tombstone, ok := d.Object.(cache.DeletedFinalStateUnknown)
|
||||
if ok {
|
||||
obj, err = convert(tombstone.Obj)
|
||||
} else {
|
||||
obj, ok := d.Object.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
var err error
|
||||
obj, err = convert(d.Object)
|
||||
}
|
||||
if err != nil && err != errPodTerminating {
|
||||
return err
|
||||
if err != nil && err != errPodTerminating {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := clientState.Delete(obj); err != nil {
|
||||
return err
|
||||
}
|
||||
h.OnDelete(obj)
|
||||
if !ok && recordLatency != nil {
|
||||
recordLatency(d.Object.(meta.Object))
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
Loading…
Add table
Reference in a new issue