rpcsrv: don't init Oracle in New, drop oracle dependency

The only thing rpcsrv needs is AddResponse callback.
This commit is contained in:
Roman Khimov 2022-07-26 18:27:33 +03:00
parent df24c1268e
commit 98e2c5568c
4 changed files with 18 additions and 45 deletions

View file

@ -10,7 +10,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/rpcbroadcaster" "github.com/nspcc-dev/neo-go/pkg/services/helpers/rpcbroadcaster"
"github.com/nspcc-dev/neo-go/pkg/services/oracle"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -20,16 +19,17 @@ const (
defaultChanCapacity = 16 defaultChanCapacity = 16
) )
type oracleBroadcaster struct { // OracleBroadcaster is an oracle broadcaster implementation.
type OracleBroadcaster struct {
rpcbroadcaster.RPCBroadcaster rpcbroadcaster.RPCBroadcaster
} }
// New returns a new struct capable of broadcasting oracle responses. // New returns a new struct capable of broadcasting oracle responses.
func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster { func New(cfg config.OracleConfiguration, log *zap.Logger) *OracleBroadcaster {
if cfg.ResponseTimeout == 0 { if cfg.ResponseTimeout == 0 {
cfg.ResponseTimeout = defaultSendTimeout cfg.ResponseTimeout = defaultSendTimeout
} }
r := &oracleBroadcaster{ r := &OracleBroadcaster{
RPCBroadcaster: *rpcbroadcaster.NewRPCBroadcaster(log, cfg.ResponseTimeout), RPCBroadcaster: *rpcbroadcaster.NewRPCBroadcaster(log, cfg.ResponseTimeout),
} }
for i := range cfg.Nodes { for i := range cfg.Nodes {
@ -40,7 +40,7 @@ func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster {
} }
// SendResponse implements interfaces.Broadcaster. // SendResponse implements interfaces.Broadcaster.
func (r *oracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { func (r *OracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) {
pub := priv.PublicKey() pub := priv.PublicKey()
data := GetMessage(pub.Bytes(), resp.ID, txSig) data := GetMessage(pub.Bytes(), resp.ID, txSig)
msgSig := priv.Sign(data) msgSig := priv.Sign(data)

View file

@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/broadcaster"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/util/slice"
@ -43,9 +44,6 @@ type (
oracleScript []byte oracleScript []byte
verifyOffset int verifyOffset int
// mtx protects setting callbacks.
mtx sync.RWMutex
// accMtx protects account and oracle nodes. // accMtx protects account and oracle nodes.
accMtx sync.RWMutex accMtx sync.RWMutex
currAccount *wallet.Account currAccount *wallet.Account
@ -94,8 +92,6 @@ type (
Shutdown() Shutdown()
} }
defaultResponseHandler struct{}
// TxCallback executes on new transactions when they are ready to be pooled. // TxCallback executes on new transactions when they are ready to be pooled.
TxCallback = func(tx *transaction.Transaction) error TxCallback = func(tx *transaction.Transaction) error
) )
@ -165,7 +161,7 @@ func NewOracle(cfg Config) (*Oracle, error) {
} }
if o.ResponseHandler == nil { if o.ResponseHandler == nil {
o.ResponseHandler = defaultResponseHandler{} o.ResponseHandler = broadcaster.New(cfg.MainCfg, cfg.Log)
} }
if o.OnTransaction == nil { if o.OnTransaction == nil {
o.OnTransaction = func(*transaction.Transaction) error { return nil } o.OnTransaction = func(*transaction.Transaction) error { return nil }
@ -192,7 +188,7 @@ func (o *Oracle) Shutdown() {
} }
o.running = false o.running = false
close(o.close) close(o.close)
o.getBroadcaster().Shutdown() o.ResponseHandler.Shutdown()
<-o.done <-o.done
} }
@ -217,6 +213,7 @@ func (o *Oracle) start() {
for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ { for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ {
go o.runRequestWorker() go o.runRequestWorker()
} }
go o.ResponseHandler.Run()
tick := time.NewTicker(o.MainCfg.RefreshInterval) tick := time.NewTicker(o.MainCfg.RefreshInterval)
main: main:
@ -284,28 +281,3 @@ func (o *Oracle) sendTx(tx *transaction.Transaction) {
zap.Error(err)) zap.Error(err))
} }
} }
func (o *Oracle) getBroadcaster() Broadcaster {
o.mtx.RLock()
defer o.mtx.RUnlock()
return o.ResponseHandler
}
// SetBroadcaster sets callback to broadcast response.
func (o *Oracle) SetBroadcaster(b Broadcaster) {
o.mtx.Lock()
defer o.mtx.Unlock()
o.ResponseHandler.Shutdown()
o.ResponseHandler = b
go b.Run()
}
// SendResponse implements Broadcaster interface.
func (defaultResponseHandler) SendResponse(*keys.PrivateKey, *transaction.OracleResponse, []byte) {
}
// Run implements Broadcaster interface.
func (defaultResponseHandler) Run() {}
// Shutdown implements Broadcaster interface.
func (defaultResponseHandler) Shutdown() {}

