Check interface state (up/down) on initialization step #10

Merged
fyrchik merged 4 commits from dstepanov-yadro/multinet:fix/interface_state_check into master 2024-11-02 14:21:56 +00:00
6 changed files with 78 additions and 15 deletions

View file

@ -3,6 +3,7 @@ package multinet
import (
"context"
"errors"
"fmt"
"net"
"sync/atomic"
)
@ -18,6 +19,8 @@ const (
BalancerTypeRoundRobin BalancerType = "roundrobin"
)
var errNoSuitableNodeFound = errors.New("no suitale node found")
type balancer interface {
DialContext(ctx context.Context, s *Subnet, network, address string) (net.Conn, error)
}
@ -39,7 +42,7 @@ func (r *roundRobin) DialContext(ctx context.Context, s *Subnet, network, addres
dd.LocalAddr = ii.LocalAddr
return r.d.dialContext(&dd, ctx, network, address)
}
return nil, errors.New("(*roundRobin).DialContext: no suitale node found")
return nil, fmt.Errorf("(*roundRobin).DialContext: %w", errNoSuitableNodeFound)
}
type firstEnabled struct {
@ -57,5 +60,5 @@ func (r *firstEnabled) DialContext(ctx context.Context, s *Subnet, network, addr
dd.LocalAddr = ii.LocalAddr
return r.d.dialContext(&dd, ctx, network, address)
}
return nil, errors.New("(*firstEnabled).DialContext: no suitale node found")
return nil, fmt.Errorf("(*firstEnabled).DialContext: %w", errNoSuitableNodeFound)
}

View file

@ -23,7 +23,7 @@ type Dialer interface {
// Multidialer is like Dialer, but supports link state updates.
type Multidialer interface {
Dialer
UpdateInterface(name string, addr netip.Addr, status bool) error
UpdateInterface(name string, addr netip.Addr, status bool)
}
var (
@ -151,6 +151,7 @@ func NewDialer(c Config) (Multidialer, error) {
type iface struct {
name string
addrs []netip.Prefix
down bool
}
func processIface(info Interface) (iface, error) {
@ -168,7 +169,7 @@ func processIface(info Interface) (iface, error) {
addrs = append(addrs, p)
}
return iface{name: info.Name(), addrs: addrs}, nil
return iface{name: info.Name(), addrs: addrs, down: info.Down()}, nil
}
func processSubnet(subnet string, sources []iface) (Subnet, error) {
@ -185,6 +186,7 @@ func processSubnet(subnet string, sources []iface) (Subnet, error) {
ifs = append(ifs, Source{
Name: source.name,
LocalAddr: &net.TCPAddr{IP: net.IP(src.AsSlice())},
Down: source.down,
})
}
}
@ -207,7 +209,7 @@ func processSubnet(subnet string, sources []iface) (Subnet, error) {
// Hostnames for address are currently not supported.
func (d *dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
addr, err := netip.ParseAddrPort(address)
if err != nil { //try resolve as hostname
if err != nil { // try resolve as hostname
return d.dialContextHostname(ctx, network, address)
}
return d.dialAddr(ctx, network, address, addr)
@ -389,7 +391,7 @@ func (d *dialer) dialContext(nd *net.Dialer, ctx context.Context, network, addre
// UpdateInterface implements the Multidialer interface.
// Updating address on a specific interface is currently not supported.
func (d *dialer) UpdateInterface(iface string, addr netip.Addr, up bool) error {
func (d *dialer) UpdateInterface(iface string, addr netip.Addr, up bool) {
d.mtx.Lock()
defer d.mtx.Unlock()
@ -408,7 +410,6 @@ func (d *dialer) UpdateInterface(iface string, addr netip.Addr, up bool) error {
}
}
}
return nil
}
// splitByType divides an address list into two categories:

View file

@ -114,10 +114,12 @@ func testInterfacesV6() ([]Interface, error) {
type testInterface struct {
name string
addrs []net.Addr
down bool
}
func (i *testInterface) Name() string { return i.name }
func (i *testInterface) Addrs() ([]net.Addr, error) { return i.addrs, nil }
func (i *testInterface) Down() bool { return i.down }
type testAddr struct {
network string

58
dialer_test.go Normal file
View file

@ -0,0 +1,58 @@
package multinet
import (
"context"
"net"
"testing"
"github.com/stretchr/testify/require"
)
func TestInterfacesDown(t *testing.T) {
t.Run("noop balancer", func(t *testing.T) {
d, err := NewDialer(Config{
Subnets: []string{"10.11.12.0/24"},
InterfaceSource: testDownInterfaces,
})
require.NoError(t, err)
conn, err := d.DialContext(context.Background(), "tcp", "10.11.12.254:8080")
require.ErrorIs(t, err, errNoSuitableNodeFound)
require.Nil(t, conn)
})
t.Run("round robin balancer", func(t *testing.T) {
d, err := NewDialer(Config{
Subnets: []string{"10.11.12.0/24"},
InterfaceSource: testDownInterfaces,
Balancer: BalancerTypeRoundRobin,
})
require.NoError(t, err)
conn, err := d.DialContext(context.Background(), "tcp", "10.11.12.254:8080")
require.ErrorIs(t, err, errNoSuitableNodeFound)
require.Nil(t, conn)
})
}
func testDownInterfaces() ([]Interface, error) {
return []Interface{
&testInterface{
name: "data1",
addrs: []net.Addr{
&testAddr{
network: "tcp",
str: "10.11.12.101/24",
},
},
down: true,
},
&testInterface{
name: "data2",
addrs: []net.Addr{
&testAddr{
network: "tcp",
str: "10.11.12.102/24",
},
},
down: true,
},
}, nil
}

View file

@ -5,7 +5,6 @@ import (
"sync"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
"golang.org/x/sys/unix"
)
@ -27,11 +26,6 @@ func NewNetlinkWatcher(d Multidialer) *NetlinkWatcher {
}
func (w *NetlinkWatcher) Start() error {
ns, err := netns.Get()
if err != nil {
return err
}
if err := netlink.LinkSubscribe(w.linkUpdates, w.done); err != nil {
return err
}
@ -41,11 +35,11 @@ func (w *NetlinkWatcher) Start() error {
}
w.wg.Add(1)
go w.watch(ns)
go w.watch()
return nil
}
func (w *NetlinkWatcher) watch(ns netns.NsHandle) {
func (w *NetlinkWatcher) watch() {
defer w.wg.Done()
for {

View file

@ -6,6 +6,7 @@ import "net"
type Interface interface {
Name() string
Addrs() ([]net.Addr, error)
Down() bool
}
type netInterface struct {
@ -20,6 +21,10 @@ func (i *netInterface) Addrs() ([]net.Addr, error) {
return i.iface.Addrs()
}
func (i *netInterface) Down() bool {
return i.iface.Flags&net.FlagUp == 0
}
func systemInterfaces() ([]Interface, error) {
ifaces, err := net.Interfaces()
if err != nil {