plugin/kubernetes: Remove Endpoint and EndpointSlice v1beta Support (#6147)
* remove endpoint and endpointslicev1beta watch support Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * adjust readme Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * informer object changes Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * remove unused funcs Signed-off-by: Chris O'Haver <cohaver@infoblox.com> --------- Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
This commit is contained in:
parent
6e6fc650ce
commit
06cd843918
4 changed files with 1 additions and 276 deletions
|
@ -114,9 +114,7 @@ that has not yet been synchronized.
|
||||||
|
|
||||||
## Monitoring Kubernetes Endpoints
|
## Monitoring Kubernetes Endpoints
|
||||||
|
|
||||||
By default the *kubernetes* plugin watches Endpoints via the `discovery.EndpointSlices` API. However the
|
The *kubernetes* plugin watches Endpoints via the `discovery.EndpointSlices` API.
|
||||||
`api.Endpoints` API is used instead if the Kubernetes version does not support the `EndpointSliceProxying`
|
|
||||||
feature gate by default (i.e. Kubernetes version < 1.19).
|
|
||||||
|
|
||||||
## Ready
|
## Ready
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
|
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
discovery "k8s.io/api/discovery/v1"
|
discovery "k8s.io/api/discovery/v1"
|
||||||
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
|
|
||||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
@ -66,10 +65,6 @@ type dnsControl struct {
|
||||||
selector labels.Selector
|
selector labels.Selector
|
||||||
namespaceSelector labels.Selector
|
namespaceSelector labels.Selector
|
||||||
|
|
||||||
// epLock is used to lock reads of epLister and epController while they are being replaced
|
|
||||||
// with the api.Endpoints Lister/Controller on k8s systems that don't use discovery.EndpointSlices
|
|
||||||
epLock sync.RWMutex
|
|
||||||
|
|
||||||
svcController cache.Controller
|
svcController cache.Controller
|
||||||
podController cache.Controller
|
podController cache.Controller
|
||||||
epController cache.Controller
|
epController cache.Controller
|
||||||
|
@ -153,12 +148,10 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
|
||||||
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
|
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
|
||||||
object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
|
object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
|
||||||
)
|
)
|
||||||
dns.epLock.Lock()
|
|
||||||
dns.epLister = epLister
|
dns.epLister = epLister
|
||||||
if opts.initEndpointsCache {
|
if opts.initEndpointsCache {
|
||||||
dns.epController = epController
|
dns.epController = epController
|
||||||
}
|
}
|
||||||
dns.epLock.Unlock()
|
|
||||||
|
|
||||||
dns.nsLister, dns.nsController = object.NewIndexerInformer(
|
dns.nsLister, dns.nsController = object.NewIndexerInformer(
|
||||||
&cache.ListWatch{
|
&cache.ListWatch{
|
||||||
|
@ -174,42 +167,6 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
|
||||||
return &dns
|
return &dns
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchEndpoints will set the endpoint Lister and Controller to watch object.Endpoints
|
|
||||||
// instead of the default discovery.EndpointSlice. This is used in older k8s clusters where
|
|
||||||
// discovery.EndpointSlice is not fully supported.
|
|
||||||
// This can be removed when all supported k8s versions fully support EndpointSlice.
|
|
||||||
func (dns *dnsControl) WatchEndpoints(ctx context.Context) {
|
|
||||||
dns.epLock.Lock()
|
|
||||||
dns.epLister, dns.epController = object.NewIndexerInformer(
|
|
||||||
&cache.ListWatch{
|
|
||||||
ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
|
|
||||||
WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
|
|
||||||
},
|
|
||||||
&api.Endpoints{},
|
|
||||||
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
|
|
||||||
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
|
|
||||||
object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder()),
|
|
||||||
)
|
|
||||||
dns.epLock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// WatchEndpointSliceV1beta1 will set the endpoint Lister and Controller to watch v1beta1
|
|
||||||
// instead of the default v1.
|
|
||||||
func (dns *dnsControl) WatchEndpointSliceV1beta1(ctx context.Context) {
|
|
||||||
dns.epLock.Lock()
|
|
||||||
dns.epLister, dns.epController = object.NewIndexerInformer(
|
|
||||||
&cache.ListWatch{
|
|
||||||
ListFunc: endpointSliceListFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector),
|
|
||||||
WatchFunc: endpointSliceWatchFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector),
|
|
||||||
},
|
|
||||||
&discoveryV1beta1.EndpointSlice{},
|
|
||||||
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
|
|
||||||
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
|
|
||||||
object.DefaultProcessor(object.EndpointSliceV1beta1ToEndpoints, dns.EndpointSliceLatencyRecorder()),
|
|
||||||
)
|
|
||||||
dns.epLock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
|
func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
|
||||||
return &object.EndpointLatencyRecorder{
|
return &object.EndpointLatencyRecorder{
|
||||||
ServiceFunc: func(o meta.Object) []*object.Service {
|
ServiceFunc: func(o meta.Object) []*object.Service {
|
||||||
|
@ -298,14 +255,6 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label
|
||||||
return c.CoreV1().Pods(ns).List(ctx, opts)
|
return c.CoreV1().Pods(ns).List(ctx, opts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func endpointSliceListFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
|
|
||||||
return func(opts meta.ListOptions) (runtime.Object, error) {
|
|
||||||
if s != nil {
|
|
||||||
opts.LabelSelector = s.String()
|
|
||||||
}
|
|
||||||
return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
|
func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
|
||||||
return func(opts meta.ListOptions) (runtime.Object, error) {
|
return func(opts meta.ListOptions) (runtime.Object, error) {
|
||||||
|
@ -316,15 +265,6 @@ func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns strin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
|
|
||||||
return func(opts meta.ListOptions) (runtime.Object, error) {
|
|
||||||
if s != nil {
|
|
||||||
opts.LabelSelector = s.String()
|
|
||||||
}
|
|
||||||
return c.CoreV1().Endpoints(ns).List(ctx, opts)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
|
func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
|
||||||
return func(opts meta.ListOptions) (runtime.Object, error) {
|
return func(opts meta.ListOptions) (runtime.Object, error) {
|
||||||
if s != nil {
|
if s != nil {
|
||||||
|
@ -356,15 +296,6 @@ func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func endpointSliceWatchFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
|
|
||||||
return func(options meta.ListOptions) (watch.Interface, error) {
|
|
||||||
if s != nil {
|
|
||||||
options.LabelSelector = s.String()
|
|
||||||
}
|
|
||||||
return c.DiscoveryV1beta1().EndpointSlices(ns).Watch(ctx, options)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
|
func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
|
||||||
return func(options meta.ListOptions) (watch.Interface, error) {
|
return func(options meta.ListOptions) (watch.Interface, error) {
|
||||||
if s != nil {
|
if s != nil {
|
||||||
|
@ -374,15 +305,6 @@ func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns stri
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
|
|
||||||
return func(options meta.ListOptions) (watch.Interface, error) {
|
|
||||||
if s != nil {
|
|
||||||
options.LabelSelector = s.String()
|
|
||||||
}
|
|
||||||
return c.CoreV1().Endpoints(ns).Watch(ctx, options)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
|
func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
|
||||||
return func(options meta.ListOptions) (watch.Interface, error) {
|
return func(options meta.ListOptions) (watch.Interface, error) {
|
||||||
if s != nil {
|
if s != nil {
|
||||||
|
@ -413,9 +335,7 @@ func (dns *dnsControl) Run() {
|
||||||
go dns.svcController.Run(dns.stopCh)
|
go dns.svcController.Run(dns.stopCh)
|
||||||
if dns.epController != nil {
|
if dns.epController != nil {
|
||||||
go func() {
|
go func() {
|
||||||
dns.epLock.RLock()
|
|
||||||
dns.epController.Run(dns.stopCh)
|
dns.epController.Run(dns.stopCh)
|
||||||
dns.epLock.RUnlock()
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
if dns.podController != nil {
|
if dns.podController != nil {
|
||||||
|
@ -430,9 +350,7 @@ func (dns *dnsControl) HasSynced() bool {
|
||||||
a := dns.svcController.HasSynced()
|
a := dns.svcController.HasSynced()
|
||||||
b := true
|
b := true
|
||||||
if dns.epController != nil {
|
if dns.epController != nil {
|
||||||
dns.epLock.RLock()
|
|
||||||
b = dns.epController.HasSynced()
|
b = dns.epController.HasSynced()
|
||||||
dns.epLock.RUnlock()
|
|
||||||
}
|
}
|
||||||
c := true
|
c := true
|
||||||
if dns.podController != nil {
|
if dns.podController != nil {
|
||||||
|
@ -455,8 +373,6 @@ func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
|
func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
|
||||||
dns.epLock.RLock()
|
|
||||||
defer dns.epLock.RUnlock()
|
|
||||||
os := dns.epLister.List()
|
os := dns.epLister.List()
|
||||||
for _, o := range os {
|
for _, o := range os {
|
||||||
ep, ok := o.(*object.Endpoints)
|
ep, ok := o.(*object.Endpoints)
|
||||||
|
@ -531,8 +447,6 @@ func (dns *dnsControl) SvcExtIndexReverse(ip string) (svcs []*object.Service) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
|
func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
|
||||||
dns.epLock.RLock()
|
|
||||||
defer dns.epLock.RUnlock()
|
|
||||||
os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
|
os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -548,8 +462,6 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
|
func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
|
||||||
dns.epLock.RLock()
|
|
||||||
defer dns.epLock.RUnlock()
|
|
||||||
os, err := dns.epLister.ByIndex(epIPIndex, ip)
|
os, err := dns.epLister.ByIndex(epIPIndex, ip)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -19,9 +18,6 @@ import (
|
||||||
|
|
||||||
"github.com/miekg/dns"
|
"github.com/miekg/dns"
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
discovery "k8s.io/api/discovery/v1"
|
|
||||||
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
|
|
||||||
kerrors "k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
|
@ -262,22 +258,8 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o
|
||||||
|
|
||||||
k.APIConn = newdnsController(ctx, kubeClient, k.opts)
|
k.APIConn = newdnsController(ctx, kubeClient, k.opts)
|
||||||
|
|
||||||
initEndpointWatch := k.opts.initEndpointsCache
|
|
||||||
|
|
||||||
onStart = func() error {
|
onStart = func() error {
|
||||||
go func() {
|
go func() {
|
||||||
if initEndpointWatch {
|
|
||||||
// Revert to watching Endpoints for incompatible K8s.
|
|
||||||
// This can be removed when all supported k8s versions support endpointslices.
|
|
||||||
ok, v := k.endpointSliceSupported(kubeClient)
|
|
||||||
if !ok {
|
|
||||||
k.APIConn.(*dnsControl).WatchEndpoints(ctx)
|
|
||||||
}
|
|
||||||
// Revert to EndpointSlice v1beta1 if v1 is not supported
|
|
||||||
if ok && v == discoveryV1beta1.SchemeGroupVersion.String() {
|
|
||||||
k.APIConn.(*dnsControl).WatchEndpointSliceV1beta1(ctx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
k.APIConn.Run()
|
k.APIConn.Run()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -311,68 +293,6 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o
|
||||||
return onStart, onShut, err
|
return onStart, onShut, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints)
|
|
||||||
// based on the supportability of endpointslices in the API and server version. It will return true when endpointslices
|
|
||||||
// should be watched, and false when endpoints should be watched.
|
|
||||||
// If the API supports discovery, and the server versions >= 1.19, true is returned.
|
|
||||||
// Also returned is the discovery version supported: "v1" if v1 is supported, and v1beta1 if v1beta1 is supported and
|
|
||||||
// v1 is not supported.
|
|
||||||
// This function should be removed, when all supported versions of k8s support v1.
|
|
||||||
func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) (bool, string) {
|
|
||||||
ticker := time.NewTicker(100 * time.Millisecond)
|
|
||||||
defer ticker.Stop()
|
|
||||||
logTicker := time.NewTicker(10 * time.Second)
|
|
||||||
defer logTicker.Stop()
|
|
||||||
var connErr error
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-logTicker.C:
|
|
||||||
if connErr == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Warningf("Kubernetes API connection failure: %v", connErr)
|
|
||||||
case <-ticker.C:
|
|
||||||
sv, err := kubeClient.ServerVersion()
|
|
||||||
if err != nil {
|
|
||||||
connErr = err
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Disable use of endpoint slices for k8s versions 1.18 and earlier. The Endpointslices API was enabled
|
|
||||||
// by default in 1.17 but Service -> Pod proxy continued to use Endpoints by default until 1.19.
|
|
||||||
// DNS results should be built from the same source data that the proxy uses. This decision assumes
|
|
||||||
// k8s EndpointSliceProxying feature gate is at the default (i.e. only enabled for k8s >= 1.19).
|
|
||||||
major, _ := strconv.Atoi(sv.Major)
|
|
||||||
minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+"))
|
|
||||||
if major <= 1 && minor <= 18 {
|
|
||||||
log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
|
|
||||||
return false, ""
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enable use of endpoint slices if the API supports the discovery api
|
|
||||||
_, err = kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String())
|
|
||||||
if err == nil {
|
|
||||||
return true, discovery.SchemeGroupVersion.String()
|
|
||||||
} else if !kerrors.IsNotFound(err) {
|
|
||||||
connErr = err
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = kubeClient.Discovery().ServerResourcesForGroupVersion(discoveryV1beta1.SchemeGroupVersion.String())
|
|
||||||
if err == nil {
|
|
||||||
return true, discoveryV1beta1.SchemeGroupVersion.String()
|
|
||||||
} else if !kerrors.IsNotFound(err) {
|
|
||||||
connErr = err
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Disable use of endpoint slices in case that it is disabled in k8s versions 1.19 and newer.
|
|
||||||
log.Info("Endpointslices API disabled. Watching Endpoints instead.")
|
|
||||||
return false, ""
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Records looks up services in kubernetes.
|
// Records looks up services in kubernetes.
|
||||||
func (k *Kubernetes) Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error) {
|
func (k *Kubernetes) Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error) {
|
||||||
r, e := parseRequest(state.Name(), state.Zone)
|
r, e := parseRequest(state.Name(), state.Zone)
|
||||||
|
|
|
@ -3,9 +3,7 @@ package object
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
api "k8s.io/api/core/v1"
|
|
||||||
discovery "k8s.io/api/discovery/v1"
|
discovery "k8s.io/api/discovery/v1"
|
||||||
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
|
|
||||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
)
|
)
|
||||||
|
@ -48,60 +46,6 @@ type EndpointPort struct {
|
||||||
// EndpointsKey returns a string using for the index.
|
// EndpointsKey returns a string using for the index.
|
||||||
func EndpointsKey(name, namespace string) string { return name + "." + namespace }
|
func EndpointsKey(name, namespace string) string { return name + "." + namespace }
|
||||||
|
|
||||||
// ToEndpoints converts an *api.Endpoints to a *Endpoints.
|
|
||||||
func ToEndpoints(obj meta.Object) (meta.Object, error) {
|
|
||||||
end, ok := obj.(*api.Endpoints)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("unexpected object %v", obj)
|
|
||||||
}
|
|
||||||
e := &Endpoints{
|
|
||||||
Version: end.GetResourceVersion(),
|
|
||||||
Name: end.GetName(),
|
|
||||||
Namespace: end.GetNamespace(),
|
|
||||||
Index: EndpointsKey(end.GetName(), end.GetNamespace()),
|
|
||||||
Subsets: make([]EndpointSubset, len(end.Subsets)),
|
|
||||||
}
|
|
||||||
for i, eps := range end.Subsets {
|
|
||||||
sub := EndpointSubset{
|
|
||||||
Addresses: make([]EndpointAddress, len(eps.Addresses)),
|
|
||||||
}
|
|
||||||
if len(eps.Ports) == 0 {
|
|
||||||
// Add sentinel if there are no ports.
|
|
||||||
sub.Ports = []EndpointPort{{Port: -1}}
|
|
||||||
} else {
|
|
||||||
sub.Ports = make([]EndpointPort, len(eps.Ports))
|
|
||||||
}
|
|
||||||
|
|
||||||
for j, a := range eps.Addresses {
|
|
||||||
ea := EndpointAddress{IP: a.IP, Hostname: a.Hostname}
|
|
||||||
if a.NodeName != nil {
|
|
||||||
ea.NodeName = *a.NodeName
|
|
||||||
}
|
|
||||||
if a.TargetRef != nil {
|
|
||||||
ea.TargetRefName = a.TargetRef.Name
|
|
||||||
}
|
|
||||||
sub.Addresses[j] = ea
|
|
||||||
}
|
|
||||||
|
|
||||||
for k, p := range eps.Ports {
|
|
||||||
ep := EndpointPort{Port: p.Port, Name: p.Name, Protocol: string(p.Protocol)}
|
|
||||||
sub.Ports[k] = ep
|
|
||||||
}
|
|
||||||
|
|
||||||
e.Subsets[i] = sub
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, eps := range end.Subsets {
|
|
||||||
for _, a := range eps.Addresses {
|
|
||||||
e.IndexIP = append(e.IndexIP, a.IP)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*end = api.Endpoints{}
|
|
||||||
|
|
||||||
return e, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// EndpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints.
|
// EndpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints.
|
||||||
func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) {
|
func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) {
|
||||||
ends, ok := obj.(*discovery.EndpointSlice)
|
ends, ok := obj.(*discovery.EndpointSlice)
|
||||||
|
@ -153,55 +97,6 @@ func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) {
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// EndpointSliceV1beta1ToEndpoints converts a v1beta1 *discovery.EndpointSlice to a *Endpoints.
|
|
||||||
func EndpointSliceV1beta1ToEndpoints(obj meta.Object) (meta.Object, error) {
|
|
||||||
ends, ok := obj.(*discoveryV1beta1.EndpointSlice)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("unexpected object %v", obj)
|
|
||||||
}
|
|
||||||
e := &Endpoints{
|
|
||||||
Version: ends.GetResourceVersion(),
|
|
||||||
Name: ends.GetName(),
|
|
||||||
Namespace: ends.GetNamespace(),
|
|
||||||
Index: EndpointsKey(ends.Labels[discovery.LabelServiceName], ends.GetNamespace()),
|
|
||||||
Subsets: make([]EndpointSubset, 1),
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(ends.Ports) == 0 {
|
|
||||||
// Add sentinel if there are no ports.
|
|
||||||
e.Subsets[0].Ports = []EndpointPort{{Port: -1}}
|
|
||||||
} else {
|
|
||||||
e.Subsets[0].Ports = make([]EndpointPort, len(ends.Ports))
|
|
||||||
for k, p := range ends.Ports {
|
|
||||||
ep := EndpointPort{Port: *p.Port, Name: *p.Name, Protocol: string(*p.Protocol)}
|
|
||||||
e.Subsets[0].Ports[k] = ep
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, end := range ends.Endpoints {
|
|
||||||
if !endpointsliceReady(end.Conditions.Ready) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, a := range end.Addresses {
|
|
||||||
ea := EndpointAddress{IP: a}
|
|
||||||
if end.Hostname != nil {
|
|
||||||
ea.Hostname = *end.Hostname
|
|
||||||
}
|
|
||||||
// ignore pod names that are too long to be a valid label
|
|
||||||
if end.TargetRef != nil && len(end.TargetRef.Name) < 64 {
|
|
||||||
ea.TargetRefName = end.TargetRef.Name
|
|
||||||
}
|
|
||||||
// EndpointSlice does not contain NodeName, leave blank
|
|
||||||
e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, ea)
|
|
||||||
e.IndexIP = append(e.IndexIP, a)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*ends = discoveryV1beta1.EndpointSlice{}
|
|
||||||
|
|
||||||
return e, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func endpointsliceReady(ready *bool) bool {
|
func endpointsliceReady(ready *bool) bool {
|
||||||
// Per API docs: a nil value indicates an unknown state. In most cases consumers
|
// Per API docs: a nil value indicates an unknown state. In most cases consumers
|
||||||
// should interpret this unknown state as ready.
|
// should interpret this unknown state as ready.
|
||||||
|
|
Loading…
Add table
Reference in a new issue