From 717be43a5d1d69d443fee4c2a29bb1845bf615ea Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Tue, 2 Feb 2021 10:45:02 +0300 Subject: [PATCH] oracle: split broadcaster into parts 1. Generic parallel sending part can be reused by state service. 2. Specific oracle marshaling is implemented on top of (1). --- .../helpers/rpcbroadcaster/broadcaster.go | 55 ++++++++++++++++++ .../rpcbroadcaster}/client.go | 21 ++++--- pkg/services/oracle/broadcaster/oracle.go | 56 ++++--------------- 3 files changed, 81 insertions(+), 51 deletions(-) create mode 100644 pkg/services/helpers/rpcbroadcaster/broadcaster.go rename pkg/services/{oracle/broadcaster => helpers/rpcbroadcaster}/client.go (63%) diff --git a/pkg/services/helpers/rpcbroadcaster/broadcaster.go b/pkg/services/helpers/rpcbroadcaster/broadcaster.go new file mode 100644 index 000000000..d1126cfc7 --- /dev/null +++ b/pkg/services/helpers/rpcbroadcaster/broadcaster.go @@ -0,0 +1,55 @@ +package rpcbroadcaster + +import ( + "time" + + "github.com/nspcc-dev/neo-go/pkg/rpc/request" + "go.uber.org/zap" +) + +// RPCBroadcaster represent generic RPC broadcaster. +type RPCBroadcaster struct { + Clients map[string]*RPCClient + Log *zap.Logger + Responses chan request.RawParams + + close chan struct{} + sendTimeout time.Duration +} + +// NewRPCBroadcaster returns new RPC broadcaster instance. +func NewRPCBroadcaster(log *zap.Logger, sendTimeout time.Duration) *RPCBroadcaster { + return &RPCBroadcaster{ + Clients: make(map[string]*RPCClient), + Log: log, + close: make(chan struct{}), + Responses: make(chan request.RawParams), + sendTimeout: sendTimeout, + } +} + +// Run implements oracle.Broadcaster. +func (r *RPCBroadcaster) Run() { + for _, c := range r.Clients { + go c.run() + } + for { + select { + case <-r.close: + return + case ps := <-r.Responses: + for _, c := range r.Clients { + select { + case c.responses <- ps: + default: + c.log.Error("can't send response, channel is full") + } + } + } + } +} + +// Shutdown implements oracle.Broadcaster. +func (r *RPCBroadcaster) Shutdown() { + close(r.close) +} diff --git a/pkg/services/oracle/broadcaster/client.go b/pkg/services/helpers/rpcbroadcaster/client.go similarity index 63% rename from pkg/services/oracle/broadcaster/client.go rename to pkg/services/helpers/rpcbroadcaster/client.go index bfc7accbe..95cbcffa9 100644 --- a/pkg/services/oracle/broadcaster/client.go +++ b/pkg/services/helpers/rpcbroadcaster/client.go @@ -1,4 +1,4 @@ -package broadcaster +package rpcbroadcaster import ( "context" @@ -9,26 +9,33 @@ import ( "go.uber.org/zap" ) -type oracleClient struct { +// RPCClient represent rpc client for a single node. +type RPCClient struct { client *client.Client addr string close chan struct{} responses chan request.RawParams log *zap.Logger sendTimeout time.Duration + method SendMethod } -func (r *rpcBroascaster) newOracleClient(addr string, timeout time.Duration, ch chan request.RawParams) *oracleClient { - return &oracleClient{ +// SendMethod represents rpc method for sending data to other nodes. +type SendMethod func(*client.Client, request.RawParams) error + +// NewRPCClient returns new rpc client for provided address and method. +func (r *RPCBroadcaster) NewRPCClient(addr string, method SendMethod, timeout time.Duration, ch chan request.RawParams) *RPCClient { + return &RPCClient{ addr: addr, close: r.close, responses: ch, - log: r.log.With(zap.String("address", addr)), + log: r.Log.With(zap.String("address", addr)), sendTimeout: timeout, + method: method, } } -func (c *oracleClient) run() { +func (c *RPCClient) run() { // We ignore error as not every node can be available on startup. c.client, _ = client.New(context.Background(), "http://"+c.addr, client.Options{ DialTimeout: c.sendTimeout, @@ -49,7 +56,7 @@ func (c *oracleClient) run() { continue } } - err := c.client.SubmitRawOracleResponse(ps) + err := c.method(c.client, ps) if err != nil { c.log.Error("error while submitting oracle response", zap.Error(err)) } diff --git a/pkg/services/oracle/broadcaster/oracle.go b/pkg/services/oracle/broadcaster/oracle.go index 2d8baafd2..db9aec096 100644 --- a/pkg/services/oracle/broadcaster/oracle.go +++ b/pkg/services/oracle/broadcaster/oracle.go @@ -8,72 +8,40 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/rpc/client" "github.com/nspcc-dev/neo-go/pkg/rpc/request" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/rpcbroadcaster" "github.com/nspcc-dev/neo-go/pkg/services/oracle" "go.uber.org/zap" ) -type rpcBroascaster struct { - clients map[string]*oracleClient - log *zap.Logger - - close chan struct{} - responses chan request.RawParams - sendTimeout time.Duration -} - const ( defaultSendTimeout = time.Second * 4 defaultChanCapacity = 16 ) +type oracleBroadcaster struct { + rpcbroadcaster.RPCBroadcaster +} + // New returns new struct capable of broadcasting oracle responses. func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster { if cfg.ResponseTimeout == 0 { cfg.ResponseTimeout = defaultSendTimeout } - r := &rpcBroascaster{ - clients: make(map[string]*oracleClient, len(cfg.Nodes)), - log: log, - close: make(chan struct{}), - responses: make(chan request.RawParams), - sendTimeout: cfg.ResponseTimeout, + r := &oracleBroadcaster{ + RPCBroadcaster: *rpcbroadcaster.NewRPCBroadcaster(log, cfg.ResponseTimeout), } for i := range cfg.Nodes { - r.clients[cfg.Nodes[i]] = r.newOracleClient(cfg.Nodes[i], cfg.ResponseTimeout, make(chan request.RawParams, defaultChanCapacity)) + r.Clients[cfg.Nodes[i]] = r.NewRPCClient(cfg.Nodes[i], (*client.Client).SubmitRawOracleResponse, + cfg.ResponseTimeout, make(chan request.RawParams, defaultChanCapacity)) } return r } -// Run implements oracle.Broadcaster. -func (r *rpcBroascaster) Run() { - for _, c := range r.clients { - go c.run() - } - for { - select { - case <-r.close: - return - case ps := <-r.responses: - for _, c := range r.clients { - select { - case c.responses <- ps: - default: - c.log.Error("can't send response, channel is full") - } - } - } - } -} - -// Shutdown implements oracle.Broadcaster. -func (r *rpcBroascaster) Shutdown() { - close(r.close) -} - // SendResponse implements interfaces.Broadcaster. -func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { +func (r *oracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { pub := priv.PublicKey() data := GetMessage(pub.Bytes(), resp.ID, txSig) msgSig := priv.Sign(data) @@ -83,7 +51,7 @@ func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.O base64.StdEncoding.EncodeToString(txSig), base64.StdEncoding.EncodeToString(msgSig), ) - r.responses <- params + r.Responses <- params } // GetMessage returns data which is signed upon sending response by RPC.