From 2adcf406d3fcd92d4f002f3b05a9f08155822e94 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Jul 2022 21:36:37 +0300 Subject: [PATCH] cli: reload Oracle service on USR1 Which allows to enable/disable the service, change nodes, keys and other settings. Unfortunately, atomic.Value doesn't allow Store(nil), so we have to store a pointer there that can point to nil interface. --- cli/server/server.go | 37 ++++++++++++++++++++++-------- pkg/core/blockchain.go | 28 ++++++++++++++++------ pkg/core/native/designate.go | 4 ++-- pkg/core/native/oracle.go | 34 ++++++++++++++------------- pkg/services/oracle/oracle.go | 1 + pkg/services/oracle/oracle_test.go | 2 +- pkg/services/rpcsrv/server.go | 18 +++++++++++---- 7 files changed, 85 insertions(+), 39 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index 42b699bd8..dc99df1e9 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -12,6 +12,7 @@ import ( "github.com/nspcc-dev/neo-go/cli/options" "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core/block" @@ -385,14 +386,14 @@ func restoreDB(ctx *cli.Context) error { return nil } -func mkOracle(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*oracle.Oracle, error) { - if !config.OracleCfg.Enabled { +func mkOracle(config config.OracleConfiguration, magic netmode.Magic, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*oracle.Oracle, error) { + if !config.Enabled { return nil, nil } orcCfg := oracle.Config{ Log: log, - Network: config.Net, - MainCfg: config.OracleCfg, + Network: magic, + MainCfg: config, Chain: chain, OnTransaction: serv.RelayTxn, } @@ -492,7 +493,7 @@ func startServer(ctx *cli.Context) error { } serv.AddExtensibleService(sr, stateroot.Category, sr.OnPayload) - oracleSrv, err := mkOracle(serverConfig, chain, serv, log) + oracleSrv, err := mkOracle(cfg.ApplicationConfiguration.Oracle, cfg.ProtocolConfiguration.Magic, chain, serv, log) if err != nil { return cli.NewExitError(err, 1) } @@ -513,8 +514,9 @@ func startServer(ctx *cli.Context) error { rpcServer.Start() } - sighupCh := make(chan os.Signal, 1) - signal.Notify(sighupCh, syscall.SIGHUP) + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGHUP) + signal.Notify(sigCh, syscall.SIGUSR1) fmt.Fprintln(ctx.App.Writer, Logo()) fmt.Fprintln(ctx.App.Writer, serv.UserAgent) @@ -527,7 +529,7 @@ Main: case err := <-errChan: shutdownErr = fmt.Errorf("server error: %w", err) cancel() - case sig := <-sighupCh: + case sig := <-sigCh: log.Info("signal received", zap.Stringer("name", sig)) cfgnew, err := getConfigFromContext(ctx) if err != nil { @@ -557,10 +559,27 @@ Main: prometheus.ShutDown() prometheus = metrics.NewPrometheusService(cfgnew.ApplicationConfiguration.Prometheus, log) go prometheus.Start() + case syscall.SIGUSR1: + if oracleSrv != nil { + chain.SetOracle(nil) + rpcServer.SetOracleHandler(nil) + oracleSrv.Shutdown() + } + oracleSrv, err = mkOracle(cfgnew.ApplicationConfiguration.Oracle, cfgnew.ProtocolConfiguration.Magic, chain, serv, log) + if err != nil { + log.Error("failed to create oracle service", zap.Error(err)) + break // Keep going. + } + if oracleSrv != nil { + rpcServer.SetOracleHandler(oracleSrv) + if serv.IsInSync() { + oracleSrv.Start() + } + } } cfg = cfgnew case <-grace.Done(): - signal.Stop(sighupCh) + signal.Stop(sigCh) serv.Shutdown() break Main } diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index ca47cb7f2..0247518aa 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -303,14 +303,28 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L // must be called before `bc.Run()` to avoid data race. func (bc *Blockchain) SetOracle(mod native.OracleService) { orc := bc.contracts.Oracle - md, ok := orc.GetMethod(manifest.MethodVerify, -1) - if !ok { - panic(fmt.Errorf("%s method not found", manifest.MethodVerify)) + if mod != nil { + md, ok := orc.GetMethod(manifest.MethodVerify, -1) + if !ok { + panic(fmt.Errorf("%s method not found", manifest.MethodVerify)) + } + mod.UpdateNativeContract(orc.NEF.Script, orc.GetOracleResponseScript(), + orc.Hash, md.MD.Offset) + keys, _, err := bc.contracts.Designate.GetDesignatedByRole(bc.dao, noderoles.Oracle, bc.BlockHeight()) + if err != nil { + bc.log.Error("failed to get oracle key list") + return + } + mod.UpdateOracleNodes(keys) + reqs, err := bc.contracts.Oracle.GetRequests(bc.dao) + if err != nil { + bc.log.Error("failed to get current oracle request list") + return + } + mod.AddRequests(reqs) } - mod.UpdateNativeContract(orc.NEF.Script, orc.GetOracleResponseScript(), - orc.Hash, md.MD.Offset) - orc.Module.Store(mod) - bc.contracts.Designate.OracleService.Store(mod) + orc.Module.Store(&mod) + bc.contracts.Designate.OracleService.Store(&mod) } // SetNotary sets notary module. It doesn't protected by mutex and diff --git a/pkg/core/native/designate.go b/pkg/core/native/designate.go index 492e8849f..8427e5fc8 100644 --- a/pkg/core/native/designate.go +++ b/pkg/core/native/designate.go @@ -238,8 +238,8 @@ func (s *Designate) updateCachedRoleData(cache *DesignationCache, d *dao.Simple, func (s *Designate) notifyRoleChanged(v *roleData, r noderoles.Role) { switch r { case noderoles.Oracle: - if orc, _ := s.OracleService.Load().(OracleService); orc != nil { - orc.UpdateOracleNodes(v.nodes.Copy()) + if orc, _ := s.OracleService.Load().(*OracleService); orc != nil && *orc != nil { + (*orc).UpdateOracleNodes(v.nodes.Copy()) } case noderoles.P2PNotary: if ntr, _ := s.NotaryService.Load().(NotaryService); ntr != nil { diff --git a/pkg/core/native/oracle.go b/pkg/core/native/oracle.go index a31eb929d..62ac80b00 100644 --- a/pkg/core/native/oracle.go +++ b/pkg/core/native/oracle.go @@ -113,7 +113,10 @@ func copyOracleCache(src, dst *OracleCache) { } func newOracle() *Oracle { - o := &Oracle{ContractMD: *interop.NewContractMD(nativenames.Oracle, oracleContractID)} + o := &Oracle{ + ContractMD: *interop.NewContractMD(nativenames.Oracle, oracleContractID), + newRequests: make(map[uint64]*state.OracleRequest), + } defer o.UpdateHash() o.oracleScript = CreateOracleResponseScript(o.Hash) @@ -161,11 +164,7 @@ func (o *Oracle) GetOracleResponseScript() []byte { // OnPersist implements the Contract interface. func (o *Oracle) OnPersist(ic *interop.Context) error { - var err error - if o.newRequests == nil { - o.newRequests, err = o.getRequests(ic.DAO) - } - return err + return nil } // PostPersist represents `postPersist` method. @@ -177,7 +176,7 @@ func (o *Oracle) PostPersist(ic *interop.Context) error { single := big.NewInt(p) var removedIDs []uint64 - orc, _ := o.Module.Load().(OracleService) + orc, _ := o.Module.Load().(*OracleService) for _, tx := range ic.Block.Transactions { resp := getResponse(tx) if resp == nil { @@ -189,7 +188,7 @@ func (o *Oracle) PostPersist(ic *interop.Context) error { continue } ic.DAO.DeleteStorageItem(o.ID, reqKey) - if orc != nil { + if orc != nil && *orc != nil { removedIDs = append(removedIDs, resp.ID) } @@ -229,8 +228,8 @@ func (o *Oracle) PostPersist(ic *interop.Context) error { o.GAS.mint(ic, nodes[i].GetScriptHash(), &reward[i], false) } - if len(removedIDs) != 0 && orc != nil { - orc.RemoveRequests(removedIDs) + if len(removedIDs) != 0 { + (*orc).RemoveRequests(removedIDs) } return o.updateCache(ic.DAO) } @@ -415,7 +414,10 @@ func (o *Oracle) PutRequestInternal(id uint64, req *state.OracleRequest, d *dao. if err := putConvertibleToDAO(o.ID, d, reqKey, req); err != nil { return err } - o.newRequests[id] = req + orc, _ := o.Module.Load().(*OracleService) + if orc != nil && *orc != nil { + o.newRequests[id] = req + } // Add request ID to the id list. lst := new(IDList) @@ -493,8 +495,8 @@ func (o *Oracle) getOriginalTxID(d *dao.Simple, tx *transaction.Transaction) uti return tx.Hash() } -// getRequests returns all requests which have not been finished yet. -func (o *Oracle) getRequests(d *dao.Simple) (map[uint64]*state.OracleRequest, error) { +// GetRequests returns all requests which have not been finished yet. +func (o *Oracle) GetRequests(d *dao.Simple) (map[uint64]*state.OracleRequest, error) { var reqs = make(map[uint64]*state.OracleRequest) var err error d.Seek(o.ID, storage.SeekRange{Prefix: prefixRequest}, func(k, v []byte) bool { @@ -534,8 +536,8 @@ func (o *Oracle) getConvertibleFromDAO(d *dao.Simple, key []byte, item stackitem // updateCache updates cached Oracle values if they've been changed. func (o *Oracle) updateCache(d *dao.Simple) error { - orc, _ := o.Module.Load().(OracleService) - if orc == nil { + orc, _ := o.Module.Load().(*OracleService) + if orc == nil || *orc == nil { return nil } @@ -547,7 +549,7 @@ func (o *Oracle) updateCache(d *dao.Simple) error { delete(reqs, id) } } - orc.AddRequests(reqs) + (*orc).AddRequests(reqs) return nil } diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 96cfdb4dd..d765db731 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -186,6 +186,7 @@ func (o *Oracle) Shutdown() { if !o.running { return } + o.Log.Info("stopping oracle service") o.running = false close(o.close) o.ResponseHandler.Shutdown() diff --git a/pkg/services/oracle/oracle_test.go b/pkg/services/oracle/oracle_test.go index f8fc6c023..f30aebbb1 100644 --- a/pkg/services/oracle/oracle_test.go +++ b/pkg/services/oracle/oracle_test.go @@ -121,8 +121,8 @@ func TestCreateResponseTx(t *testing.T) { Result: []byte{0}, } cInvoker.Invoke(t, stackitem.Null{}, "requestURL", req.URL, *req.Filter, req.CallbackMethod, req.UserData, int64(req.GasForResponse)) - orc.UpdateOracleNodes(keys.PublicKeys{acc.PrivateKey().PublicKey()}) bc.SetOracle(orc) + orc.UpdateOracleNodes(keys.PublicKeys{acc.PrivateKey().PublicKey()}) tx, err = orc.CreateResponseTx(int64(req.GasForResponse), 1, resp) require.NoError(t, err) assert.Equal(t, 166, tx.Size()) diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index 64e78656a..b7534c13b 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -122,7 +122,7 @@ type ( network netmode.Magic stateRootEnabled bool coreServer *network.Server - oracle OracleHandler + oracle *atomic.Value log *zap.Logger https *http.Server shutdown chan struct{} @@ -275,6 +275,10 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server, log.Info("SessionPoolSize is not set or wrong, setting default value", zap.Int("SessionPoolSize", defaultSessionPoolSize)) } } + var oracleWrapped = new(atomic.Value) + if orc != nil { + oracleWrapped.Store(&orc) + } return Server{ Server: httpServer, chain: chain, @@ -284,7 +288,7 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server, stateRootEnabled: protoCfg.StateRootInHeader, coreServer: coreServer, log: log, - oracle: orc, + oracle: oracleWrapped, https: tlsServer, shutdown: make(chan struct{}), started: atomic.NewBool(false), @@ -400,6 +404,11 @@ func (s *Server) Shutdown() { <-s.executionCh } +// SetOracleHandler allows to update oracle handler used by the Server. +func (s *Server) SetOracleHandler(orc OracleHandler) { + s.oracle.Store(&orc) +} + func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) { req := params.NewRequest() @@ -2328,7 +2337,8 @@ func getRelayResult(err error, hash util.Uint256) (interface{}, *neorpc.Error) { } func (s *Server) submitOracleResponse(ps params.Params) (interface{}, *neorpc.Error) { - if s.oracle == nil { + oracle := s.oracle.Load().(*OracleHandler) + if oracle == nil || *oracle == nil { return nil, neorpc.NewRPCError("Oracle is not enabled", "") } var pub *keys.PublicKey @@ -2355,7 +2365,7 @@ func (s *Server) submitOracleResponse(ps params.Params) (interface{}, *neorpc.Er if !pub.Verify(msgSig, hash.Sha256(data).BytesBE()) { return nil, neorpc.NewRPCError("Invalid request signature", "") } - s.oracle.AddResponse(pub, uint64(reqID), txSig) + (*oracle).AddResponse(pub, uint64(reqID), txSig) return json.RawMessage([]byte("{}")), nil }