oracle: split broadcaster into parts

1. Generic parallel sending part can be reused by state service.
2. Specific oracle marshaling is implemented on top of (1).
This commit is contained in:
Evgeniy Stratonikov 2021-02-02 10:45:02 +03:00
parent f087775160
commit 717be43a5d
3 changed files with 81 additions and 51 deletions

View file

@ -0,0 +1,55 @@
package rpcbroadcaster
import (
"time"
"github.com/nspcc-dev/neo-go/pkg/rpc/request"
"go.uber.org/zap"
)
// RPCBroadcaster represent generic RPC broadcaster.
type RPCBroadcaster struct {
Clients map[string]*RPCClient
Log *zap.Logger
Responses chan request.RawParams
close chan struct{}
sendTimeout time.Duration
}
// NewRPCBroadcaster returns new RPC broadcaster instance.
func NewRPCBroadcaster(log *zap.Logger, sendTimeout time.Duration) *RPCBroadcaster {
return &RPCBroadcaster{
Clients: make(map[string]*RPCClient),
Log: log,
close: make(chan struct{}),
Responses: make(chan request.RawParams),
sendTimeout: sendTimeout,
}
}
// Run implements oracle.Broadcaster.
func (r *RPCBroadcaster) Run() {
for _, c := range r.Clients {
go c.run()
}
for {
select {
case <-r.close:
return
case ps := <-r.Responses:
for _, c := range r.Clients {
select {
case c.responses <- ps:
default:
c.log.Error("can't send response, channel is full")
}
}
}
}
}
// Shutdown implements oracle.Broadcaster.
func (r *RPCBroadcaster) Shutdown() {
close(r.close)
}

View file

@ -1,4 +1,4 @@
package broadcaster package rpcbroadcaster
import ( import (
"context" "context"
@ -9,26 +9,33 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
type oracleClient struct { // RPCClient represent rpc client for a single node.
type RPCClient struct {
client *client.Client client *client.Client
addr string addr string
close chan struct{} close chan struct{}
responses chan request.RawParams responses chan request.RawParams
log *zap.Logger log *zap.Logger
sendTimeout time.Duration sendTimeout time.Duration
method SendMethod
} }
func (r *rpcBroascaster) newOracleClient(addr string, timeout time.Duration, ch chan request.RawParams) *oracleClient { // SendMethod represents rpc method for sending data to other nodes.
return &oracleClient{ type SendMethod func(*client.Client, request.RawParams) error
// NewRPCClient returns new rpc client for provided address and method.
func (r *RPCBroadcaster) NewRPCClient(addr string, method SendMethod, timeout time.Duration, ch chan request.RawParams) *RPCClient {
return &RPCClient{
addr: addr, addr: addr,
close: r.close, close: r.close,
responses: ch, responses: ch,
log: r.log.With(zap.String("address", addr)), log: r.Log.With(zap.String("address", addr)),
sendTimeout: timeout, sendTimeout: timeout,
method: method,
} }
} }
func (c *oracleClient) run() { func (c *RPCClient) run() {
// We ignore error as not every node can be available on startup. // We ignore error as not every node can be available on startup.
c.client, _ = client.New(context.Background(), "http://"+c.addr, client.Options{ c.client, _ = client.New(context.Background(), "http://"+c.addr, client.Options{
DialTimeout: c.sendTimeout, DialTimeout: c.sendTimeout,
@ -49,7 +56,7 @@ func (c *oracleClient) run() {
continue continue
} }
} }
err := c.client.SubmitRawOracleResponse(ps) err := c.method(c.client, ps)
if err != nil { if err != nil {
c.log.Error("error while submitting oracle response", zap.Error(err)) c.log.Error("error while submitting oracle response", zap.Error(err))
} }

View file

@ -8,72 +8,40 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
"github.com/nspcc-dev/neo-go/pkg/rpc/request" "github.com/nspcc-dev/neo-go/pkg/rpc/request"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/rpcbroadcaster"
"github.com/nspcc-dev/neo-go/pkg/services/oracle" "github.com/nspcc-dev/neo-go/pkg/services/oracle"
"go.uber.org/zap" "go.uber.org/zap"
) )
type rpcBroascaster struct {
clients map[string]*oracleClient
log *zap.Logger
close chan struct{}
responses chan request.RawParams
sendTimeout time.Duration
}
const ( const (
defaultSendTimeout = time.Second * 4 defaultSendTimeout = time.Second * 4
defaultChanCapacity = 16 defaultChanCapacity = 16
) )
type oracleBroadcaster struct {
rpcbroadcaster.RPCBroadcaster
}
// New returns new struct capable of broadcasting oracle responses. // New returns new struct capable of broadcasting oracle responses.
func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster { func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster {
if cfg.ResponseTimeout == 0 { if cfg.ResponseTimeout == 0 {
cfg.ResponseTimeout = defaultSendTimeout cfg.ResponseTimeout = defaultSendTimeout
} }
r := &rpcBroascaster{ r := &oracleBroadcaster{
clients: make(map[string]*oracleClient, len(cfg.Nodes)), RPCBroadcaster: *rpcbroadcaster.NewRPCBroadcaster(log, cfg.ResponseTimeout),
log: log,
close: make(chan struct{}),
responses: make(chan request.RawParams),
sendTimeout: cfg.ResponseTimeout,
} }
for i := range cfg.Nodes { for i := range cfg.Nodes {
r.clients[cfg.Nodes[i]] = r.newOracleClient(cfg.Nodes[i], cfg.ResponseTimeout, make(chan request.RawParams, defaultChanCapacity)) r.Clients[cfg.Nodes[i]] = r.NewRPCClient(cfg.Nodes[i], (*client.Client).SubmitRawOracleResponse,
cfg.ResponseTimeout, make(chan request.RawParams, defaultChanCapacity))
} }
return r return r
} }
// Run implements oracle.Broadcaster.
func (r *rpcBroascaster) Run() {
for _, c := range r.clients {
go c.run()
}
for {
select {
case <-r.close:
return
case ps := <-r.responses:
for _, c := range r.clients {
select {
case c.responses <- ps:
default:
c.log.Error("can't send response, channel is full")
}
}
}
}
}
// Shutdown implements oracle.Broadcaster.
func (r *rpcBroascaster) Shutdown() {
close(r.close)
}
// SendResponse implements interfaces.Broadcaster. // SendResponse implements interfaces.Broadcaster.
func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { func (r *oracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) {
pub := priv.PublicKey() pub := priv.PublicKey()
data := GetMessage(pub.Bytes(), resp.ID, txSig) data := GetMessage(pub.Bytes(), resp.ID, txSig)
msgSig := priv.Sign(data) msgSig := priv.Sign(data)
@ -83,7 +51,7 @@ func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.O
base64.StdEncoding.EncodeToString(txSig), base64.StdEncoding.EncodeToString(txSig),
base64.StdEncoding.EncodeToString(msgSig), base64.StdEncoding.EncodeToString(msgSig),
) )
r.responses <- params r.Responses <- params
} }
// GetMessage returns data which is signed upon sending response by RPC. // GetMessage returns data which is signed upon sending response by RPC.