network: use and improve discovery mechanism for reconnections
This makes our node reconnect to other nodes if connection drops for some reason. Fixes #390.
This commit is contained in:
parent
b263c97bbb
commit
be6c905e5d
2 changed files with 25 additions and 17 deletions
|
@ -107,8 +107,21 @@ func (d *DefaultDiscovery) work(addrCh chan string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DefaultDiscovery) next() string {
|
func (d *DefaultDiscovery) requestToWork(workCh chan string) {
|
||||||
return <-d.pool
|
var requested int
|
||||||
|
|
||||||
|
for {
|
||||||
|
for requested = <-d.requestCh; requested > 0; requested-- {
|
||||||
|
select {
|
||||||
|
case r := <-d.requestCh:
|
||||||
|
if requested < r {
|
||||||
|
requested = r
|
||||||
|
}
|
||||||
|
case addr := <-d.pool:
|
||||||
|
workCh <- addr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DefaultDiscovery) run() {
|
func (d *DefaultDiscovery) run() {
|
||||||
|
@ -121,6 +134,7 @@ func (d *DefaultDiscovery) run() {
|
||||||
go d.work(workCh)
|
go d.work(workCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go d.requestToWork(workCh)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case addr := <-d.backFill:
|
case addr := <-d.backFill:
|
||||||
|
@ -132,18 +146,10 @@ func (d *DefaultDiscovery) run() {
|
||||||
d.unconnectedAddrs[addr] = true
|
d.unconnectedAddrs[addr] = true
|
||||||
d.pool <- addr
|
d.pool <- addr
|
||||||
}
|
}
|
||||||
case n := <-d.requestCh:
|
|
||||||
go func() {
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
workCh <- d.next()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
case addr := <-d.badAddrCh:
|
case addr := <-d.badAddrCh:
|
||||||
d.badAddrs[addr] = true
|
d.badAddrs[addr] = true
|
||||||
delete(d.unconnectedAddrs, addr)
|
delete(d.unconnectedAddrs, addr)
|
||||||
go func() {
|
d.RequestRemote(1)
|
||||||
workCh <- d.next()
|
|
||||||
}()
|
|
||||||
|
|
||||||
case addr := <-d.connectedCh:
|
case addr := <-d.connectedCh:
|
||||||
delete(d.unconnectedAddrs, addr)
|
delete(d.unconnectedAddrs, addr)
|
||||||
|
|
|
@ -15,7 +15,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// peer numbers are arbitrary at the moment
|
||||||
minPeers = 5
|
minPeers = 5
|
||||||
|
maxPeers = 20
|
||||||
maxBlockBatch = 200
|
maxBlockBatch = 200
|
||||||
minPoolCount = 30
|
minPoolCount = 30
|
||||||
)
|
)
|
||||||
|
@ -90,12 +92,7 @@ func (s *Server) Start(errChan chan error) {
|
||||||
"headerHeight": s.chain.HeaderHeight(),
|
"headerHeight": s.chain.HeaderHeight(),
|
||||||
}).Info("node started")
|
}).Info("node started")
|
||||||
|
|
||||||
for _, addr := range s.Seeds {
|
s.discovery.BackFill(s.Seeds...)
|
||||||
if err := s.transport.Dial(addr, s.DialTimeout); err != nil {
|
|
||||||
log.Warnf("failed to connect to remote node %s", addr)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
go s.transport.Accept()
|
go s.transport.Accept()
|
||||||
s.run()
|
s.run()
|
||||||
|
@ -122,6 +119,10 @@ func (s *Server) BadPeers() []string {
|
||||||
|
|
||||||
func (s *Server) run() {
|
func (s *Server) run() {
|
||||||
for {
|
for {
|
||||||
|
c := s.PeerCount()
|
||||||
|
if c < minPeers {
|
||||||
|
s.discovery.RequestRemote(maxPeers - c)
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case <-s.quit:
|
case <-s.quit:
|
||||||
s.transport.Close()
|
s.transport.Close()
|
||||||
|
@ -147,6 +148,7 @@ func (s *Server) run() {
|
||||||
"reason": drop.reason,
|
"reason": drop.reason,
|
||||||
"peerCount": s.PeerCount(),
|
"peerCount": s.PeerCount(),
|
||||||
}).Warn("peer disconnected")
|
}).Warn("peer disconnected")
|
||||||
|
s.discovery.BackFill(drop.peer.NetAddr().String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue