diff --git a/cli/server/server.go b/cli/server/server.go index 1a37ca997..957b1fa3b 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -17,6 +17,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network" "github.com/nspcc-dev/neo-go/pkg/network/metrics" "github.com/nspcc-dev/neo-go/pkg/rpc/server" + "github.com/nspcc-dev/neo-go/pkg/services/oracle" "github.com/urfave/cli" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -316,6 +317,26 @@ func restoreDB(ctx *cli.Context) error { return nil } +func mkOracle(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*oracle.Oracle, error) { + if !config.OracleCfg.Enabled { + return nil, nil + } + orcCfg := oracle.Config{ + Log: log, + Network: config.Net, + MainCfg: config.OracleCfg, + Chain: chain, + OnTransaction: serv.RelayTxn, + } + orc, err := oracle.NewOracle(orcCfg) + if err != nil { + return nil, fmt.Errorf("can't initialize Oracle module: %w", err) + } + chain.SetOracle(orc) + serv.AddService(orc) + return orc, nil +} + func startServer(ctx *cli.Context) error { cfg, err := getConfigFromContext(ctx) if err != nil { @@ -340,7 +361,11 @@ func startServer(ctx *cli.Context) error { if err != nil { return cli.NewExitError(fmt.Errorf("failed to create network server: %w", err), 1) } - rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, serv.GetOracle(), log) + oracleSrv, err := mkOracle(serverConfig, chain, serv, log) + if err != nil { + return err + } + rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log) errChan := make(chan error) go serv.Start(errChan) @@ -369,7 +394,7 @@ Main: errChan <- fmt.Errorf("error while restarting rpc-server: %w", serverErr) break } - rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, serv.GetOracle(), log) + rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log) rpcServer.Start(errChan) } case <-grace.Done(): diff --git a/pkg/core/oracle_test.go b/pkg/core/oracle_test.go index 7f21650d2..fd5f1ebfc 100644 --- a/pkg/core/oracle_test.go +++ b/pkg/core/oracle_test.go @@ -298,7 +298,7 @@ func TestOracleFull(t *testing.T) { bc := initTestChain(t, nil, nil) acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle2.json", "two") mp := bc.GetMemPool() - orc.OnTransaction = func(tx *transaction.Transaction) { _ = mp.Add(tx, bc) } + orc.OnTransaction = func(tx *transaction.Transaction) error { return mp.Add(tx, bc) } bc.SetOracle(orc) cs := getOracleContractState(bc.contracts.Oracle.Hash, bc.contracts.Std.Hash) @@ -323,7 +323,7 @@ func TestNotYetRunningOracle(t *testing.T) { bc := initTestChain(t, nil, nil) acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle2.json", "two") mp := bc.GetMemPool() - orc.OnTransaction = func(tx *transaction.Transaction) { _ = mp.Add(tx, bc) } + orc.OnTransaction = func(tx *transaction.Transaction) error { return mp.Add(tx, bc) } bc.SetOracle(orc) cs := getOracleContractState(bc.contracts.Oracle.Hash, bc.contracts.Std.Hash) @@ -394,8 +394,9 @@ type responseWithSig struct { } func saveTxToChan(ch chan *transaction.Transaction) oracle.TxCallback { - return func(tx *transaction.Transaction) { + return func(tx *transaction.Transaction) error { ch <- tx + return nil } } diff --git a/pkg/network/server.go b/pkg/network/server.go index 1fa88eda7..6353587cb 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -25,7 +25,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/services/notary" - "github.com/nspcc-dev/neo-go/pkg/services/oracle" "github.com/nspcc-dev/neo-go/pkg/services/stateroot" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" @@ -104,7 +103,6 @@ type ( syncReached *atomic.Bool - oracle *oracle.Oracle stateRoot stateroot.Service stateSync blockchainer.StateSync @@ -211,29 +209,6 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai s.stateSync = sSync s.bSyncQueue = newBlockQueue(maxBlockBatch, sSync, log, nil) - if config.OracleCfg.Enabled { - orcCfg := oracle.Config{ - Log: log, - Network: config.Net, - MainCfg: config.OracleCfg, - Chain: chain, - } - orc, err := oracle.NewOracle(orcCfg) - if err != nil { - return nil, fmt.Errorf("can't initialize Oracle module: %w", err) - } - orc.SetOnTransaction(func(tx *transaction.Transaction) { - if err := s.RelayTxn(tx); err != nil { - orc.Log.Error("can't pool oracle tx", - zap.String("hash", tx.Hash().StringLE()), - zap.Error(err)) - } - }) - s.oracle = orc - s.services = append(s.services, orc) - chain.SetOracle(orc) - } - if config.Wallet != nil { srv, err := newConsensus(consensus.Config{ Logger: log, @@ -326,9 +301,9 @@ func (s *Server) Shutdown() { close(s.quit) } -// GetOracle returns oracle module instance. -func (s *Server) GetOracle() *oracle.Oracle { - return s.oracle +// AddService allows to add a service to be started/stopped by Server. +func (s *Server) AddService(svc Service) { + s.services = append(s.services, svc) } // GetStateRoot returns state root service instance. diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index d0b37b326..c5f570a78 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -85,7 +85,7 @@ type ( defaultResponseHandler struct{} // TxCallback executes on new transactions when they are ready to be pooled. - TxCallback = func(tx *transaction.Transaction) + TxCallback = func(tx *transaction.Transaction) error // URIValidator is used to check if provided URL is valid. URIValidator = func(*url.URL) error ) @@ -156,7 +156,7 @@ func NewOracle(cfg Config) (*Oracle, error) { o.ResponseHandler = defaultResponseHandler{} } if o.OnTransaction == nil { - o.OnTransaction = func(*transaction.Transaction) {} + o.OnTransaction = func(*transaction.Transaction) error { return nil } } if o.URIValidator == nil { o.URIValidator = defaultURIValidator @@ -239,17 +239,12 @@ func (o *Oracle) UpdateNativeContract(script, resp []byte, h util.Uint160, verif o.verifyOffset = verifyOffset } -func (o *Oracle) getOnTransaction() TxCallback { - o.mtx.RLock() - defer o.mtx.RUnlock() - return o.OnTransaction -} - -// SetOnTransaction sets callback to pool and broadcast tx. -func (o *Oracle) SetOnTransaction(cb TxCallback) { - o.mtx.Lock() - defer o.mtx.Unlock() - o.OnTransaction = cb +func (o *Oracle) sendTx(tx *transaction.Transaction) { + if err := o.OnTransaction(tx); err != nil { + o.Log.Error("can't pool oracle tx", + zap.String("hash", tx.Hash().StringLE()), + zap.Error(err)) + } } func (o *Oracle) getBroadcaster() Broadcaster { diff --git a/pkg/services/oracle/request.go b/pkg/services/oracle/request.go index fea5c61b3..360d84276 100644 --- a/pkg/services/oracle/request.go +++ b/pkg/services/oracle/request.go @@ -240,7 +240,7 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, req request) error { o.getBroadcaster().SendResponse(priv, resp, txSig) if ready { - o.getOnTransaction()(readyTx) + o.sendTx(readyTx) } return nil } @@ -253,7 +253,7 @@ func (o *Oracle) processFailedRequest(priv *keys.PrivateKey, req request) { return } else if incTx.isSent { // Tx was sent but not yet persisted. Try to pool it again. - o.getOnTransaction()(incTx.tx) + o.sendTx(incTx.tx) return } @@ -271,7 +271,7 @@ func (o *Oracle) processFailedRequest(priv *keys.PrivateKey, req request) { o.getBroadcaster().SendResponse(priv, getFailedResponse(req.ID), txSig) if ready { - o.getOnTransaction()(readyTx) + o.sendTx(readyTx) } } diff --git a/pkg/services/oracle/response.go b/pkg/services/oracle/response.go index d6525d2f7..fdbef6afa 100644 --- a/pkg/services/oracle/response.go +++ b/pkg/services/oracle/response.go @@ -59,7 +59,7 @@ func (o *Oracle) AddResponse(pub *keys.PublicKey, reqID uint64, txSig []byte) { incTx.Unlock() if ready { - o.getOnTransaction()(readyTx) + o.sendTx(readyTx) } }