oracle: submit responses concurrently

This commit is contained in:
Evgenii Stratonikov 2020-10-08 14:50:10 +03:00 committed by Evgeniy Stratonikov
parent 25d734cbad
commit aa852aaaac
4 changed files with 107 additions and 25 deletions

View file

@ -279,6 +279,8 @@ func (b saveToMapBroadcaster) SendResponse(_ *keys.PrivateKey, resp *transaction
txSig: txSig, txSig: txSig,
} }
} }
func (saveToMapBroadcaster) Run() {}
func (saveToMapBroadcaster) Shutdown() {}
type responseWithSig struct { type responseWithSig struct {
resp *transaction.OracleResponse resp *transaction.OracleResponse

View file

@ -0,0 +1,58 @@
package broadcaster
import (
"context"
"time"
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
"github.com/nspcc-dev/neo-go/pkg/rpc/request"
"go.uber.org/zap"
)
type oracleClient struct {
client *client.Client
addr string
close chan struct{}
responses chan request.RawParams
log *zap.Logger
sendTimeout time.Duration
}
func (r *rpcBroascaster) newOracleClient(addr string, timeout time.Duration, ch chan request.RawParams) *oracleClient {
return &oracleClient{
addr: addr,
close: r.close,
responses: ch,
log: r.log.With(zap.String("address", addr)),
sendTimeout: timeout,
}
}
func (c *oracleClient) run() {
// We ignore error as not every node can be available on startup.
c.client, _ = client.New(context.Background(), "http://"+c.addr, client.Options{
DialTimeout: c.sendTimeout,
RequestTimeout: c.sendTimeout,
})
for {
select {
case <-c.close:
return
case ps := <-c.responses:
if c.client == nil {
var err error
c.client, err = client.New(context.Background(), "http://"+c.addr, client.Options{
DialTimeout: c.sendTimeout,
RequestTimeout: c.sendTimeout,
})
if err != nil {
continue
}
}
err := c.client.SubmitRawOracleResponse(ps)
if err != nil {
c.log.Error("error while submitting oracle response", zap.Error(err))
}
}
}
}

View file

@ -1,7 +1,6 @@
package broadcaster package broadcaster
import ( import (
"context"
"encoding/base64" "encoding/base64"
"encoding/binary" "encoding/binary"
"time" "time"
@ -9,21 +8,24 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
"github.com/nspcc-dev/neo-go/pkg/rpc/request" "github.com/nspcc-dev/neo-go/pkg/rpc/request"
"github.com/nspcc-dev/neo-go/pkg/services/oracle" "github.com/nspcc-dev/neo-go/pkg/services/oracle"
"go.uber.org/zap" "go.uber.org/zap"
) )
type rpcBroascaster struct { type rpcBroascaster struct {
clients map[string]*client.Client clients map[string]*oracleClient
log *zap.Logger log *zap.Logger
close chan struct{}
responses chan request.RawParams
sendTimeout time.Duration sendTimeout time.Duration
} }
const ( const (
defaultSendTimeout = time.Second * 4 defaultSendTimeout = time.Second * 4
defaultChanCapacity = 16
) )
// New returns new struct capable of broadcasting oracle responses. // New returns new struct capable of broadcasting oracle responses.
@ -32,20 +34,44 @@ func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster {
cfg.ResponseTimeout = defaultSendTimeout cfg.ResponseTimeout = defaultSendTimeout
} }
r := &rpcBroascaster{ r := &rpcBroascaster{
clients: make(map[string]*client.Client, len(cfg.Nodes)), clients: make(map[string]*oracleClient, len(cfg.Nodes)),
log: log, log: log,
close: make(chan struct{}),
responses: make(chan request.RawParams),
sendTimeout: cfg.ResponseTimeout, sendTimeout: cfg.ResponseTimeout,
} }
for i := range cfg.Nodes { for i := range cfg.Nodes {
// We ignore error as not every node can be available on startup. r.clients[cfg.Nodes[i]] = r.newOracleClient(cfg.Nodes[i], cfg.ResponseTimeout, make(chan request.RawParams, defaultChanCapacity))
r.clients[cfg.Nodes[i]], _ = client.New(context.Background(), "http://"+cfg.Nodes[i], client.Options{
DialTimeout: cfg.ResponseTimeout,
RequestTimeout: cfg.ResponseTimeout,
})
} }
return r return r
} }
// Run implements oracle.Broadcaster.
func (r *rpcBroascaster) Run() {
for _, c := range r.clients {
go c.run()
}
for {
select {
case <-r.close:
return
case ps := <-r.responses:
for _, c := range r.clients {
select {
case c.responses <- ps:
default:
c.log.Error("can't send response, channel is full")
}
}
}
}
}
// Shutdown implements oracle.Broadcaster.
func (r *rpcBroascaster) Shutdown() {
close(r.close)
}
// SendResponse implements interfaces.Broadcaster. // SendResponse implements interfaces.Broadcaster.
func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) {
pub := priv.PublicKey() pub := priv.PublicKey()
@ -57,22 +83,7 @@ func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.O
base64.StdEncoding.EncodeToString(txSig), base64.StdEncoding.EncodeToString(txSig),
base64.StdEncoding.EncodeToString(msgSig), base64.StdEncoding.EncodeToString(msgSig),
) )
for addr, c := range r.clients { r.responses <- params
if c == nil {
var err error
c, err = client.New(context.Background(), addr, client.Options{
DialTimeout: r.sendTimeout,
RequestTimeout: r.sendTimeout,
})
if err != nil {
r.log.Debug("can't connect to oracle node", zap.String("address", addr), zap.Error(err))
continue
}
r.clients[addr] = c
}
err := c.SubmitRawOracleResponse(params)
r.log.Debug("error during oracle response submit", zap.String("address", addr), zap.Error(err))
}
} }
// GetMessage returns data which is signed upon sending response by RPC. // GetMessage returns data which is signed upon sending response by RPC.

View file

@ -67,6 +67,8 @@ type (
// Broadcaster broadcasts oracle responses. // Broadcaster broadcasts oracle responses.
Broadcaster interface { Broadcaster interface {
SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte)
Run()
Shutdown()
} }
defaultResponseHandler struct{} defaultResponseHandler struct{}
@ -137,6 +139,7 @@ func NewOracle(cfg Config) (*Oracle, error) {
// Shutdown shutdowns Oracle. // Shutdown shutdowns Oracle.
func (o *Oracle) Shutdown() { func (o *Oracle) Shutdown() {
close(o.close) close(o.close)
o.getBroadcaster().Shutdown()
} }
// Run runs must be executed in a separate goroutine. // Run runs must be executed in a separate goroutine.
@ -182,9 +185,17 @@ func (o *Oracle) getBroadcaster() Broadcaster {
func (o *Oracle) SetBroadcaster(b Broadcaster) { func (o *Oracle) SetBroadcaster(b Broadcaster) {
o.mtx.Lock() o.mtx.Lock()
defer o.mtx.Unlock() defer o.mtx.Unlock()
o.ResponseHandler.Shutdown()
o.ResponseHandler = b o.ResponseHandler = b
go b.Run()
} }
// SendResponse implements Broadcaster interface. // SendResponse implements Broadcaster interface.
func (defaultResponseHandler) SendResponse(*keys.PrivateKey, *transaction.OracleResponse, []byte) { func (defaultResponseHandler) SendResponse(*keys.PrivateKey, *transaction.OracleResponse, []byte) {
} }
// Run implements Broadcaster interface.
func (defaultResponseHandler) Run() {}
// Shutdown implements Broadcaster interface.
func (defaultResponseHandler) Shutdown() {}