network/cli: move Oracle service instantiation out of the network

This commit is contained in:
Roman Khimov 2022-01-12 05:01:34 +03:00
parent 5dd4db2c02
commit 0ad3ea5944
6 changed files with 46 additions and 50 deletions

View file

@ -17,6 +17,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/network" "github.com/nspcc-dev/neo-go/pkg/network"
"github.com/nspcc-dev/neo-go/pkg/network/metrics" "github.com/nspcc-dev/neo-go/pkg/network/metrics"
"github.com/nspcc-dev/neo-go/pkg/rpc/server" "github.com/nspcc-dev/neo-go/pkg/rpc/server"
"github.com/nspcc-dev/neo-go/pkg/services/oracle"
"github.com/urfave/cli" "github.com/urfave/cli"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
@ -316,6 +317,26 @@ func restoreDB(ctx *cli.Context) error {
return nil return nil
} }
func mkOracle(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*oracle.Oracle, error) {
if !config.OracleCfg.Enabled {
return nil, nil
}
orcCfg := oracle.Config{
Log: log,
Network: config.Net,
MainCfg: config.OracleCfg,
Chain: chain,
OnTransaction: serv.RelayTxn,
}
orc, err := oracle.NewOracle(orcCfg)
if err != nil {
return nil, fmt.Errorf("can't initialize Oracle module: %w", err)
}
chain.SetOracle(orc)
serv.AddService(orc)
return orc, nil
}
func startServer(ctx *cli.Context) error { func startServer(ctx *cli.Context) error {
cfg, err := getConfigFromContext(ctx) cfg, err := getConfigFromContext(ctx)
if err != nil { if err != nil {
@ -340,7 +361,11 @@ func startServer(ctx *cli.Context) error {
if err != nil { if err != nil {
return cli.NewExitError(fmt.Errorf("failed to create network server: %w", err), 1) return cli.NewExitError(fmt.Errorf("failed to create network server: %w", err), 1)
} }
rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, serv.GetOracle(), log) oracleSrv, err := mkOracle(serverConfig, chain, serv, log)
if err != nil {
return err
}
rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log)
errChan := make(chan error) errChan := make(chan error)
go serv.Start(errChan) go serv.Start(errChan)
@ -369,7 +394,7 @@ Main:
errChan <- fmt.Errorf("error while restarting rpc-server: %w", serverErr) errChan <- fmt.Errorf("error while restarting rpc-server: %w", serverErr)
break break
} }
rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, serv.GetOracle(), log) rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log)
rpcServer.Start(errChan) rpcServer.Start(errChan)
} }
case <-grace.Done(): case <-grace.Done():

View file

@ -298,7 +298,7 @@ func TestOracleFull(t *testing.T) {
bc := initTestChain(t, nil, nil) bc := initTestChain(t, nil, nil)
acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle2.json", "two") acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle2.json", "two")
mp := bc.GetMemPool() mp := bc.GetMemPool()
orc.OnTransaction = func(tx *transaction.Transaction) { _ = mp.Add(tx, bc) } orc.OnTransaction = func(tx *transaction.Transaction) error { return mp.Add(tx, bc) }
bc.SetOracle(orc) bc.SetOracle(orc)
cs := getOracleContractState(bc.contracts.Oracle.Hash, bc.contracts.Std.Hash) cs := getOracleContractState(bc.contracts.Oracle.Hash, bc.contracts.Std.Hash)
@ -323,7 +323,7 @@ func TestNotYetRunningOracle(t *testing.T) {
bc := initTestChain(t, nil, nil) bc := initTestChain(t, nil, nil)
acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle2.json", "two") acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle2.json", "two")
mp := bc.GetMemPool() mp := bc.GetMemPool()
orc.OnTransaction = func(tx *transaction.Transaction) { _ = mp.Add(tx, bc) } orc.OnTransaction = func(tx *transaction.Transaction) error { return mp.Add(tx, bc) }
bc.SetOracle(orc) bc.SetOracle(orc)
cs := getOracleContractState(bc.contracts.Oracle.Hash, bc.contracts.Std.Hash) cs := getOracleContractState(bc.contracts.Oracle.Hash, bc.contracts.Std.Hash)
@ -394,8 +394,9 @@ type responseWithSig struct {
} }
func saveTxToChan(ch chan *transaction.Transaction) oracle.TxCallback { func saveTxToChan(ch chan *transaction.Transaction) oracle.TxCallback {
return func(tx *transaction.Transaction) { return func(tx *transaction.Transaction) error {
ch <- tx ch <- tx
return nil
} }
} }

View file

