Check interface state (up/down) on initialization step #10
6 changed files with 78 additions and 15 deletions
|
@ -3,6 +3,7 @@ package multinet
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
@ -18,6 +19,8 @@ const (
|
||||||
BalancerTypeRoundRobin BalancerType = "roundrobin"
|
BalancerTypeRoundRobin BalancerType = "roundrobin"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errNoSuitableNodeFound = errors.New("no suitale node found")
|
||||||
|
|
||||||
type balancer interface {
|
type balancer interface {
|
||||||
DialContext(ctx context.Context, s *Subnet, network, address string) (net.Conn, error)
|
DialContext(ctx context.Context, s *Subnet, network, address string) (net.Conn, error)
|
||||||
}
|
}
|
||||||
|
@ -39,7 +42,7 @@ func (r *roundRobin) DialContext(ctx context.Context, s *Subnet, network, addres
|
||||||
dd.LocalAddr = ii.LocalAddr
|
dd.LocalAddr = ii.LocalAddr
|
||||||
return r.d.dialContext(&dd, ctx, network, address)
|
return r.d.dialContext(&dd, ctx, network, address)
|
||||||
}
|
}
|
||||||
return nil, errors.New("(*roundRobin).DialContext: no suitale node found")
|
return nil, fmt.Errorf("(*roundRobin).DialContext: %w", errNoSuitableNodeFound)
|
||||||
}
|
}
|
||||||
|
|
||||||
type firstEnabled struct {
|
type firstEnabled struct {
|
||||||
|
@ -57,5 +60,5 @@ func (r *firstEnabled) DialContext(ctx context.Context, s *Subnet, network, addr
|
||||||
dd.LocalAddr = ii.LocalAddr
|
dd.LocalAddr = ii.LocalAddr
|
||||||
return r.d.dialContext(&dd, ctx, network, address)
|
return r.d.dialContext(&dd, ctx, network, address)
|
||||||
}
|
}
|
||||||
return nil, errors.New("(*firstEnabled).DialContext: no suitale node found")
|
return nil, fmt.Errorf("(*firstEnabled).DialContext: %w", errNoSuitableNodeFound)
|
||||||
}
|
}
|
||||||
|
|
11
dialer.go
11
dialer.go
|
@ -23,7 +23,7 @@ type Dialer interface {
|
||||||
// Multidialer is like Dialer, but supports link state updates.
|
// Multidialer is like Dialer, but supports link state updates.
|
||||||
type Multidialer interface {
|
type Multidialer interface {
|
||||||
Dialer
|
Dialer
|
||||||
UpdateInterface(name string, addr netip.Addr, status bool) error
|
UpdateInterface(name string, addr netip.Addr, status bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -151,6 +151,7 @@ func NewDialer(c Config) (Multidialer, error) {
|
||||||
type iface struct {
|
type iface struct {
|
||||||
name string
|
name string
|
||||||
addrs []netip.Prefix
|
addrs []netip.Prefix
|
||||||
|
down bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func processIface(info Interface) (iface, error) {
|
func processIface(info Interface) (iface, error) {
|
||||||
|
@ -168,7 +169,7 @@ func processIface(info Interface) (iface, error) {
|
||||||
|
|
||||||
addrs = append(addrs, p)
|
addrs = append(addrs, p)
|
||||||
}
|
}
|
||||||
return iface{name: info.Name(), addrs: addrs}, nil
|
return iface{name: info.Name(), addrs: addrs, down: info.Down()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func processSubnet(subnet string, sources []iface) (Subnet, error) {
|
func processSubnet(subnet string, sources []iface) (Subnet, error) {
|
||||||
|
@ -185,6 +186,7 @@ func processSubnet(subnet string, sources []iface) (Subnet, error) {
|
||||||
ifs = append(ifs, Source{
|
ifs = append(ifs, Source{
|
||||||
Name: source.name,
|
Name: source.name,
|
||||||
LocalAddr: &net.TCPAddr{IP: net.IP(src.AsSlice())},
|
LocalAddr: &net.TCPAddr{IP: net.IP(src.AsSlice())},
|
||||||
|
Down: source.down,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -207,7 +209,7 @@ func processSubnet(subnet string, sources []iface) (Subnet, error) {
|
||||||
// Hostnames for address are currently not supported.
|
// Hostnames for address are currently not supported.
|
||||||
func (d *dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
func (d *dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
||||||
addr, err := netip.ParseAddrPort(address)
|
addr, err := netip.ParseAddrPort(address)
|
||||||
if err != nil { //try resolve as hostname
|
if err != nil { // try resolve as hostname
|
||||||
return d.dialContextHostname(ctx, network, address)
|
return d.dialContextHostname(ctx, network, address)
|
||||||
}
|
}
|
||||||
return d.dialAddr(ctx, network, address, addr)
|
return d.dialAddr(ctx, network, address, addr)
|
||||||
|
@ -389,7 +391,7 @@ func (d *dialer) dialContext(nd *net.Dialer, ctx context.Context, network, addre
|
||||||
|
|
||||||
// UpdateInterface implements the Multidialer interface.
|
// UpdateInterface implements the Multidialer interface.
|
||||||
// Updating address on a specific interface is currently not supported.
|
// Updating address on a specific interface is currently not supported.
|
||||||
func (d *dialer) UpdateInterface(iface string, addr netip.Addr, up bool) error {
|
func (d *dialer) UpdateInterface(iface string, addr netip.Addr, up bool) {
|
||||||
d.mtx.Lock()
|
d.mtx.Lock()
|
||||||
defer d.mtx.Unlock()
|
defer d.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -408,7 +410,6 @@ func (d *dialer) UpdateInterface(iface string, addr netip.Addr, up bool) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// splitByType divides an address list into two categories:
|
// splitByType divides an address list into two categories:
|
||||||
|
|
|
@ -114,10 +114,12 @@ func testInterfacesV6() ([]Interface, error) {
|
||||||
type testInterface struct {
|
type testInterface struct {
|
||||||
name string
|
name string
|
||||||
addrs []net.Addr
|
addrs []net.Addr
|
||||||
|
down bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *testInterface) Name() string { return i.name }
|
func (i *testInterface) Name() string { return i.name }
|
||||||
func (i *testInterface) Addrs() ([]net.Addr, error) { return i.addrs, nil }
|
func (i *testInterface) Addrs() ([]net.Addr, error) { return i.addrs, nil }
|
||||||
|
func (i *testInterface) Down() bool { return i.down }
|
||||||
|
|
||||||
type testAddr struct {
|
type testAddr struct {
|
||||||
network string
|
network string
|
||||||
|
|
58
dialer_test.go
Normal file
58
dialer_test.go
Normal file
|
@ -0,0 +1,58 @@
|
||||||
|
package multinet
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInterfacesDown(t *testing.T) {
|
||||||
|
t.Run("noop balancer", func(t *testing.T) {
|
||||||
|
d, err := NewDialer(Config{
|
||||||
|
Subnets: []string{"10.11.12.0/24"},
|
||||||
|
InterfaceSource: testDownInterfaces,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
conn, err := d.DialContext(context.Background(), "tcp", "10.11.12.254:8080")
|
||||||
|
require.ErrorIs(t, err, errNoSuitableNodeFound)
|
||||||
|
require.Nil(t, conn)
|
||||||
|
})
|
||||||
|
t.Run("round robin balancer", func(t *testing.T) {
|
||||||
|
d, err := NewDialer(Config{
|
||||||
|
Subnets: []string{"10.11.12.0/24"},
|
||||||
|
InterfaceSource: testDownInterfaces,
|
||||||
|
Balancer: BalancerTypeRoundRobin,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
conn, err := d.DialContext(context.Background(), "tcp", "10.11.12.254:8080")
|
||||||
|
require.ErrorIs(t, err, errNoSuitableNodeFound)
|
||||||
|
require.Nil(t, conn)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testDownInterfaces() ([]Interface, error) {
|
||||||
|
return []Interface{
|
||||||
|
&testInterface{
|
||||||
|
name: "data1",
|
||||||
|
addrs: []net.Addr{
|
||||||
|
&testAddr{
|
||||||
|
network: "tcp",
|
||||||
|
str: "10.11.12.101/24",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
down: true,
|
||||||
|
},
|
||||||
|
&testInterface{
|
||||||
|
name: "data2",
|
||||||
|
addrs: []net.Addr{
|
||||||
|
&testAddr{
|
||||||
|
network: "tcp",
|
||||||
|
str: "10.11.12.102/24",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
down: true,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
10
health.go
10
health.go
|
@ -5,7 +5,6 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/vishvananda/netlink"
|
"github.com/vishvananda/netlink"
|
||||||
"github.com/vishvananda/netns"
|
|
||||||
"golang.org/x/sys/unix"
|
"golang.org/x/sys/unix"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -27,11 +26,6 @@ func NewNetlinkWatcher(d Multidialer) *NetlinkWatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *NetlinkWatcher) Start() error {
|
func (w *NetlinkWatcher) Start() error {
|
||||||
ns, err := netns.Get()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := netlink.LinkSubscribe(w.linkUpdates, w.done); err != nil {
|
if err := netlink.LinkSubscribe(w.linkUpdates, w.done); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -41,11 +35,11 @@ func (w *NetlinkWatcher) Start() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
w.wg.Add(1)
|
w.wg.Add(1)
|
||||||
go w.watch(ns)
|
go w.watch()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *NetlinkWatcher) watch(ns netns.NsHandle) {
|
func (w *NetlinkWatcher) watch() {
|
||||||
defer w.wg.Done()
|
defer w.wg.Done()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import "net"
|
||||||
type Interface interface {
|
type Interface interface {
|
||||||
Name() string
|
Name() string
|
||||||
Addrs() ([]net.Addr, error)
|
Addrs() ([]net.Addr, error)
|
||||||
|
Down() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type netInterface struct {
|
type netInterface struct {
|
||||||
|
@ -20,6 +21,10 @@ func (i *netInterface) Addrs() ([]net.Addr, error) {
|
||||||
return i.iface.Addrs()
|
return i.iface.Addrs()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *netInterface) Down() bool {
|
||||||
|
return i.iface.Flags&net.FlagUp == 0
|
||||||
|
}
|
||||||
|
|
||||||
func systemInterfaces() ([]Interface, error) {
|
func systemInterfaces() ([]Interface, error) {
|
||||||
ifaces, err := net.Interfaces()
|
ifaces, err := net.Interfaces()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Add table
Reference in a new issue