plugin/kubernetes: fix headless/endpoint query panics when endpoints are disabled (#6137)

* always create listers, so we dont panic

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>

---------

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
This commit is contained in:
Chris O'Haver 2023-06-07 16:22:28 -04:00 committed by GitHub
parent 7bf37c1da0
commit 6d3db023fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 119 additions and 50 deletions

View file

@ -128,33 +128,37 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
object.DefaultProcessor(object.ToService, nil), object.DefaultProcessor(object.ToService, nil),
) )
podLister, podController := object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: podListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: podWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&api.Pod{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{podIPIndex: podIPIndexFunc},
object.DefaultProcessor(object.ToPod, nil),
)
dns.podLister = podLister
if opts.initPodCache { if opts.initPodCache {
dns.podLister, dns.podController = object.NewIndexerInformer( dns.podController = podController
&cache.ListWatch{
ListFunc: podListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: podWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&api.Pod{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{podIPIndex: podIPIndexFunc},
object.DefaultProcessor(object.ToPod, nil),
)
} }
epLister, epController := object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&discovery.EndpointSlice{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
)
dns.epLock.Lock()
dns.epLister = epLister
if opts.initEndpointsCache { if opts.initEndpointsCache {
dns.epLock.Lock() dns.epController = epController
dns.epLister, dns.epController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&discovery.EndpointSlice{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
)
dns.epLock.Unlock()
} }
dns.epLock.Unlock()
dns.nsLister, dns.nsController = object.NewIndexerInformer( dns.nsLister, dns.nsController = object.NewIndexerInformer(
&cache.ListWatch{ &cache.ListWatch{
@ -561,8 +565,8 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
} }
// GetNodeByName return the node by name. If nothing is found an error is // GetNodeByName return the node by name. If nothing is found an error is
// returned. This query causes a roundtrip to the k8s API server, so use // returned. This query causes a round trip to the k8s API server, so use
// sparingly. Currently this is only used for Federation. // sparingly. Currently, this is only used for Federation.
func (dns *dnsControl) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { func (dns *dnsControl) GetNodeByName(ctx context.Context, name string) (*api.Node, error) {
v1node, err := dns.client.CoreV1().Nodes().Get(ctx, name, meta.GetOptions{}) v1node, err := dns.client.CoreV1().Nodes().Get(ctx, name, meta.GetOptions{})
return v1node, err return v1node, err

View file

@ -1,16 +1,19 @@
package kubernetes package kubernetes
import ( import (
"context" "context"
"net" "net"
"strconv" "strconv"
"testing" "testing"
"time"
"github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/test" "github.com/coredns/coredns/plugin/test"
"github.com/miekg/dns" "github.com/miekg/dns"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
@ -25,23 +28,39 @@ func inc(ip net.IP) {
} }
} }
func BenchmarkController(b *testing.B) { func kubernetesWithFakeClient(ctx context.Context, zone, cidr string, initEndpointsCache bool, svcType string) *Kubernetes {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
dco := dnsControlOpts{ dco := dnsControlOpts{
zones: []string{"cluster.local."}, zones: []string{zone},
initEndpointsCache: initEndpointsCache,
} }
ctx := context.Background()
controller := newdnsController(ctx, client, dco) controller := newdnsController(ctx, client, dco)
cidr := "10.0.0.0/19"
// Add resources // Add resources
generateEndpoints(cidr, client) _, err := client.CoreV1().Namespaces().Create(ctx, &api.Namespace{ObjectMeta: meta.ObjectMeta{Name: "testns"}}, meta.CreateOptions{})
generateSvcs(cidr, "all", client) if err != nil {
m := new(dns.Msg) log.Fatal(err)
m.SetQuestion("svc1.testns.svc.cluster.local.", dns.TypeA) }
generateSvcs(cidr, svcType, client)
generateEndpointSlices(cidr, client)
k := New([]string{"cluster.local."}) k := New([]string{"cluster.local."})
k.APIConn = controller k.APIConn = controller
return k
}
func BenchmarkController(b *testing.B) {
ctx := context.Background()
k := kubernetesWithFakeClient(ctx, "cluster.local.", "10.0.0.0/24", true, "all")
go k.APIConn.Run()
defer k.APIConn.Stop()
for !k.APIConn.HasSynced() {
time.Sleep(time.Millisecond)
}
rw := &test.ResponseWriter{} rw := &test.ResponseWriter{}
m := new(dns.Msg)
m.SetQuestion("svc1.testns.svc.cluster.local.", dns.TypeA)
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
@ -49,7 +68,47 @@ func BenchmarkController(b *testing.B) {
} }
} }
func generateEndpoints(cidr string, client kubernetes.Interface) { func TestEndpointsDisabled(t *testing.T) {
ctx := context.Background()
k := kubernetesWithFakeClient(ctx, "cluster.local.", "10.0.0.0/30", false, "headless")
k.opts.initEndpointsCache = false
go k.APIConn.Run()
defer k.APIConn.Stop()
for !k.APIConn.HasSynced() {
time.Sleep(time.Millisecond)
}
rw := &dnstest.Recorder{ResponseWriter: &test.ResponseWriter{}}
m := new(dns.Msg)
m.SetQuestion("svc2.testns.svc.cluster.local.", dns.TypeA)
k.ServeDNS(ctx, rw, m)
if rw.Msg.Rcode != dns.RcodeNameError {
t.Errorf("Expected NXDOMAIN, got %v", dns.RcodeToString[rw.Msg.Rcode])
}
}
func TestEndpointsEnabled(t *testing.T) {
ctx := context.Background()
k := kubernetesWithFakeClient(ctx, "cluster.local.", "10.0.0.0/30", true, "headless")
k.opts.initEndpointsCache = true
go k.APIConn.Run()
defer k.APIConn.Stop()
for !k.APIConn.HasSynced() {
time.Sleep(time.Millisecond)
}
rw := &dnstest.Recorder{ResponseWriter: &test.ResponseWriter{}}
m := new(dns.Msg)
m.SetQuestion("svc2.testns.svc.cluster.local.", dns.TypeA)
k.ServeDNS(ctx, rw, m)
if rw.Msg.Rcode != dns.RcodeSuccess {
t.Errorf("Expected SUCCESS, got %v", dns.RcodeToString[rw.Msg.Rcode])
}
}
func generateEndpointSlices(cidr string, client kubernetes.Interface) {
// https://groups.google.com/d/msg/golang-nuts/zlcYA4qk-94/TWRFHeXJCcYJ // https://groups.google.com/d/msg/golang-nuts/zlcYA4qk-94/TWRFHeXJCcYJ
ip, ipnet, err := net.ParseCIDR(cidr) ip, ipnet, err := net.ParseCIDR(cidr)
if err != nil { if err != nil {
@ -57,30 +116,36 @@ func generateEndpoints(cidr string, client kubernetes.Interface) {
} }
count := 1 count := 1
ep := &api.Endpoints{ port := int32(80)
Subsets: []api.EndpointSubset{{ protocol := api.Protocol("tcp")
Ports: []api.EndpointPort{ name := "http"
{ eps := &discovery.EndpointSlice{
Port: 80, Ports: []discovery.EndpointPort{
Protocol: "tcp", {
Name: "http", Port: &port,
}, Protocol: &protocol,
Name: &name,
}, },
}}, },
ObjectMeta: meta.ObjectMeta{ ObjectMeta: meta.ObjectMeta{
Namespace: "testns", Namespace: "testns",
}, },
} }
ctx := context.TODO() ctx := context.TODO()
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
ep.Subsets[0].Addresses = []api.EndpointAddress{ hostname := "foo" + strconv.Itoa(count)
eps.Endpoints = []discovery.Endpoint{
{ {
IP: ip.String(), Addresses: []string{ip.String()},
Hostname: "foo" + strconv.Itoa(count), Hostname: &hostname,
}, },
} }
ep.ObjectMeta.Name = "svc" + strconv.Itoa(count) eps.ObjectMeta.Name = "svc" + strconv.Itoa(count)
client.CoreV1().Endpoints("testns").Create(ctx, ep, meta.CreateOptions{}) eps.ObjectMeta.Labels = map[string]string{discovery.LabelServiceName: eps.ObjectMeta.Name}
_, err := client.DiscoveryV1().EndpointSlices("testns").Create(ctx, eps, meta.CreateOptions{})
if err != nil {
log.Fatal(err)
}
count++ count++
} }
} }
@ -144,7 +209,7 @@ func createHeadlessSvc(suffix int, client kubernetes.Interface, ip net.IP) {
ctx := context.TODO() ctx := context.TODO()
client.CoreV1().Services("testns").Create(ctx, &api.Service{ client.CoreV1().Services("testns").Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{ ObjectMeta: meta.ObjectMeta{
Name: "hdls" + strconv.Itoa(suffix), Name: "svc" + strconv.Itoa(suffix),
Namespace: "testns", Namespace: "testns",
}, },
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
@ -157,7 +222,7 @@ func createExternalSvc(suffix int, client kubernetes.Interface, ip net.IP) {
ctx := context.TODO() ctx := context.TODO()
client.CoreV1().Services("testns").Create(ctx, &api.Service{ client.CoreV1().Services("testns").Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{ ObjectMeta: meta.ObjectMeta{
Name: "external" + strconv.Itoa(suffix), Name: "svc" + strconv.Itoa(suffix),
Namespace: "testns", Namespace: "testns",
}, },
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{