package rpcbroadcaster import ( "time" "go.uber.org/zap" ) // RPCBroadcaster represents a generic RPC broadcaster. type RPCBroadcaster struct { Clients map[string]*RPCClient Log *zap.Logger Responses chan []interface{} close chan struct{} finished chan struct{} sendTimeout time.Duration } // NewRPCBroadcaster returns a new RPC broadcaster instance. func NewRPCBroadcaster(log *zap.Logger, sendTimeout time.Duration) *RPCBroadcaster { return &RPCBroadcaster{ Clients: make(map[string]*RPCClient), Log: log, close: make(chan struct{}), finished: make(chan struct{}), Responses: make(chan []interface{}), sendTimeout: sendTimeout, } } // Run implements oracle.Broadcaster. func (r *RPCBroadcaster) Run() { for _, c := range r.Clients { go c.run() } run: for { select { case <-r.close: break run case ps := <-r.Responses: for _, c := range r.Clients { select { case c.responses <- ps: default: c.log.Error("can't send response, channel is full") } } } } for _, c := range r.Clients { <-c.finished } drain: for { select { case <-r.Responses: default: break drain } } close(r.Responses) close(r.finished) } // SendParams sends a request using all clients if the broadcaster is active. func (r *RPCBroadcaster) SendParams(params []interface{}) { select { case <-r.close: case r.Responses <- params: } } // Shutdown implements oracle.Broadcaster. The same instance can't be Run again // after the shutdown. func (r *RPCBroadcaster) Shutdown() { close(r.close) <-r.finished }