View file

@ -234,7 +234,7 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, req request) error {
incTx.attempts++ incTx.attempts++
incTx.Unlock() incTx.Unlock()
o.getBroadcaster().SendResponse(priv, resp, txSig) o.ResponseHandler.SendResponse(priv, resp, txSig)
if ready { if ready {
o.sendTx(readyTx) o.sendTx(readyTx)
} }
@ -265,7 +265,7 @@ func (o *Oracle) processFailedRequest(priv *keys.PrivateKey, req request) {
txSig := incTx.backupSigs[string(priv.PublicKey().Bytes())].sig txSig := incTx.backupSigs[string(priv.PublicKey().Bytes())].sig
incTx.Unlock() incTx.Unlock()
o.getBroadcaster().SendResponse(priv, getFailedResponse(req.ID), txSig) o.ResponseHandler.SendResponse(priv, getFailedResponse(req.ID), txSig)
if ready { if ready {
o.sendTx(readyTx) o.sendTx(readyTx)
} }

View file

@ -43,7 +43,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/network" "github.com/nspcc-dev/neo-go/pkg/network"
"github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/oracle"
"github.com/nspcc-dev/neo-go/pkg/services/oracle/broadcaster" "github.com/nspcc-dev/neo-go/pkg/services/oracle/broadcaster"
"github.com/nspcc-dev/neo-go/pkg/services/rpcsrv/params" "github.com/nspcc-dev/neo-go/pkg/services/rpcsrv/params"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag" "github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
@ -108,6 +107,11 @@ type (
mempool.Feer // fee interface mempool.Feer // fee interface
} }
// OracleHandler is the interface oracle service needs to provide for the Server.
OracleHandler interface {
AddResponse(pub *keys.PublicKey, reqID uint64, txSig []byte)
}
// Server represents the JSON-RPC 2.0 server. // Server represents the JSON-RPC 2.0 server.
Server struct { Server struct {
*http.Server *http.Server
@ -118,7 +122,7 @@ type (
network netmode.Magic network netmode.Magic
stateRootEnabled bool stateRootEnabled bool
coreServer *network.Server coreServer *network.Server
oracle *oracle.Oracle oracle OracleHandler
log *zap.Logger log *zap.Logger
https *http.Server https *http.Server
shutdown chan struct{} shutdown chan struct{}
@ -248,7 +252,7 @@ var upgrader = websocket.Upgrader{}
// New creates a new Server struct. // New creates a new Server struct.
func New(chain Ledger, conf config.RPC, coreServer *network.Server, func New(chain Ledger, conf config.RPC, coreServer *network.Server,
orc *oracle.Oracle, log *zap.Logger, errChan chan error) Server { orc OracleHandler, log *zap.Logger, errChan chan error) Server {
httpServer := &http.Server{ httpServer := &http.Server{
Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10), Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10),
} }
@ -260,9 +264,6 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
} }
} }
if orc != nil {
orc.SetBroadcaster(broadcaster.New(orc.MainCfg, log))
}
protoCfg := chain.GetConfig() protoCfg := chain.GetConfig()
if conf.SessionEnabled { if conf.SessionEnabled {
if conf.SessionExpirationTime <= 0 { if conf.SessionExpirationTime <= 0 {