diff --git a/pkg/core/oracle_test.go b/pkg/core/oracle_test.go index 88bf84d36..a852fa899 100644 --- a/pkg/core/oracle_test.go +++ b/pkg/core/oracle_test.go @@ -279,6 +279,8 @@ func (b saveToMapBroadcaster) SendResponse(_ *keys.PrivateKey, resp *transaction txSig: txSig, } } +func (saveToMapBroadcaster) Run() {} +func (saveToMapBroadcaster) Shutdown() {} type responseWithSig struct { resp *transaction.OracleResponse diff --git a/pkg/services/oracle/broadcaster/client.go b/pkg/services/oracle/broadcaster/client.go new file mode 100644 index 000000000..bfc7accbe --- /dev/null +++ b/pkg/services/oracle/broadcaster/client.go @@ -0,0 +1,58 @@ +package broadcaster + +import ( + "context" + "time" + + "github.com/nspcc-dev/neo-go/pkg/rpc/client" + "github.com/nspcc-dev/neo-go/pkg/rpc/request" + "go.uber.org/zap" +) + +type oracleClient struct { + client *client.Client + addr string + close chan struct{} + responses chan request.RawParams + log *zap.Logger + sendTimeout time.Duration +} + +func (r *rpcBroascaster) newOracleClient(addr string, timeout time.Duration, ch chan request.RawParams) *oracleClient { + return &oracleClient{ + addr: addr, + close: r.close, + responses: ch, + log: r.log.With(zap.String("address", addr)), + sendTimeout: timeout, + } +} + +func (c *oracleClient) run() { + // We ignore error as not every node can be available on startup. + c.client, _ = client.New(context.Background(), "http://"+c.addr, client.Options{ + DialTimeout: c.sendTimeout, + RequestTimeout: c.sendTimeout, + }) + for { + select { + case <-c.close: + return + case ps := <-c.responses: + if c.client == nil { + var err error + c.client, err = client.New(context.Background(), "http://"+c.addr, client.Options{ + DialTimeout: c.sendTimeout, + RequestTimeout: c.sendTimeout, + }) + if err != nil { + continue + } + } + err := c.client.SubmitRawOracleResponse(ps) + if err != nil { + c.log.Error("error while submitting oracle response", zap.Error(err)) + } + } + } +} diff --git a/pkg/services/oracle/broadcaster/oracle.go b/pkg/services/oracle/broadcaster/oracle.go index a4182d3d2..2d8baafd2 100644 --- a/pkg/services/oracle/broadcaster/oracle.go +++ b/pkg/services/oracle/broadcaster/oracle.go @@ -1,7 +1,6 @@ package broadcaster import ( - "context" "encoding/base64" "encoding/binary" "time" @@ -9,21 +8,24 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "github.com/nspcc-dev/neo-go/pkg/rpc/client" "github.com/nspcc-dev/neo-go/pkg/rpc/request" "github.com/nspcc-dev/neo-go/pkg/services/oracle" "go.uber.org/zap" ) type rpcBroascaster struct { - clients map[string]*client.Client + clients map[string]*oracleClient log *zap.Logger + close chan struct{} + responses chan request.RawParams sendTimeout time.Duration } const ( defaultSendTimeout = time.Second * 4 + + defaultChanCapacity = 16 ) // New returns new struct capable of broadcasting oracle responses. @@ -32,20 +34,44 @@ func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster { cfg.ResponseTimeout = defaultSendTimeout } r := &rpcBroascaster{ - clients: make(map[string]*client.Client, len(cfg.Nodes)), + clients: make(map[string]*oracleClient, len(cfg.Nodes)), log: log, + close: make(chan struct{}), + responses: make(chan request.RawParams), sendTimeout: cfg.ResponseTimeout, } for i := range cfg.Nodes { - // We ignore error as not every node can be available on startup. - r.clients[cfg.Nodes[i]], _ = client.New(context.Background(), "http://"+cfg.Nodes[i], client.Options{ - DialTimeout: cfg.ResponseTimeout, - RequestTimeout: cfg.ResponseTimeout, - }) + r.clients[cfg.Nodes[i]] = r.newOracleClient(cfg.Nodes[i], cfg.ResponseTimeout, make(chan request.RawParams, defaultChanCapacity)) } return r } +// Run implements oracle.Broadcaster. +func (r *rpcBroascaster) Run() { + for _, c := range r.clients { + go c.run() + } + for { + select { + case <-r.close: + return + case ps := <-r.responses: + for _, c := range r.clients { + select { + case c.responses <- ps: + default: + c.log.Error("can't send response, channel is full") + } + } + } + } +} + +// Shutdown implements oracle.Broadcaster. +func (r *rpcBroascaster) Shutdown() { + close(r.close) +} + // SendResponse implements interfaces.Broadcaster. func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { pub := priv.PublicKey() @@ -57,22 +83,7 @@ func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.O base64.StdEncoding.EncodeToString(txSig), base64.StdEncoding.EncodeToString(msgSig), ) - for addr, c := range r.clients { - if c == nil { - var err error - c, err = client.New(context.Background(), addr, client.Options{ - DialTimeout: r.sendTimeout, - RequestTimeout: r.sendTimeout, - }) - if err != nil { - r.log.Debug("can't connect to oracle node", zap.String("address", addr), zap.Error(err)) - continue - } - r.clients[addr] = c - } - err := c.SubmitRawOracleResponse(params) - r.log.Debug("error during oracle response submit", zap.String("address", addr), zap.Error(err)) - } + r.responses <- params } // GetMessage returns data which is signed upon sending response by RPC. diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 264636f88..1bf33022c 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -67,6 +67,8 @@ type ( // Broadcaster broadcasts oracle responses. Broadcaster interface { SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) + Run() + Shutdown() } defaultResponseHandler struct{} @@ -137,6 +139,7 @@ func NewOracle(cfg Config) (*Oracle, error) { // Shutdown shutdowns Oracle. func (o *Oracle) Shutdown() { close(o.close) + o.getBroadcaster().Shutdown() } // Run runs must be executed in a separate goroutine. @@ -182,9 +185,17 @@ func (o *Oracle) getBroadcaster() Broadcaster { func (o *Oracle) SetBroadcaster(b Broadcaster) { o.mtx.Lock() defer o.mtx.Unlock() + o.ResponseHandler.Shutdown() o.ResponseHandler = b + go b.Run() } // SendResponse implements Broadcaster interface. func (defaultResponseHandler) SendResponse(*keys.PrivateKey, *transaction.OracleResponse, []byte) { } + +// Run implements Broadcaster interface. +func (defaultResponseHandler) Run() {} + +// Shutdown implements Broadcaster interface. +func (defaultResponseHandler) Shutdown() {}