@ -25,7 +25,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/notary" "github.com/nspcc-dev/neo-go/pkg/services/notary"
"github.com/nspcc-dev/neo-go/pkg/services/oracle"
"github.com/nspcc-dev/neo-go/pkg/services/stateroot" "github.com/nspcc-dev/neo-go/pkg/services/stateroot"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/atomic" "go.uber.org/atomic"
@ -104,7 +103,6 @@ type (
syncReached *atomic.Bool syncReached *atomic.Bool
oracle *oracle.Oracle
stateRoot stateroot.Service stateRoot stateroot.Service
stateSync blockchainer.StateSync stateSync blockchainer.StateSync
@ -211,29 +209,6 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
s.stateSync = sSync s.stateSync = sSync
s.bSyncQueue = newBlockQueue(maxBlockBatch, sSync, log, nil) s.bSyncQueue = newBlockQueue(maxBlockBatch, sSync, log, nil)
if config.OracleCfg.Enabled {
orcCfg := oracle.Config{
Log: log,
Network: config.Net,
MainCfg: config.OracleCfg,
Chain: chain,
}
orc, err := oracle.NewOracle(orcCfg)
if err != nil {
return nil, fmt.Errorf("can't initialize Oracle module: %w", err)
}
orc.SetOnTransaction(func(tx *transaction.Transaction) {
if err := s.RelayTxn(tx); err != nil {
orc.Log.Error("can't pool oracle tx",
zap.String("hash", tx.Hash().StringLE()),
zap.Error(err))
}
})
s.oracle = orc
s.services = append(s.services, orc)
chain.SetOracle(orc)
}
if config.Wallet != nil { if config.Wallet != nil {
srv, err := newConsensus(consensus.Config{ srv, err := newConsensus(consensus.Config{
Logger: log, Logger: log,
@ -326,9 +301,9 @@ func (s *Server) Shutdown() {
close(s.quit) close(s.quit)
} }
// GetOracle returns oracle module instance. // AddService allows to add a service to be started/stopped by Server.
func (s *Server) GetOracle() *oracle.Oracle { func (s *Server) AddService(svc Service) {
return s.oracle s.services = append(s.services, svc)
} }
// GetStateRoot returns state root service instance. // GetStateRoot returns state root service instance.

View file

@ -85,7 +85,7 @@ type (
defaultResponseHandler struct{} defaultResponseHandler struct{}
// TxCallback executes on new transactions when they are ready to be pooled. // TxCallback executes on new transactions when they are ready to be pooled.
TxCallback = func(tx *transaction.Transaction) TxCallback = func(tx *transaction.Transaction) error
// URIValidator is used to check if provided URL is valid. // URIValidator is used to check if provided URL is valid.
URIValidator = func(*url.URL) error URIValidator = func(*url.URL) error
) )
@ -156,7 +156,7 @@ func NewOracle(cfg Config) (*Oracle, error) {
o.ResponseHandler = defaultResponseHandler{} o.ResponseHandler = defaultResponseHandler{}
} }
if o.OnTransaction == nil { if o.OnTransaction == nil {
o.OnTransaction = func(*transaction.Transaction) {} o.OnTransaction = func(*transaction.Transaction) error { return nil }
} }
if o.URIValidator == nil { if o.URIValidator == nil {
o.URIValidator = defaultURIValidator o.URIValidator = defaultURIValidator
@ -239,17 +239,12 @@ func (o *Oracle) UpdateNativeContract(script, resp []byte, h util.Uint160, verif
o.verifyOffset = verifyOffset o.verifyOffset = verifyOffset
} }
func (o *Oracle) getOnTransaction() TxCallback { func (o *Oracle) sendTx(tx *transaction.Transaction) {
o.mtx.RLock() if err := o.OnTransaction(tx); err != nil {
defer o.mtx.RUnlock() o.Log.Error("can't pool oracle tx",
return o.OnTransaction zap.String("hash", tx.Hash().StringLE()),
} zap.Error(err))
}
// SetOnTransaction sets callback to pool and broadcast tx.
func (o *Oracle) SetOnTransaction(cb TxCallback) {
o.mtx.Lock()
defer o.mtx.Unlock()
o.OnTransaction = cb
} }
func (o *Oracle) getBroadcaster() Broadcaster { func (o *Oracle) getBroadcaster() Broadcaster {

View file

@ -240,7 +240,7 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, req request) error {
o.getBroadcaster().SendResponse(priv, resp, txSig) o.getBroadcaster().SendResponse(priv, resp, txSig)
if ready { if ready {
o.getOnTransaction()(readyTx) o.sendTx(readyTx)
} }
return nil return nil
} }
@ -253,7 +253,7 @@ func (o *Oracle) processFailedRequest(priv *keys.PrivateKey, req request) {
return return
} else if incTx.isSent { } else if incTx.isSent {
// Tx was sent but not yet persisted. Try to pool it again. // Tx was sent but not yet persisted. Try to pool it again.
o.getOnTransaction()(incTx.tx) o.sendTx(incTx.tx)
return return
} }
@ -271,7 +271,7 @@ func (o *Oracle) processFailedRequest(priv *keys.PrivateKey, req request) {
o.getBroadcaster().SendResponse(priv, getFailedResponse(req.ID), txSig) o.getBroadcaster().SendResponse(priv, getFailedResponse(req.ID), txSig)
if ready { if ready {
o.getOnTransaction()(readyTx) o.sendTx(readyTx)
} }
} }

View file

@ -59,7 +59,7 @@ func (o *Oracle) AddResponse(pub *keys.PublicKey, reqID uint64, txSig []byte) {
incTx.Unlock() incTx.Unlock()
if ready { if ready {
o.getOnTransaction()(readyTx) o.sendTx(readyTx)
} }
} }