From e12255dd73dec62b5058093c72b14da6bec8a4c5 Mon Sep 17 00:00:00 2001 From: decentralisedkev <37423678+decentralisedkev@users.noreply.github.com> Date: Thu, 21 Mar 2019 23:18:02 +0000 Subject: [PATCH] [connmgr] Refactor Connmgr (#205) * [connmgr] - Refactor Connmgr - Remove un-needed async code - Add comment for Request --- pkg/connmgr/config.go | 25 ++++ pkg/connmgr/connmgr.go | 246 ++++++++++++++++++++++++++++++++++++ pkg/connmgr/connmgr_test.go | 107 ++++++++++++++++ pkg/connmgr/readme.md | 22 ++++ pkg/connmgr/request.go | 15 +++ 5 files changed, 415 insertions(+) create mode 100755 pkg/connmgr/config.go create mode 100755 pkg/connmgr/connmgr.go create mode 100755 pkg/connmgr/connmgr_test.go create mode 100755 pkg/connmgr/readme.md create mode 100755 pkg/connmgr/request.go diff --git a/pkg/connmgr/config.go b/pkg/connmgr/config.go new file mode 100755 index 000000000..66fada355 --- /dev/null +++ b/pkg/connmgr/config.go @@ -0,0 +1,25 @@ +package connmgr + +import ( + "net" +) + +// Config contains all methods which will be set by the caller to setup the connection manager. +type Config struct { + // GetAddress will return a single address for the connection manager to connect to + // This will be the source of addresses for the connection manager + GetAddress func() (string, error) + + // OnConnection is called by the connection manager when we successfully connect to a peer + // The caller should ideally inform the address manager that we have connected to this address in this function + OnConnection func(conn net.Conn, addr string) + + // OnAccept will take an established connection + OnAccept func(net.Conn) + + // AddressPort is the address port of the local node in the format "address:port" + AddressPort string + + // DialTimeout is the amount of time to wait, before we can disconnect a pending dialed connection + DialTimeout int +} diff --git a/pkg/connmgr/connmgr.go b/pkg/connmgr/connmgr.go new file mode 100755 index 000000000..a8e8442ac --- /dev/null +++ b/pkg/connmgr/connmgr.go @@ -0,0 +1,246 @@ +package connmgr + +import ( + "errors" + "fmt" + "net" + "net/http" + "time" +) + +var ( + // maxOutboundConn is the maximum number of active peers + // that the connection manager will try to have + maxOutboundConn = 10 + + // maxRetries is the maximum amount of successive retries that + // we can have before we stop dialing that peer + maxRetries = uint8(5) +) + +// Connmgr manages pending/active/failed cnnections +type Connmgr struct { + config Config + PendingList map[string]*Request + ConnectedList map[string]*Request + actionch chan func() +} + +//New creates a new connection manager +func New(cfg Config) *Connmgr { + cnnmgr := &Connmgr{ + cfg, + make(map[string]*Request), + make(map[string]*Request), + make(chan func(), 300), + } + + go func() { + + listener, err := net.Listen("tcp", cfg.AddressPort) + + if err != nil { + fmt.Println("Error connecting to outbound ", err) + } + + defer func() { + listener.Close() + }() + + for { + + conn, err := listener.Accept() + + if err != nil { + continue + } + go cfg.OnAccept(conn) + } + + }() + + return cnnmgr +} + +// NewRequest will make a new connection gets the address from address func in config +// Then dials it and assigns it to pending +func (c *Connmgr) NewRequest() error { + + // Fetch address + addr, err := c.config.GetAddress() + if err != nil { + return fmt.Errorf("error getting address " + err.Error()) + } + + r := &Request{ + Addr: addr, + } + return c.Connect(r) +} + +// Connect will dial the address in the Request +// Updating the request object depending on the outcome +func (c *Connmgr) Connect(r *Request) error { + + r.Retries++ + + conn, err := c.dial(r.Addr) + if err != nil { + c.failed(r) + return err + } + + r.Conn = conn + r.Inbound = true + + // r.Permanent is set by the address manager/caller. default is false + // The permanent connections will be the ones that are hardcoded, e.g seed3.ngd.network + // or are reliable. The connmgr will be more leniennt to permanent addresses as they have + // a track record or reputation of being reliable. + + return c.connected(r) +} + +//Disconnect will remove the request from the connected/pending list and close the connection +func (c *Connmgr) Disconnect(addr string) { + + var r *Request + + // fetch from connected list + r, ok := c.ConnectedList[addr] + if !ok { + // If not in connected, check pending + r, _ = c.PendingList[addr] + } + + c.disconnected(r) + +} + +// Dial is used to dial up connections given the addres and ip in the form address:port +func (c *Connmgr) dial(addr string) (net.Conn, error) { + dialTimeout := 1 * time.Second + conn, err := net.DialTimeout("tcp", addr, dialTimeout) + if err != nil { + if !isConnected() { + return nil, errors.New("Fatal Error: You do not seem to be connected to the internet") + } + return conn, err + } + return conn, nil +} +func (c *Connmgr) failed(r *Request) { + + c.actionch <- func() { + // priority to check if it is permanent or inbound + // if so then these peers are valuable in NEO and so we will just retry another time + if r.Inbound || r.Permanent { + multiplier := time.Duration(r.Retries * 10) + time.AfterFunc(multiplier*time.Second, + func() { + c.Connect(r) + }, + ) + // if not then we should check if this request has had maxRetries + // if it has then get a new address + // if not then call Connect on it again + } else if r.Retries > maxRetries { + if c.config.GetAddress != nil { + go c.NewRequest() + } + } else { + go c.Connect(r) + } + } + +} + +// Disconnected is called when a peer disconnects. +// we take the addr from peer, which is also it's key in the map +// and we use it to remove it from the connectedList +func (c *Connmgr) disconnected(r *Request) error { + + if r == nil { + // if object is nil, we return nil + return nil + } + + // if for some reason the underlying connection is not closed, close it + err := r.Conn.Close() + if err != nil { + return err + } + + // remove from any pending/connected list + delete(c.PendingList, r.Addr) + delete(c.ConnectedList, r.Addr) + + // If permanent,then lets retry + if r.Permanent { + return c.Connect(r) + } + + return nil +} + +//Connected is called when the connection manager makes a successful connection. +func (c *Connmgr) connected(r *Request) error { + + // This should not be the case, since we connected + if r == nil { + return errors.New("request object as nil inside of the connected function") + } + + // reset retries to 0 + r.Retries = 0 + + // add to connectedList + c.ConnectedList[r.Addr] = r + + // remove from pending if it was there + delete(c.PendingList, r.Addr) + + if c.config.OnConnection != nil { + c.config.OnConnection(r.Conn, r.Addr) + } + + return nil +} + +// Pending is synchronous, we do not want to continue with logic +// until we are certain it has been added to the pendingList +func (c *Connmgr) pending(r *Request) error { + + if r == nil { + return errors.New("request object is nil") + } + + c.PendingList[r.Addr] = r + + return nil +} + +// Run will start the connection manager +func (c *Connmgr) Run() error { + fmt.Println("Connection manager started") + go c.loop() + return nil +} + +func (c *Connmgr) loop() { + for { + select { + case f := <-c.actionch: + f() + } + } +} + +// https://stackoverflow.com/questions/50056144/check-for-internet-connection-from-application +func isConnected() (ok bool) { + _, err := http.Get("http://clients3.google.com/generate_204") + if err != nil { + return false + } + return true +} diff --git a/pkg/connmgr/connmgr_test.go b/pkg/connmgr/connmgr_test.go new file mode 100755 index 000000000..a060cf838 --- /dev/null +++ b/pkg/connmgr/connmgr_test.go @@ -0,0 +1,107 @@ +package connmgr + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDial(t *testing.T) { + cfg := Config{ + GetAddress: nil, + OnConnection: nil, + OnAccept: nil, + AddressPort: "", + DialTimeout: 0, + } + + cm := New(cfg) + err := cm.Run() + assert.Equal(t, nil, err) + + ipport := "google.com:80" // google unlikely to go offline, a better approach to test Dialing is welcome. + + conn, err := cm.dial(ipport) + assert.Equal(t, nil, err) + assert.NotEqual(t, nil, conn) +} +func TestConnect(t *testing.T) { + cfg := Config{ + GetAddress: nil, + OnConnection: nil, + OnAccept: nil, + AddressPort: "", + DialTimeout: 0, + } + + cm := New(cfg) + cm.Run() + + ipport := "google.com:80" + + r := Request{Addr: ipport} + + err := cm.Connect(&r) + assert.Nil(t, err) + + assert.Equal(t, 1, len(cm.ConnectedList)) + +} +func TestNewRequest(t *testing.T) { + + address := "google.com:80" + + var getAddr = func() (string, error) { + return address, nil + } + + cfg := Config{ + GetAddress: getAddr, + OnConnection: nil, + OnAccept: nil, + AddressPort: "", + DialTimeout: 0, + } + + cm := New(cfg) + + cm.Run() + + cm.NewRequest() + + if _, ok := cm.ConnectedList[address]; ok { + assert.Equal(t, true, ok) + assert.Equal(t, 1, len(cm.ConnectedList)) + return + } + + assert.Fail(t, "Could not find the address in the connected lists") + +} +func TestDisconnect(t *testing.T) { + + address := "google.com:80" + + var getAddr = func() (string, error) { + return address, nil + } + + cfg := Config{ + GetAddress: getAddr, + OnConnection: nil, + OnAccept: nil, + AddressPort: "", + DialTimeout: 0, + } + + cm := New(cfg) + + cm.Run() + + cm.NewRequest() + + cm.Disconnect(address) + + assert.Equal(t, 0, len(cm.ConnectedList)) + +} diff --git a/pkg/connmgr/readme.md b/pkg/connmgr/readme.md new file mode 100755 index 000000000..d5d43fa01 --- /dev/null +++ b/pkg/connmgr/readme.md @@ -0,0 +1,22 @@ +# Package - Connection Manager + +## Responsibility + +- Manages the active, failed and pending connections for the node. + +## Features + +- Takes an Request, dials it and logs information based on the connectivity. + +- Retry failed connections. + +- Removable address source. The connection manager does not manage addresses, only connections. + + +## Usage + +The following methods are exposed from the Connection manager: + +- Connect(r *Request) : This takes a Request object and connects to it. It follow the same logic as NewRequest() however instead of getting the address from the datasource given upon initialisation, you directly feed the address you want to connect to. + +- Disconnect(addrport string) : Given an address:port, this will disconnect it, close the connection and remove it from the connected and pending list, if it was there. diff --git a/pkg/connmgr/request.go b/pkg/connmgr/request.go new file mode 100755 index 000000000..0d7def3c3 --- /dev/null +++ b/pkg/connmgr/request.go @@ -0,0 +1,15 @@ +package connmgr + +import ( + "net" +) + +// Request is a layer on top of connection and allows us to add metadata to the net.Conn +// that the connection manager can use to determine whether to retry and other useful heuristics +type Request struct { + Conn net.Conn + Addr string + Permanent bool + Inbound bool + Retries uint8 // should not be trying more than 255 tries +}