96 lines
2.4 KiB
Go
96 lines
2.4 KiB
Go
package broadcaster
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"encoding/binary"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/config"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"github.com/nspcc-dev/neo-go/pkg/rpc/request"
|
|
"github.com/nspcc-dev/neo-go/pkg/services/oracle"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type rpcBroascaster struct {
|
|
clients map[string]*oracleClient
|
|
log *zap.Logger
|
|
|
|
close chan struct{}
|
|
responses chan request.RawParams
|
|
sendTimeout time.Duration
|
|
}
|
|
|
|
const (
|
|
defaultSendTimeout = time.Second * 4
|
|
|
|
defaultChanCapacity = 16
|
|
)
|
|
|
|
// New returns new struct capable of broadcasting oracle responses.
|
|
func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster {
|
|
if cfg.ResponseTimeout == 0 {
|
|
cfg.ResponseTimeout = defaultSendTimeout
|
|
}
|
|
r := &rpcBroascaster{
|
|
clients: make(map[string]*oracleClient, len(cfg.Nodes)),
|
|
log: log,
|
|
close: make(chan struct{}),
|
|
responses: make(chan request.RawParams),
|
|
sendTimeout: cfg.ResponseTimeout,
|
|
}
|
|
for i := range cfg.Nodes {
|
|
r.clients[cfg.Nodes[i]] = r.newOracleClient(cfg.Nodes[i], cfg.ResponseTimeout, make(chan request.RawParams, defaultChanCapacity))
|
|
}
|
|
return r
|
|
}
|
|
|
|
// Run implements oracle.Broadcaster.
|
|
func (r *rpcBroascaster) Run() {
|
|
for _, c := range r.clients {
|
|
go c.run()
|
|
}
|
|
for {
|
|
select {
|
|
case <-r.close:
|
|
return
|
|
case ps := <-r.responses:
|
|
for _, c := range r.clients {
|
|
select {
|
|
case c.responses <- ps:
|
|
default:
|
|
c.log.Error("can't send response, channel is full")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Shutdown implements oracle.Broadcaster.
|
|
func (r *rpcBroascaster) Shutdown() {
|
|
close(r.close)
|
|
}
|
|
|
|
// SendResponse implements interfaces.Broadcaster.
|
|
func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) {
|
|
pub := priv.PublicKey()
|
|
data := GetMessage(pub.Bytes(), resp.ID, txSig)
|
|
msgSig := priv.Sign(data)
|
|
params := request.NewRawParams(
|
|
base64.StdEncoding.EncodeToString(pub.Bytes()),
|
|
resp.ID,
|
|
base64.StdEncoding.EncodeToString(txSig),
|
|
base64.StdEncoding.EncodeToString(msgSig),
|
|
)
|
|
r.responses <- params
|
|
}
|
|
|
|
// GetMessage returns data which is signed upon sending response by RPC.
|
|
func GetMessage(pubBytes []byte, reqID uint64, txSig []byte) []byte {
|
|
data := make([]byte, len(pubBytes)+8+len(txSig))
|
|
copy(data, pubBytes)
|
|
binary.LittleEndian.PutUint64(data[len(pubBytes):], uint64(reqID))
|
|
copy(data[len(pubBytes)+8:], txSig)
|
|
return data
|
|
}
|