2021-02-02 07:45:02 +00:00
|
|
|
package rpcbroadcaster
|
|
|
|
|
|
|
|
import (
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
2022-04-20 18:30:09 +00:00
|
|
|
// RPCBroadcaster represents a generic RPC broadcaster.
|
2021-02-02 07:45:02 +00:00
|
|
|
type RPCBroadcaster struct {
|
|
|
|
Clients map[string]*RPCClient
|
|
|
|
Log *zap.Logger
|
2022-07-07 17:03:10 +00:00
|
|
|
Responses chan []interface{}
|
2021-02-02 07:45:02 +00:00
|
|
|
|
|
|
|
close chan struct{}
|
2022-07-01 20:04:54 +00:00
|
|
|
finished chan struct{}
|
2021-02-02 07:45:02 +00:00
|
|
|
sendTimeout time.Duration
|
|
|
|
}
|
|
|
|
|
2022-04-20 18:30:09 +00:00
|
|
|
// NewRPCBroadcaster returns a new RPC broadcaster instance.
|
2021-02-02 07:45:02 +00:00
|
|
|
func NewRPCBroadcaster(log *zap.Logger, sendTimeout time.Duration) *RPCBroadcaster {
|
|
|
|
return &RPCBroadcaster{
|
|
|
|
Clients: make(map[string]*RPCClient),
|
|
|
|
Log: log,
|
|
|
|
close: make(chan struct{}),
|
2022-07-01 20:04:54 +00:00
|
|
|
finished: make(chan struct{}),
|
2022-07-07 17:03:10 +00:00
|
|
|
Responses: make(chan []interface{}),
|
2021-02-02 07:45:02 +00:00
|
|
|
sendTimeout: sendTimeout,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run implements oracle.Broadcaster.
|
|
|
|
func (r *RPCBroadcaster) Run() {
|
|
|
|
for _, c := range r.Clients {
|
|
|
|
go c.run()
|
|
|
|
}
|
2022-07-01 20:04:54 +00:00
|
|
|
run:
|
2021-02-02 07:45:02 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-r.close:
|
2022-07-01 20:04:54 +00:00
|
|
|
break run
|
2021-02-02 07:45:02 +00:00
|
|
|
case ps := <-r.Responses:
|
|
|
|
for _, c := range r.Clients {
|
|
|
|
select {
|
|
|
|
case c.responses <- ps:
|
|
|
|
default:
|
|
|
|
c.log.Error("can't send response, channel is full")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-07-01 20:04:54 +00:00
|
|
|
for _, c := range r.Clients {
|
|
|
|
<-c.finished
|
|
|
|
}
|
|
|
|
drain:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-r.Responses:
|
|
|
|
default:
|
|
|
|
break drain
|
|
|
|
}
|
|
|
|
}
|
|
|
|
close(r.Responses)
|
|
|
|
close(r.finished)
|
|
|
|
}
|
|
|
|
|
|
|
|
// SendParams sends a request using all clients if the broadcaster is active.
|
2022-07-07 17:03:10 +00:00
|
|
|
func (r *RPCBroadcaster) SendParams(params []interface{}) {
|
2022-07-01 20:04:54 +00:00
|
|
|
select {
|
|
|
|
case <-r.close:
|
|
|
|
case r.Responses <- params:
|
|
|
|
}
|
2021-02-02 07:45:02 +00:00
|
|
|
}
|
|
|
|
|
2022-07-04 20:03:50 +00:00
|
|
|
// Shutdown implements oracle.Broadcaster. The same instance can't be Run again
|
|
|
|
// after the shutdown.
|
2021-02-02 07:45:02 +00:00
|
|
|
func (r *RPCBroadcaster) Shutdown() {
|
|
|
|
close(r.close)
|
2022-07-01 20:04:54 +00:00
|
|
|
<-r.finished
|
2021-02-02 07:45:02 +00:00
|
|
|
}
|