network: wait for exit in discoverer

And synchronize other threads with channels instead of mutexes. Overall this
scheme is more reliable.
This commit is contained in:
Roman Khimov 2022-08-19 22:23:47 +03:00
parent eeeb0f6f0e
commit 779a5c070f

View file

@ -49,6 +49,7 @@ type DefaultDiscovery struct {
isDead bool isDead bool
requestCh chan int requestCh chan int
pool chan string pool chan string
runExit chan struct{}
} }
// NewDefaultDiscovery returns a new DefaultDiscovery. // NewDefaultDiscovery returns a new DefaultDiscovery.
@ -64,6 +65,7 @@ func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *Defa
attempted: make(map[string]bool), attempted: make(map[string]bool),
requestCh: make(chan int), requestCh: make(chan int),
pool: make(chan string, maxPoolSize), pool: make(chan string, maxPoolSize),
runExit: make(chan struct{}),
} }
go d.run() go d.run()
return d return d
@ -211,6 +213,7 @@ func (d *DefaultDiscovery) Close() {
default: default:
} }
close(d.requestCh) close(d.requestCh)
<-d.runExit
} }
// run is a goroutine that makes DefaultDiscovery process its queue to connect // run is a goroutine that makes DefaultDiscovery process its queue to connect
@ -259,7 +262,7 @@ func (d *DefaultDiscovery) run() {
} }
} }
if !ok { if !ok {
return break
} }
// Special case, no connections after all attempts. // Special case, no connections after all attempts.
d.lock.RLock() d.lock.RLock()
@ -270,4 +273,5 @@ func (d *DefaultDiscovery) run() {
requested = oldRequest requested = oldRequest
} }
} }
close(d.runExit)
} }