From 98e2c5568c440692e8c2eedcd0c98feacef5bf28 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Jul 2022 18:27:33 +0300 Subject: [PATCH] rpcsrv: don't init Oracle in New, drop oracle dependency The only thing rpcsrv needs is AddResponse callback. --- pkg/services/oracle/broadcaster/oracle.go | 10 +++---- pkg/services/oracle/oracle.go | 36 +++-------------------- pkg/services/oracle/request.go | 4 +-- pkg/services/rpcsrv/server.go | 13 ++++---- 4 files changed, 18 insertions(+), 45 deletions(-) diff --git a/pkg/services/oracle/broadcaster/oracle.go b/pkg/services/oracle/broadcaster/oracle.go index b14b09e5c..0655580b6 100644 --- a/pkg/services/oracle/broadcaster/oracle.go +++ b/pkg/services/oracle/broadcaster/oracle.go @@ -10,7 +10,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/services/helpers/rpcbroadcaster" - "github.com/nspcc-dev/neo-go/pkg/services/oracle" "go.uber.org/zap" ) @@ -20,16 +19,17 @@ const ( defaultChanCapacity = 16 ) -type oracleBroadcaster struct { +// OracleBroadcaster is an oracle broadcaster implementation. +type OracleBroadcaster struct { rpcbroadcaster.RPCBroadcaster } // New returns a new struct capable of broadcasting oracle responses. -func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster { +func New(cfg config.OracleConfiguration, log *zap.Logger) *OracleBroadcaster { if cfg.ResponseTimeout == 0 { cfg.ResponseTimeout = defaultSendTimeout } - r := &oracleBroadcaster{ + r := &OracleBroadcaster{ RPCBroadcaster: *rpcbroadcaster.NewRPCBroadcaster(log, cfg.ResponseTimeout), } for i := range cfg.Nodes { @@ -40,7 +40,7 @@ func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster { } // SendResponse implements interfaces.Broadcaster. -func (r *oracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { +func (r *OracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { pub := priv.PublicKey() data := GetMessage(pub.Bytes(), resp.ID, txSig) msgSig := priv.Sign(data) diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index a88077dee..96cfdb4dd 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/broadcaster" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util/slice" @@ -43,9 +44,6 @@ type ( oracleScript []byte verifyOffset int - // mtx protects setting callbacks. - mtx sync.RWMutex - // accMtx protects account and oracle nodes. accMtx sync.RWMutex currAccount *wallet.Account @@ -94,8 +92,6 @@ type ( Shutdown() } - defaultResponseHandler struct{} - // TxCallback executes on new transactions when they are ready to be pooled. TxCallback = func(tx *transaction.Transaction) error ) @@ -165,7 +161,7 @@ func NewOracle(cfg Config) (*Oracle, error) { } if o.ResponseHandler == nil { - o.ResponseHandler = defaultResponseHandler{} + o.ResponseHandler = broadcaster.New(cfg.MainCfg, cfg.Log) } if o.OnTransaction == nil { o.OnTransaction = func(*transaction.Transaction) error { return nil } @@ -192,7 +188,7 @@ func (o *Oracle) Shutdown() { } o.running = false close(o.close) - o.getBroadcaster().Shutdown() + o.ResponseHandler.Shutdown() <-o.done } @@ -217,6 +213,7 @@ func (o *Oracle) start() { for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ { go o.runRequestWorker() } + go o.ResponseHandler.Run() tick := time.NewTicker(o.MainCfg.RefreshInterval) main: @@ -284,28 +281,3 @@ func (o *Oracle) sendTx(tx *transaction.Transaction) { zap.Error(err)) } } - -func (o *Oracle) getBroadcaster() Broadcaster { - o.mtx.RLock() - defer o.mtx.RUnlock() - return o.ResponseHandler -} - -// SetBroadcaster sets callback to broadcast response. -func (o *Oracle) SetBroadcaster(b Broadcaster) { - o.mtx.Lock() - defer o.mtx.Unlock() - o.ResponseHandler.Shutdown() - o.ResponseHandler = b - go b.Run() -} - -// SendResponse implements Broadcaster interface. -func (defaultResponseHandler) SendResponse(*keys.PrivateKey, *transaction.OracleResponse, []byte) { -} - -// Run implements Broadcaster interface. -func (defaultResponseHandler) Run() {} - -// Shutdown implements Broadcaster interface. -func (defaultResponseHandler) Shutdown() {} diff --git a/pkg/services/oracle/request.go b/pkg/services/oracle/request.go index 9eba982ac..1b6ed6197 100644 --- a/pkg/services/oracle/request.go +++ b/pkg/services/oracle/request.go @@ -234,7 +234,7 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, req request) error { incTx.attempts++ incTx.Unlock() - o.getBroadcaster().SendResponse(priv, resp, txSig) + o.ResponseHandler.SendResponse(priv, resp, txSig) if ready { o.sendTx(readyTx) } @@ -265,7 +265,7 @@ func (o *Oracle) processFailedRequest(priv *keys.PrivateKey, req request) { txSig := incTx.backupSigs[string(priv.PublicKey().Bytes())].sig incTx.Unlock() - o.getBroadcaster().SendResponse(priv, getFailedResponse(req.ID), txSig) + o.ResponseHandler.SendResponse(priv, getFailedResponse(req.ID), txSig) if ready { o.sendTx(readyTx) } diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index b08968046..64e78656a 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -43,7 +43,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/network" "github.com/nspcc-dev/neo-go/pkg/network/payload" - "github.com/nspcc-dev/neo-go/pkg/services/oracle" "github.com/nspcc-dev/neo-go/pkg/services/oracle/broadcaster" "github.com/nspcc-dev/neo-go/pkg/services/rpcsrv/params" "github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag" @@ -108,6 +107,11 @@ type ( mempool.Feer // fee interface } + // OracleHandler is the interface oracle service needs to provide for the Server. + OracleHandler interface { + AddResponse(pub *keys.PublicKey, reqID uint64, txSig []byte) + } + // Server represents the JSON-RPC 2.0 server. Server struct { *http.Server @@ -118,7 +122,7 @@ type ( network netmode.Magic stateRootEnabled bool coreServer *network.Server - oracle *oracle.Oracle + oracle OracleHandler log *zap.Logger https *http.Server shutdown chan struct{} @@ -248,7 +252,7 @@ var upgrader = websocket.Upgrader{} // New creates a new Server struct. func New(chain Ledger, conf config.RPC, coreServer *network.Server, - orc *oracle.Oracle, log *zap.Logger, errChan chan error) Server { + orc OracleHandler, log *zap.Logger, errChan chan error) Server { httpServer := &http.Server{ Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10), } @@ -260,9 +264,6 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server, } } - if orc != nil { - orc.SetBroadcaster(broadcaster.New(orc.MainCfg, log)) - } protoCfg := chain.GetConfig() if conf.SessionEnabled { if conf.SessionExpirationTime <= 0 {