plugin/kubernetes: implement HasSynced() (#1155)
* plugin/kubernetes: wait until api is ready Wait for HasSynced before allowing startup to avoid startup race. Also do a small refactor in findServices() to pull a check out of the loop - only needs to be done once. * sigh
This commit is contained in:
parent
c1f67493de
commit
d64b684831
13 changed files with 36 additions and 41 deletions
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
type APIConnFederationTest struct{}
|
||||
|
||||
func (APIConnFederationTest) HasSynced() bool { return true }
|
||||
func (APIConnFederationTest) Run() { return }
|
||||
func (APIConnFederationTest) Stop() error { return nil }
|
||||
func (APIConnFederationTest) SvcIndexReverse(string) []*api.Service { return nil }
|
||||
|
|
|
@ -65,7 +65,9 @@ func (p *proxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
func (p *apiProxy) Run() {
|
||||
p.handler.Start()
|
||||
p.Serve(p.listener)
|
||||
go func() {
|
||||
p.Serve(p.listener)
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *apiProxy) Stop() {
|
||||
|
|
|
@ -38,6 +38,7 @@ type dnsController interface {
|
|||
GetNodeByName(string) (*api.Node, error)
|
||||
|
||||
Run()
|
||||
HasSynced() bool
|
||||
Stop() error
|
||||
}
|
||||
|
||||
|
@ -229,17 +230,6 @@ func endpointsWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector)
|
|||
}
|
||||
}
|
||||
|
||||
func (dns *dnsControl) controllersInSync() bool {
|
||||
hs := dns.svcController.HasSynced() &&
|
||||
dns.epController.HasSynced()
|
||||
|
||||
if dns.podController != nil {
|
||||
hs = hs && dns.podController.HasSynced()
|
||||
}
|
||||
|
||||
return hs
|
||||
}
|
||||
|
||||
// Stop stops the controller.
|
||||
func (dns *dnsControl) Stop() error {
|
||||
dns.stopLock.Lock()
|
||||
|
@ -266,6 +256,17 @@ func (dns *dnsControl) Run() {
|
|||
<-dns.stopCh
|
||||
}
|
||||
|
||||
// HasSynced calls on all controllers.
|
||||
func (dns *dnsControl) HasSynced() bool {
|
||||
a := dns.svcController.HasSynced()
|
||||
b := dns.epController.HasSynced()
|
||||
c := true
|
||||
if dns.podController != nil {
|
||||
c = dns.podController.HasSynced()
|
||||
}
|
||||
return a && b && c
|
||||
}
|
||||
|
||||
func (dns *dnsControl) ServiceList() (svcs []*api.Service) {
|
||||
os := dns.svcLister.List()
|
||||
for _, o := range os {
|
||||
|
|
|
@ -191,6 +191,7 @@ func TestServeDNS(t *testing.T) {
|
|||
|
||||
type APIConnServeTest struct{}
|
||||
|
||||
func (APIConnServeTest) HasSynced() bool { return true }
|
||||
func (APIConnServeTest) Run() { return }
|
||||
func (APIConnServeTest) Stop() error { return nil }
|
||||
func (APIConnServeTest) EpIndexReverse(string) []*api.Endpoints { return nil }
|
||||
|
|
|
@ -338,10 +338,13 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.
|
|||
)
|
||||
if wildcard(r.service) || wildcard(r.namespace) {
|
||||
serviceList = k.APIConn.ServiceList()
|
||||
endpointsList = k.APIConn.EndpointsList()
|
||||
} else {
|
||||
idx = r.service + "." + r.namespace
|
||||
serviceList = k.APIConn.SvcIndex(idx)
|
||||
endpointsList = k.APIConn.EpIndex(idx)
|
||||
}
|
||||
|
||||
for _, svc := range serviceList {
|
||||
|
||||
if !(match(r.namespace, svc.Namespace) && match(r.service, svc.Name)) {
|
||||
|
@ -356,12 +359,6 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.
|
|||
|
||||
// Endpoint query or headless service
|
||||
if svc.Spec.ClusterIP == api.ClusterIPNone || r.endpoint != "" {
|
||||
if wildcard(r.service) || wildcard(r.namespace) {
|
||||
endpointsList = k.APIConn.EndpointsList()
|
||||
} else {
|
||||
idx = r.service + "." + r.namespace
|
||||
endpointsList = k.APIConn.EpIndex(idx)
|
||||
}
|
||||
for _, ep := range endpointsList {
|
||||
if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace {
|
||||
continue
|
||||
|
|
|
@ -51,6 +51,7 @@ func TestEndpointHostname(t *testing.T) {
|
|||
|
||||
type APIConnServiceTest struct{}
|
||||
|
||||
func (APIConnServiceTest) HasSynced() bool { return true }
|
||||
func (APIConnServiceTest) Run() { return }
|
||||
func (APIConnServiceTest) Stop() error { return nil }
|
||||
func (APIConnServiceTest) PodIndex(string) []*api.Pod { return nil }
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
type APIConnTest struct{}
|
||||
|
||||
func (APIConnTest) HasSynced() bool { return true }
|
||||
func (APIConnTest) Run() { return }
|
||||
func (APIConnTest) Stop() error { return nil }
|
||||
func (APIConnTest) PodIndex(string) []*api.Pod { return nil }
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
|
||||
type APIConnReverseTest struct{}
|
||||
|
||||
func (APIConnReverseTest) HasSynced() bool { return true }
|
||||
func (APIConnReverseTest) Run() { return }
|
||||
func (APIConnReverseTest) Stop() error { return nil }
|
||||
func (APIConnReverseTest) PodIndex(string) []*api.Pod { return nil }
|
||||
|
|
|
@ -39,8 +39,14 @@ func setup(c *caddy.Controller) error {
|
|||
c.OnStartup(func() error {
|
||||
go kubernetes.APIConn.Run()
|
||||
if kubernetes.APIProxy != nil {
|
||||
go kubernetes.APIProxy.Run()
|
||||
kubernetes.APIProxy.Run()
|
||||
}
|
||||
synced := false
|
||||
for synced == false {
|
||||
synced = kubernetes.APIConn.HasSynced()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
|
|
|
@ -4,7 +4,6 @@ package test
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coredns/coredns/plugin/test"
|
||||
|
||||
|
@ -36,9 +35,6 @@ func TestKubernetesAPIFallthrough(t *testing.T) {
|
|||
}
|
||||
defer server.Stop()
|
||||
|
||||
// Work-around for timing condition that results in no-data being returned in test environment.
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
for _, tc := range tests {
|
||||
|
||||
c := new(dns.Client)
|
||||
|
|
|
@ -4,7 +4,6 @@ package test
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coredns/coredns/plugin/test"
|
||||
|
||||
|
@ -42,9 +41,6 @@ func TestKubernetesNSExposed(t *testing.T) {
|
|||
}
|
||||
defer server.Stop()
|
||||
|
||||
// Work-around for timing condition that results in no-data being returned in test environment.
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
for _, tc := range dnsTestCasesAllNSExposed {
|
||||
|
||||
c := new(dns.Client)
|
||||
|
|
|
@ -4,7 +4,6 @@ package test
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coredns/coredns/plugin/test"
|
||||
|
||||
|
@ -30,11 +29,11 @@ var dnsTestCasesPodsInsecure = []test.Case{
|
|||
|
||||
func TestKubernetesPodsInsecure(t *testing.T) {
|
||||
corefile := `.:0 {
|
||||
kubernetes cluster.local 0.0.10.in-addr.arpa {
|
||||
endpoint http://localhost:8080
|
||||
namespaces test-1
|
||||
pods insecure
|
||||
}
|
||||
kubernetes cluster.local 0.0.10.in-addr.arpa {
|
||||
endpoint http://localhost:8080
|
||||
namespaces test-1
|
||||
pods insecure
|
||||
}
|
||||
`
|
||||
|
||||
server, udp, _, err := CoreDNSServerAndPorts(corefile)
|
||||
|
@ -43,9 +42,6 @@ func TestKubernetesPodsInsecure(t *testing.T) {
|
|||
}
|
||||
defer server.Stop()
|
||||
|
||||
// Work-around for timing condition that results in no-data being returned in test environment.
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
for _, tc := range dnsTestCasesPodsInsecure {
|
||||
|
||||
c := new(dns.Client)
|
||||
|
@ -92,9 +88,6 @@ func TestKubernetesPodsVerified(t *testing.T) {
|
|||
}
|
||||
defer server.Stop()
|
||||
|
||||
// Work-around for timing condition that results in no-data being returned in test environment.
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
for _, tc := range dnsTestCasesPodsVerified {
|
||||
|
||||
c := new(dns.Client)
|
||||
|
|
|
@ -279,8 +279,7 @@ func doIntegrationTests(t *testing.T, corefile string, testCases []test.Case) {
|
|||
}
|
||||
defer server.Stop()
|
||||
|
||||
// Work-around for timing condition that results in no-data being returned in test environment.
|
||||
time.Sleep(3 * time.Second)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
for _, tc := range testCases {
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue