cli: reload Oracle service on USR1

Which allows to enable/disable the service, change nodes, keys and other
settings. Unfortunately, atomic.Value doesn't allow Store(nil), so we have to
store a pointer there that can point to nil interface.
This commit is contained in:
Roman Khimov 2022-07-26 21:36:37 +03:00
parent 98e2c5568c
commit 2adcf406d3
7 changed files with 85 additions and 39 deletions

View file

@ -12,6 +12,7 @@ import (
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/consensus"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/block"
@ -385,14 +386,14 @@ func restoreDB(ctx *cli.Context) error {
return nil
}
func mkOracle(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*oracle.Oracle, error) {
if !config.OracleCfg.Enabled {
func mkOracle(config config.OracleConfiguration, magic netmode.Magic, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*oracle.Oracle, error) {
if !config.Enabled {
return nil, nil
}
orcCfg := oracle.Config{
Log: log,
Network: config.Net,
MainCfg: config.OracleCfg,
Network: magic,
MainCfg: config,
Chain: chain,
OnTransaction: serv.RelayTxn,
}
@ -492,7 +493,7 @@ func startServer(ctx *cli.Context) error {
}
serv.AddExtensibleService(sr, stateroot.Category, sr.OnPayload)
oracleSrv, err := mkOracle(serverConfig, chain, serv, log)
oracleSrv, err := mkOracle(cfg.ApplicationConfiguration.Oracle, cfg.ProtocolConfiguration.Magic, chain, serv, log)
if err != nil {
return cli.NewExitError(err, 1)
}
@ -513,8 +514,9 @@ func startServer(ctx *cli.Context) error {
rpcServer.Start()
}
sighupCh := make(chan os.Signal, 1)
signal.Notify(sighupCh, syscall.SIGHUP)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGHUP)
signal.Notify(sigCh, syscall.SIGUSR1)
fmt.Fprintln(ctx.App.Writer, Logo())
fmt.Fprintln(ctx.App.Writer, serv.UserAgent)
@ -527,7 +529,7 @@ Main:
case err := <-errChan:
shutdownErr = fmt.Errorf("server error: %w", err)
cancel()
case sig := <-sighupCh:
case sig := <-sigCh:
log.Info("signal received", zap.Stringer("name", sig))
cfgnew, err := getConfigFromContext(ctx)
if err != nil {
@ -557,10 +559,27 @@ Main:
prometheus.ShutDown()
prometheus = metrics.NewPrometheusService(cfgnew.ApplicationConfiguration.Prometheus, log)
go prometheus.Start()
case syscall.SIGUSR1:
if oracleSrv != nil {
chain.SetOracle(nil)
rpcServer.SetOracleHandler(nil)
oracleSrv.Shutdown()
}
oracleSrv, err = mkOracle(cfgnew.ApplicationConfiguration.Oracle, cfgnew.ProtocolConfiguration.Magic, chain, serv, log)
if err != nil {
log.Error("failed to create oracle service", zap.Error(err))
break // Keep going.
}
if oracleSrv != nil {
rpcServer.SetOracleHandler(oracleSrv)
if serv.IsInSync() {
oracleSrv.Start()
}
}
}
cfg = cfgnew
case <-grace.Done():
signal.Stop(sighupCh)
signal.Stop(sigCh)
serv.Shutdown()
break Main
}

View file

@ -303,14 +303,28 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
// must be called before `bc.Run()` to avoid data race.
func (bc *Blockchain) SetOracle(mod native.OracleService) {
orc := bc.contracts.Oracle
if mod != nil {
md, ok := orc.GetMethod(manifest.MethodVerify, -1)
if !ok {
panic(fmt.Errorf("%s method not found", manifest.MethodVerify))
}
mod.UpdateNativeContract(orc.NEF.Script, orc.GetOracleResponseScript(),
orc.Hash, md.MD.Offset)
orc.Module.Store(mod)
bc.contracts.Designate.OracleService.Store(mod)
keys, _, err := bc.contracts.Designate.GetDesignatedByRole(bc.dao, noderoles.Oracle, bc.BlockHeight())
if err != nil {
bc.log.Error("failed to get oracle key list")
return
}
mod.UpdateOracleNodes(keys)
reqs, err := bc.contracts.Oracle.GetRequests(bc.dao)
if err != nil {
bc.log.Error("failed to get current oracle request list")
return
}
mod.AddRequests(reqs)
}
orc.Module.Store(&mod)
bc.contracts.Designate.OracleService.Store(&mod)
}
// SetNotary sets notary module. It doesn't protected by mutex and

View file

@ -238,8 +238,8 @@ func (s *Designate) updateCachedRoleData(cache *DesignationCache, d *dao.Simple,
func (s *Designate) notifyRoleChanged(v *roleData, r noderoles.Role) {
switch r {
case noderoles.Oracle:
if orc, _ := s.OracleService.Load().(OracleService); orc != nil {
orc.UpdateOracleNodes(v.nodes.Copy())
if orc, _ := s.OracleService.Load().(*OracleService); orc != nil && *orc != nil {
(*orc).UpdateOracleNodes(v.nodes.Copy())
}
case noderoles.P2PNotary:
if ntr, _ := s.NotaryService.Load().(NotaryService); ntr != nil {

View file

@ -113,7 +113,10 @@ func copyOracleCache(src, dst *OracleCache) {
}
func newOracle() *Oracle {
o := &Oracle{ContractMD: *interop.NewContractMD(nativenames.Oracle, oracleContractID)}
o := &Oracle{
ContractMD: *interop.NewContractMD(nativenames.Oracle, oracleContractID),
newRequests: make(map[uint64]*state.OracleRequest),
}
defer o.UpdateHash()
o.oracleScript = CreateOracleResponseScript(o.Hash)
@ -161,11 +164,7 @@ func (o *Oracle) GetOracleResponseScript() []byte {
// OnPersist implements the Contract interface.
func (o *Oracle) OnPersist(ic *interop.Context) error {
var err error
if o.newRequests == nil {
o.newRequests, err = o.getRequests(ic.DAO)
}
return err
return nil
}
// PostPersist represents `postPersist` method.
@ -177,7 +176,7 @@ func (o *Oracle) PostPersist(ic *interop.Context) error {
single := big.NewInt(p)
var removedIDs []uint64
orc, _ := o.Module.Load().(OracleService)
orc, _ := o.Module.Load().(*OracleService)
for _, tx := range ic.Block.Transactions {
resp := getResponse(tx)
if resp == nil {
@ -189,7 +188,7 @@ func (o *Oracle) PostPersist(ic *interop.Context) error {
continue
}
ic.DAO.DeleteStorageItem(o.ID, reqKey)
if orc != nil {
if orc != nil && *orc != nil {
removedIDs = append(removedIDs, resp.ID)
}
@ -229,8 +228,8 @@ func (o *Oracle) PostPersist(ic *interop.Context) error {
o.GAS.mint(ic, nodes[i].GetScriptHash(), &reward[i], false)
}
if len(removedIDs) != 0 && orc != nil {
orc.RemoveRequests(removedIDs)
if len(removedIDs) != 0 {
(*orc).RemoveRequests(removedIDs)
}
return o.updateCache(ic.DAO)
}
@ -415,7 +414,10 @@ func (o *Oracle) PutRequestInternal(id uint64, req *state.OracleRequest, d *dao.
if err := putConvertibleToDAO(o.ID, d, reqKey, req); err != nil {
return err
}
orc, _ := o.Module.Load().(*OracleService)
if orc != nil && *orc != nil {
o.newRequests[id] = req
}
// Add request ID to the id list.
lst := new(IDList)
@ -493,8 +495,8 @@ func (o *Oracle) getOriginalTxID(d *dao.Simple, tx *transaction.Transaction) uti
return tx.Hash()
}
// getRequests returns all requests which have not been finished yet.
func (o *Oracle) getRequests(d *dao.Simple) (map[uint64]*state.OracleRequest, error) {
// GetRequests returns all requests which have not been finished yet.
func (o *Oracle) GetRequests(d *dao.Simple) (map[uint64]*state.OracleRequest, error) {
var reqs = make(map[uint64]*state.OracleRequest)
var err error
d.Seek(o.ID, storage.SeekRange{Prefix: prefixRequest}, func(k, v []byte) bool {
@ -534,8 +536,8 @@ func (o *Oracle) getConvertibleFromDAO(d *dao.Simple, key []byte, item stackitem
// updateCache updates cached Oracle values if they've been changed.
func (o *Oracle) updateCache(d *dao.Simple) error {
orc, _ := o.Module.Load().(OracleService)
if orc == nil {
orc, _ := o.Module.Load().(*OracleService)
if orc == nil || *orc == nil {
return nil
}
@ -547,7 +549,7 @@ func (o *Oracle) updateCache(d *dao.Simple) error {
delete(reqs, id)
}
}
orc.AddRequests(reqs)
(*orc).AddRequests(reqs)
return nil
}

View file

@ -186,6 +186,7 @@ func (o *Oracle) Shutdown() {
if !o.running {
return
}
o.Log.Info("stopping oracle service")
o.running = false
close(o.close)
o.ResponseHandler.Shutdown()

View file

@ -121,8 +121,8 @@ func TestCreateResponseTx(t *testing.T) {
Result: []byte{0},
}
cInvoker.Invoke(t, stackitem.Null{}, "requestURL", req.URL, *req.Filter, req.CallbackMethod, req.UserData, int64(req.GasForResponse))
orc.UpdateOracleNodes(keys.PublicKeys{acc.PrivateKey().PublicKey()})
bc.SetOracle(orc)
orc.UpdateOracleNodes(keys.PublicKeys{acc.PrivateKey().PublicKey()})
tx, err = orc.CreateResponseTx(int64(req.GasForResponse), 1, resp)
require.NoError(t, err)
assert.Equal(t, 166, tx.Size())

View file

@ -122,7 +122,7 @@ type (
network netmode.Magic
stateRootEnabled bool
coreServer *network.Server
oracle OracleHandler
oracle *atomic.Value
log *zap.Logger
https *http.Server
shutdown chan struct{}
@ -275,6 +275,10 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
log.Info("SessionPoolSize is not set or wrong, setting default value", zap.Int("SessionPoolSize", defaultSessionPoolSize))
}
}
var oracleWrapped = new(atomic.Value)
if orc != nil {
oracleWrapped.Store(&orc)
}
return Server{
Server: httpServer,
chain: chain,
@ -284,7 +288,7 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
stateRootEnabled: protoCfg.StateRootInHeader,
coreServer: coreServer,
log: log,
oracle: orc,
oracle: oracleWrapped,
https: tlsServer,
shutdown: make(chan struct{}),
started: atomic.NewBool(false),
@ -400,6 +404,11 @@ func (s *Server) Shutdown() {
<-s.executionCh
}
// SetOracleHandler allows to update oracle handler used by the Server.
func (s *Server) SetOracleHandler(orc OracleHandler) {
s.oracle.Store(&orc)
}
func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) {
req := params.NewRequest()
@ -2328,7 +2337,8 @@ func getRelayResult(err error, hash util.Uint256) (interface{}, *neorpc.Error) {
}
func (s *Server) submitOracleResponse(ps params.Params) (interface{}, *neorpc.Error) {
if s.oracle == nil {
oracle := s.oracle.Load().(*OracleHandler)
if oracle == nil || *oracle == nil {
return nil, neorpc.NewRPCError("Oracle is not enabled", "")
}
var pub *keys.PublicKey
@ -2355,7 +2365,7 @@ func (s *Server) submitOracleResponse(ps params.Params) (interface{}, *neorpc.Er
if !pub.Verify(msgSig, hash.Sha256(data).BytesBE()) {
return nil, neorpc.NewRPCError("Invalid request signature", "")
}
s.oracle.AddResponse(pub, uint64(reqID), txSig)
(*oracle).AddResponse(pub, uint64(reqID), txSig)
return json.RawMessage([]byte("{}")), nil
}