From 43e4d3af88dca7a17a19bfc741c90218b98feb5d Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 28 Sep 2020 14:58:04 +0300 Subject: [PATCH] oracle: integrate module in core and RPC 1. Initialization is performed via `Blockchain` methods. 2. Native Oracle contract updates list of oracle nodes and in-fly requests in `PostPersist`. 3. RPC uses Oracle module directly. --- cli/executor_test.go | 2 +- cli/server/server.go | 2 +- config/protocol.mainnet.yml | 3 + config/protocol.privnet.docker.four.yml | 11 +++ config/protocol.privnet.docker.one.yml | 11 +++ config/protocol.privnet.docker.single.yml | 8 ++ config/protocol.privnet.docker.three.yml | 11 +++ config/protocol.privnet.docker.two.yml | 11 +++ config/protocol.testnet.yml | 3 + pkg/config/application_config.go | 1 + pkg/config/oracle_config.go | 13 ++++ pkg/core/blockchain.go | 8 ++ pkg/core/blockchainer/blockchainer.go | 2 + pkg/core/blockchainer/services/oracle.go | 20 +++++ pkg/core/helper_test.go | 7 +- pkg/core/native/designate.go | 6 ++ pkg/core/native/oracle.go | 68 +++++++++++++++- pkg/core/oracle_test.go | 94 +++++++++++++++++------ pkg/network/helper_test.go | 5 +- pkg/network/server.go | 37 +++++++++ pkg/network/server_config.go | 4 + pkg/rpc/client/rpc.go | 6 ++ pkg/rpc/server/server.go | 45 ++++++++++- pkg/rpc/server/server_helper_test.go | 32 ++++++-- pkg/rpc/server/server_test.go | 42 ++++++++++ pkg/services/oracle/broadcaster/oracle.go | 85 ++++++++++++++++++++ pkg/services/oracle/nodes.go | 2 +- pkg/services/oracle/oracle.go | 59 +++++++++++++- pkg/services/oracle/request.go | 28 ++++++- pkg/services/oracle/response.go | 6 +- pkg/services/oracle/transaction.go | 2 + 31 files changed, 590 insertions(+), 44 deletions(-) create mode 100644 pkg/config/oracle_config.go create mode 100644 pkg/core/blockchainer/services/oracle.go create mode 100644 pkg/services/oracle/broadcaster/oracle.go diff --git a/cli/executor_test.go b/cli/executor_test.go index 417e4f73b..6f39abb81 100644 --- a/cli/executor_test.go +++ b/cli/executor_test.go @@ -80,7 +80,7 @@ func newTestChain(t *testing.T, f func(*config.Config)) (*core.Blockchain, *serv netSrv, err := network.NewServer(serverConfig, chain, zap.NewNop()) require.NoError(t, err) go netSrv.Start(make(chan error, 1)) - rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, logger) + rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger) errCh := make(chan error, 2) rpcServer.Start(errCh) diff --git a/cli/server/server.go b/cli/server/server.go index 6341bbbad..daf0438e3 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -319,7 +319,7 @@ func startServer(ctx *cli.Context) error { if err != nil { return cli.NewExitError(fmt.Errorf("failed to create network server: %w", err), 1) } - rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, log) + rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, serv.GetOracle(), log) errChan := make(chan error) go serv.Start(errChan) diff --git a/config/protocol.mainnet.yml b/config/protocol.mainnet.yml index c9784fe11..347a16187 100644 --- a/config/protocol.mainnet.yml +++ b/config/protocol.mainnet.yml @@ -48,6 +48,9 @@ ApplicationConfiguration: MaxPeers: 100 AttemptConnPeers: 20 MinPeers: 5 + Oracle: + Enabled: false + RPC: Enabled: true MaxGasInvoke: 15 diff --git a/config/protocol.privnet.docker.four.yml b/config/protocol.privnet.docker.four.yml index d7ca912c6..4ef5c7714 100644 --- a/config/protocol.privnet.docker.four.yml +++ b/config/protocol.privnet.docker.four.yml @@ -44,6 +44,17 @@ ApplicationConfiguration: MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 + Oracle: + Enabled: false + Nodes: + - 172.200.0.1:30333 + - 172.200.0.2:30334 + - 172.200.0.3:30335 + - 172.200.0.4:30336 + RequestTimeout: 5s + UnlockWallet: + Path: "/wallet4.json" + Password: "four" RPC: Enabled: true MaxGasInvoke: 15 diff --git a/config/protocol.privnet.docker.one.yml b/config/protocol.privnet.docker.one.yml index 86390309d..1e54251df 100644 --- a/config/protocol.privnet.docker.one.yml +++ b/config/protocol.privnet.docker.one.yml @@ -44,6 +44,17 @@ ApplicationConfiguration: MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 + Oracle: + Enabled: false + Nodes: + - 172.200.0.1:30333 + - 172.200.0.2:30334 + - 172.200.0.3:30335 + - 172.200.0.4:30336 + RequestTimeout: 5s + UnlockWallet: + Path: "/wallet1.json" + Password: "one" RPC: Enabled: true MaxGasInvoke: 15 diff --git a/config/protocol.privnet.docker.single.yml b/config/protocol.privnet.docker.single.yml index 159ac2e56..5896d816a 100644 --- a/config/protocol.privnet.docker.single.yml +++ b/config/protocol.privnet.docker.single.yml @@ -38,6 +38,14 @@ ApplicationConfiguration: MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 0 + Oracle: + Enabled: false + Nodes: + - 172.200.0.1:30333 + RequestTimeout: 5s + UnlockWallet: + Path: "/wallet1_solo.json" + Password: "one" RPC: Enabled: true EnableCORSWorkaround: false diff --git a/config/protocol.privnet.docker.three.yml b/config/protocol.privnet.docker.three.yml index e510c9010..e8f182c6d 100644 --- a/config/protocol.privnet.docker.three.yml +++ b/config/protocol.privnet.docker.three.yml @@ -44,6 +44,17 @@ ApplicationConfiguration: MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 + Oracle: + Enabled: false + Nodes: + - 172.200.0.1:30333 + - 172.200.0.2:30334 + - 172.200.0.3:30335 + - 172.200.0.4:30336 + RequestTimeout: 5s + UnlockWallet: + Path: "/wallet3.json" + Password: "three" RPC: Enabled: true MaxGasInvoke: 15 diff --git a/config/protocol.privnet.docker.two.yml b/config/protocol.privnet.docker.two.yml index 1cb825c58..19470e4d9 100644 --- a/config/protocol.privnet.docker.two.yml +++ b/config/protocol.privnet.docker.two.yml @@ -44,6 +44,17 @@ ApplicationConfiguration: MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 + Oracle: + Enabled: false + Nodes: + - 172.200.0.1:30333 + - 172.200.0.2:30334 + - 172.200.0.3:30335 + - 172.200.0.4:30336 + RequestTimeout: 5s + UnlockWallet: + Path: "/wallet2.json" + Password: "two" RPC: Enabled: true MaxGasInvoke: 15 diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index c1c81091d..57d2dd840 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -48,6 +48,9 @@ ApplicationConfiguration: MaxPeers: 100 AttemptConnPeers: 20 MinPeers: 5 + Oracle: + Enabled: false + RPC: Enabled: true MaxGasInvoke: 15 diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 9c61cb9a3..7a65f7c00 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -26,4 +26,5 @@ type ApplicationConfiguration struct { Relay bool `yaml:"Relay"` RPC rpc.Config `yaml:"RPC"` UnlockWallet Wallet `yaml:"UnlockWallet"` + Oracle OracleConfiguration `yaml:"Oracle"` } diff --git a/pkg/config/oracle_config.go b/pkg/config/oracle_config.go new file mode 100644 index 000000000..9c2904c3c --- /dev/null +++ b/pkg/config/oracle_config.go @@ -0,0 +1,13 @@ +package config + +import "time" + +// OracleConfiguration is a config for the oracle module. +type OracleConfiguration struct { + Enabled bool `yaml:"Enabled"` + AllowPrivateHost bool `yaml:"AllowPrivateHost"` + Nodes []string `yaml:"Nodes"` + RequestTimeout time.Duration `yaml:"RequestTimeout"` + ResponseTimeout time.Duration `yaml:"ResponseTimeout"` + UnlockWallet Wallet `yaml:"UnlockWallet"` +} diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index b65cae7d4..d179f8454 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -14,6 +14,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" + "github.com/nspcc-dev/neo-go/pkg/core/blockchainer/services" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/interop/contract" @@ -191,6 +192,13 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L return bc, nil } +// SetOracle sets oracle module. It doesn't protected by mutex and +// must be called before `bc.Run()` to avoid data race. +func (bc *Blockchain) SetOracle(mod services.Oracle) { + bc.contracts.Oracle.Module.Store(mod) + bc.contracts.Designate.OracleService.Store(mod) +} + func (bc *Blockchain) init() error { // If we could not find the version in the Store, we know that there is nothing stored. ver, err := bc.dao.GetVersion() diff --git a/pkg/core/blockchainer/blockchainer.go b/pkg/core/blockchainer/blockchainer.go index e3db276d1..b3b7cc8db 100644 --- a/pkg/core/blockchainer/blockchainer.go +++ b/pkg/core/blockchainer/blockchainer.go @@ -5,6 +5,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/blockchainer/services" "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -57,6 +58,7 @@ type Blockchainer interface { GetStorageItems(id int32) (map[string]*state.StorageItem, error) GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) *vm.VM GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) + SetOracle(service services.Oracle) mempool.Feer // fee interface ManagementContractHash() util.Uint160 PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error diff --git a/pkg/core/blockchainer/services/oracle.go b/pkg/core/blockchainer/services/oracle.go new file mode 100644 index 000000000..77b534415 --- /dev/null +++ b/pkg/core/blockchainer/services/oracle.go @@ -0,0 +1,20 @@ +package services + +import ( + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" +) + +// Oracle specifies oracle service interface. +type Oracle interface { + // AddRequests processes new requests. + AddRequests(map[uint64]*state.OracleRequest) + // RemoveRequests removes already processed requests. + RemoveRequests([]uint64) + // UpdateOracleNodes updates oracle nodes. + UpdateOracleNodes(keys.PublicKeys) + // Run runs oracle module. Must be invoked in a separate goroutine. + Run() + // Shutdown shutdowns oracle module. + Shutdown() +} diff --git a/pkg/core/helper_test.go b/pkg/core/helper_test.go index cff9c3274..e85d8112a 100644 --- a/pkg/core/helper_test.go +++ b/pkg/core/helper_test.go @@ -51,6 +51,12 @@ func newTestChainWithCustomCfg(t *testing.T, f func(*config.Config)) *Blockchain } func newTestChainWithCustomCfgAndStore(t *testing.T, st storage.Store, f func(*config.Config)) *Blockchain { + chain := initTestChain(t, st, f) + go chain.Run() + return chain +} + +func initTestChain(t *testing.T, st storage.Store, f func(*config.Config)) *Blockchain { unitTestNetCfg, err := config.Load("../../config", testchain.Network()) require.NoError(t, err) if f != nil { @@ -61,7 +67,6 @@ func newTestChainWithCustomCfgAndStore(t *testing.T, st storage.Store, f func(*c } chain, err := NewBlockchain(st, unitTestNetCfg.ProtocolConfiguration, zaptest.NewLogger(t)) require.NoError(t, err) - go chain.Run() return chain } diff --git a/pkg/core/native/designate.go b/pkg/core/native/designate.go index aa8b68c9a..b3836fe31 100644 --- a/pkg/core/native/designate.go +++ b/pkg/core/native/designate.go @@ -7,6 +7,7 @@ import ( "sort" "sync/atomic" + "github.com/nspcc-dev/neo-go/pkg/core/blockchainer/services" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/interop/runtime" @@ -32,6 +33,8 @@ type Designate struct { // p2pSigExtensionsEnabled defines whether the P2P signature extensions logic is relevant. p2pSigExtensionsEnabled bool + + OracleService atomic.Value } type oraclesData struct { @@ -117,6 +120,9 @@ func (s *Designate) PostPersist(ic *interop.Context) error { height: height, } s.oracles.Store(od) + if orc, _ := s.OracleService.Load().(services.Oracle); orc != nil { + orc.UpdateOracleNodes(od.nodes.Copy()) + } s.rolesChangedFlag.Store(false) return nil } diff --git a/pkg/core/native/oracle.go b/pkg/core/native/oracle.go index 460b9222c..1c43623ec 100644 --- a/pkg/core/native/oracle.go +++ b/pkg/core/native/oracle.go @@ -7,7 +7,9 @@ import ( "math" "math/big" "strings" + "sync/atomic" + "github.com/nspcc-dev/neo-go/pkg/core/blockchainer/services" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/interop/contract" @@ -36,6 +38,11 @@ type Oracle struct { Desig *Designate oracleScript []byte + + // Module is an oracle module capable of talking with the external world. + Module atomic.Value + // newRequests contains new requests created during current block. + newRequests map[uint64]*state.OracleRequest } const ( @@ -112,7 +119,11 @@ func (o *Oracle) GetOracleResponseScript() []byte { // OnPersist implements Contract interface. func (o *Oracle) OnPersist(ic *interop.Context) error { - return nil + var err error + if o.newRequests == nil { + o.newRequests, err = o.getRequests(ic.DAO) + } + return err } // PostPersist represents `postPersist` method. @@ -120,6 +131,9 @@ func (o *Oracle) PostPersist(ic *interop.Context) error { var nodes keys.PublicKeys var reward []big.Int single := new(big.Int).SetUint64(oracleRequestPrice) + var removedIDs []uint64 + + orc, _ := o.Module.Load().(services.Oracle) for _, tx := range ic.Block.Transactions { resp := getResponse(tx) if resp == nil { @@ -133,6 +147,9 @@ func (o *Oracle) PostPersist(ic *interop.Context) error { if err := ic.DAO.DeleteStorageItem(o.ContractID, reqKey); err != nil { return err } + if orc != nil { + removedIDs = append(removedIDs, resp.ID) + } idKey := makeIDListKey(req.URL) idList := new(IDList) @@ -170,7 +187,11 @@ func (o *Oracle) PostPersist(ic *interop.Context) error { for i := range reward { o.GAS.mint(ic, nodes[i].GetScriptHash(), &reward[i], false) } - return nil + + if len(removedIDs) != 0 && orc != nil { + orc.RemoveRequests(removedIDs) + } + return o.updateCache(ic.DAO) } // Metadata returns contract metadata. @@ -338,6 +359,7 @@ func (o *Oracle) PutRequestInternal(id uint64, req *state.OracleRequest, d dao.D if err := d.PutStorageItem(o.ContractID, reqKey, reqItem); err != nil { return err } + o.newRequests[id] = req // Add request ID to the id list. lst := new(IDList) @@ -393,6 +415,29 @@ func (o *Oracle) getOriginalTxID(d dao.DAO, tx *transaction.Transaction) util.Ui return tx.Hash() } +// getRequests returns all requests which have not been finished yet. +func (o *Oracle) getRequests(d dao.DAO) (map[uint64]*state.OracleRequest, error) { + m, err := d.GetStorageItemsWithPrefix(o.ContractID, prefixRequest) + if err != nil { + return nil, err + } + reqs := make(map[uint64]*state.OracleRequest, len(m)) + for k, si := range m { + if len(k) != 8 { + return nil, errors.New("invalid request ID") + } + r := io.NewBinReaderFromBuf(si.Value) + req := new(state.OracleRequest) + req.DecodeBinary(r) + if r.Err != nil { + return nil, r.Err + } + id := binary.LittleEndian.Uint64([]byte(k)) + reqs[id] = req + } + return reqs, nil +} + func makeRequestKey(id uint64) []byte { k := make([]byte, 9) k[0] = prefixRequest[0] @@ -407,3 +452,22 @@ func makeIDListKey(url string) []byte { func (o *Oracle) getSerializableFromDAO(d dao.DAO, key []byte, item io.Serializable) error { return getSerializableFromDAO(o.ContractID, d, key, item) } + +// updateCache updates cached Oracle values if they've been changed +func (o *Oracle) updateCache(d dao.DAO) error { + orc, _ := o.Module.Load().(services.Oracle) + if orc == nil { + return nil + } + + reqs := o.newRequests + o.newRequests = make(map[uint64]*state.OracleRequest) + for id := range reqs { + key := makeRequestKey(id) + if si := d.GetStorageItem(o.ContractID, key); si == nil { // tx has failed + delete(reqs, id) + } + } + orc.AddRequests(reqs) + return nil +} diff --git a/pkg/core/oracle_test.go b/pkg/core/oracle_test.go index f6888f291..88bf84d36 100644 --- a/pkg/core/oracle_test.go +++ b/pkg/core/oracle_test.go @@ -10,6 +10,7 @@ import ( "path" "strings" "testing" + "time" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" @@ -27,34 +28,39 @@ import ( const oracleModulePath = "../services/oracle/" +func getOracleConfig(t *testing.T, bc *Blockchain, w, pass string) oracle.Config { + return oracle.Config{ + Log: zaptest.NewLogger(t), + Network: netmode.UnitTestNet, + MainCfg: config.OracleConfiguration{ + UnlockWallet: config.Wallet{ + Path: path.Join(oracleModulePath, w), + Password: pass, + }, + }, + Chain: bc, + Client: newDefaultHTTPClient(), + OracleScript: bc.contracts.Oracle.NEF.Script, + OracleResponse: bc.contracts.Oracle.GetOracleResponseScript(), + OracleHash: bc.contracts.Oracle.Hash, + } +} + func getTestOracle(t *testing.T, bc *Blockchain, walletPath, pass string) ( *wallet.Account, *oracle.Oracle, map[uint64]*responseWithSig, chan *transaction.Transaction) { - m := make(map[uint64]*responseWithSig) ch := make(chan *transaction.Transaction, 5) - orcCfg := oracle.Config{ - Log: zaptest.NewLogger(t), - Network: netmode.UnitTestNet, - Wallet: config.Wallet{ - Path: path.Join(oracleModulePath, walletPath), - Password: pass, - }, - Chain: bc, - Client: newDefaultHTTPClient(), - ResponseHandler: saveToMapBroadcaster{m}, - OnTransaction: saveTxToChan(ch), - URIValidator: func(u *url.URL) error { - if strings.HasPrefix(u.Host, "private") { - return errors.New("private network") - } - return nil - }, - OracleScript: bc.contracts.Oracle.NEF.Script, - OracleResponse: bc.contracts.Oracle.GetOracleResponseScript(), - OracleHash: bc.contracts.Oracle.Hash, + orcCfg := getOracleConfig(t, bc, walletPath, pass) + orcCfg.ResponseHandler = saveToMapBroadcaster{m} + orcCfg.OnTransaction = saveTxToChan(ch) + orcCfg.URIValidator = func(u *url.URL) error { + if strings.HasPrefix(u.Host, "private") { + return errors.New("private network") + } + return nil } orc, err := oracle.NewOracle(orcCfg) require.NoError(t, err) @@ -97,6 +103,16 @@ func TestCreateResponseTx(t *testing.T) { assert.Equal(t, int64(97783360), tx.SystemFee) } +func TestOracle_InvalidWallet(t *testing.T) { + bc := newTestChain(t) + + _, err := oracle.NewOracle(getOracleConfig(t, bc, "./testdata/oracle1.json", "invalid")) + require.Error(t, err) + + _, err = oracle.NewOracle(getOracleConfig(t, bc, "./testdata/oracle1.json", "one")) + require.NoError(t, err) +} + func TestOracle(t *testing.T) { bc := newTestChain(t) defer bc.Close() @@ -128,7 +144,7 @@ func TestOracle(t *testing.T) { require.NoError(t, err) reqs := map[uint64]*state.OracleRequest{id: req} - orc1.AddRequests(reqs) + orc1.ProcessRequestsInternal(reqs) require.NotNil(t, m1[id]) require.Equal(t, resp, m1[id].resp) require.Empty(t, ch1) @@ -151,10 +167,14 @@ func TestOracle(t *testing.T) { req := checkResp(t, 1, resp) reqs := map[uint64]*state.OracleRequest{1: req} - orc2.AddRequests(reqs) + orc2.ProcessRequestsInternal(reqs) require.Equal(t, resp, m2[1].resp) require.Empty(t, ch2) + t.Run("InvalidSignature", func(t *testing.T) { + orc1.AddResponse(acc2.PrivateKey().PublicKey(), m2[1].resp.ID, []byte{1, 2, 3}) + require.Empty(t, ch1) + }) orc1.AddResponse(acc2.PrivateKey().PublicKey(), m2[1].resp.ID, m2[1].txSig) checkEmitTx(t, ch1) @@ -171,7 +191,7 @@ func TestOracle(t *testing.T) { require.Empty(t, ch2) reqs := map[uint64]*state.OracleRequest{reqID: req} - orc2.AddRequests(reqs) + orc2.ProcessRequestsInternal(reqs) require.Equal(t, resp, m2[reqID].resp) checkEmitTx(t, ch2) }) @@ -223,6 +243,32 @@ func TestOracle(t *testing.T) { }) } +func TestOracleFull(t *testing.T) { + bc := initTestChain(t, nil, nil) + acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle2.json", "two") + mp := bc.GetMemPool() + orc.OnTransaction = func(tx *transaction.Transaction) { _ = mp.Add(tx, bc) } + bc.SetOracle(orc) + + cs := getOracleContractState(bc.contracts.Oracle.Hash) + require.NoError(t, bc.contracts.Management.PutContractState(bc.dao, cs)) + + go bc.Run() + defer bc.Close() + go orc.Run() + defer orc.Shutdown() + + bc.setNodesByRole(t, true, native.RoleOracle, keys.PublicKeys{acc.PrivateKey().PublicKey()}) + putOracleRequest(t, cs.Hash, bc, "http://get.1234", new(string), "handle", []byte{}, 10_000_000) + + require.Eventually(t, func() bool { return mp.Count() == 1 }, + time.Second*2, time.Millisecond*200) + + txes := mp.GetVerifiedTransactions() + require.Len(t, txes, 1) + require.True(t, txes[0].HasAttribute(transaction.OracleResponseT)) +} + type saveToMapBroadcaster struct { m map[uint64]*responseWithSig } diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 716ba2afa..9fa810334 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -14,6 +14,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" + "github.com/nspcc-dev/neo-go/pkg/core/blockchainer/services" "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/native" @@ -278,7 +279,9 @@ func (chain testChain) ManagementContractHash() util.Uint160 { func (chain *testChain) PoolTx(tx *transaction.Transaction, _ ...*mempool.Pool) error { return chain.poolTx(tx) } - +func (chain testChain) SetOracle(services.Oracle) { + panic("TODO") +} func (chain *testChain) SubscribeForBlocks(ch chan<- *block.Block) { chain.blocksCh = append(chain.blocksCh, ch) } diff --git a/pkg/network/server.go b/pkg/network/server.go index 8b5c1ea89..d6616a404 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -20,6 +20,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/oracle" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" "go.uber.org/zap" @@ -82,6 +83,8 @@ type ( consensusStarted *atomic.Bool + oracle *oracle.Oracle + log *zap.Logger } @@ -142,6 +145,29 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai } }) + if config.OracleCfg.Enabled { + orcCfg := oracle.Config{ + Log: log, + Network: config.Net, + MainCfg: config.OracleCfg, + Chain: chain, + } + orc, err := oracle.NewOracle(orcCfg) + if err != nil { + return nil, fmt.Errorf("can't initialize Oracle module: %w", err) + } + orc.SetOnTransaction(func(tx *transaction.Transaction) { + r := s.RelayTxn(tx) + if r != RelaySucceed { + orc.Log.Error("can't pool oracle tx", + zap.String("hash", tx.Hash().StringLE()), + zap.Uint8("reason", byte(r))) + } + }) + s.oracle = orc + chain.SetOracle(orc) + } + srv, err := newConsensus(consensus.Config{ Logger: log, Broadcast: s.handleNewPayload, @@ -203,6 +229,9 @@ func (s *Server) Start(errChan chan error) { s.initStaleMemPools() go s.broadcastTxLoop() + if s.oracle != nil { + go s.oracle.Run() + } go s.relayBlocksLoop() go s.bQueue.run() go s.transport.Accept() @@ -222,9 +251,17 @@ func (s *Server) Shutdown() { p.Disconnect(errServerShutdown) } s.bQueue.discard() + if s.oracle != nil { + s.oracle.Shutdown() + } close(s.quit) } +// GetOracle returns oracle module instance. +func (s *Server) GetOracle() *oracle.Oracle { + return s.oracle +} + // UnconnectedPeers returns a list of peers that are in the discovery peer list // but are not connected to the server. func (s *Server) UnconnectedPeers() []string { diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index a1ccd0e07..860ade239 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -66,6 +66,9 @@ type ( // TimePerBlock is an interval which should pass between two successive blocks. TimePerBlock time.Duration + + // OracleCfg is oracle module configuration. + OracleCfg config.OracleConfiguration } ) @@ -96,5 +99,6 @@ func NewServerConfig(cfg config.Config) ServerConfig { MinPeers: appConfig.MinPeers, Wallet: wc, TimePerBlock: time.Duration(protoConfig.SecondsPerBlock) * time.Second, + OracleCfg: appConfig.Oracle, } } diff --git a/pkg/rpc/client/rpc.go b/pkg/rpc/client/rpc.go index c52497892..2f51150ab 100644 --- a/pkg/rpc/client/rpc.go +++ b/pkg/rpc/client/rpc.go @@ -491,6 +491,12 @@ func (c *Client) SubmitBlock(b block.Block) (util.Uint256, error) { return resp.Hash, nil } +// SubmitRawOracleResponse submits raw oracle response to the oracle node. +// Raw params are used to avoid excessive marshalling. +func (c *Client) SubmitRawOracleResponse(ps request.RawParams) error { + return c.performRequest("submitoracleresponse", ps, new(result.RelayResult)) +} + // SignAndPushInvocationTx signs and pushes given script as an invocation // transaction using given wif to sign it and spending the amount of gas // specified. It returns a hash of the invocation transaction and an error. diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 8edafd530..6e1ca675e 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -2,6 +2,7 @@ package server import ( "context" + "crypto/elliptic" "encoding/binary" "encoding/hex" "encoding/json" @@ -23,10 +24,13 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network" + "github.com/nspcc-dev/neo-go/pkg/services/oracle" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/broadcaster" "github.com/nspcc-dev/neo-go/pkg/rpc" "github.com/nspcc-dev/neo-go/pkg/rpc/request" "github.com/nspcc-dev/neo-go/pkg/rpc/response" @@ -47,6 +51,7 @@ type ( network netmode.Magic stateRootEnabled bool coreServer *network.Server + oracle *oracle.Oracle log *zap.Logger https *http.Server shutdown chan struct{} @@ -116,6 +121,7 @@ var rpcHandlers = map[string]func(*Server, request.Params) (interface{}, *respon "invokecontractverify": (*Server).invokeContractVerify, "sendrawtransaction": (*Server).sendrawtransaction, "submitblock": (*Server).submitBlock, + "submitoracleresponse": (*Server).submitOracleResponse, "validateaddress": (*Server).validateAddress, "verifyproof": (*Server).verifyProof, } @@ -134,7 +140,8 @@ var invalidBlockHeightError = func(index int, height int) *response.Error { var upgrader = websocket.Upgrader{} // New creates a new Server struct. -func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.Server, log *zap.Logger) Server { +func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.Server, + orc *oracle.Oracle, log *zap.Logger) Server { httpServer := &http.Server{ Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10), } @@ -146,6 +153,9 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S } } + if orc != nil { + orc.SetBroadcaster(broadcaster.New(orc.MainCfg, log)) + } return Server{ Server: httpServer, chain: chain, @@ -154,6 +164,7 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S stateRootEnabled: chain.GetConfig().StateRootInHeader, coreServer: coreServer, log: log, + oracle: orc, https: tlsServer, shutdown: make(chan struct{}), @@ -1223,6 +1234,38 @@ func (s *Server) submitBlock(reqParams request.Params) (interface{}, *response.E }, nil } +func (s *Server) submitOracleResponse(ps request.Params) (interface{}, *response.Error) { + if s.oracle == nil { + return nil, response.NewInternalServerError("oracle is not enabled", nil) + } + var pub *keys.PublicKey + pubBytes, err := ps.Value(0).GetBytesBase64() + if err == nil { + pub, err = keys.NewPublicKeyFromBytes(pubBytes, elliptic.P256()) + } + if err != nil { + return nil, response.NewInvalidParamsError("public key is missing", err) + } + reqID, err := ps.Value(1).GetInt() + if err != nil { + return nil, response.NewInvalidParamsError("request ID is missing", err) + } + txSig, err := ps.Value(2).GetBytesBase64() + if err != nil { + return nil, response.NewInvalidParamsError("tx signature is missing", err) + } + msgSig, err := ps.Value(3).GetBytesBase64() + if err != nil { + return nil, response.NewInvalidParamsError("msg signature is missing", err) + } + data := broadcaster.GetMessage(pubBytes, uint64(reqID), txSig) + if !pub.Verify(msgSig, hash.Sha256(data).BytesBE()) { + return nil, response.NewRPCError("Invalid sign", "", nil) + } + s.oracle.AddResponse(pub, uint64(reqID), txSig) + return json.RawMessage([]byte("{}")), nil +} + func (s *Server) sendrawtransaction(reqParams request.Params) (interface{}, *response.Error) { var resultsErr *response.Error var results interface{} diff --git a/pkg/rpc/server/server_helper_test.go b/pkg/rpc/server/server_helper_test.go index 6f49e8bf9..3511ac5c2 100644 --- a/pkg/rpc/server/server_helper_test.go +++ b/pkg/rpc/server/server_helper_test.go @@ -16,13 +16,14 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network" + "github.com/nspcc-dev/neo-go/pkg/services/oracle" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest" ) -func getUnitTestChain(t *testing.T) (*core.Blockchain, config.Config, *zap.Logger) { +func getUnitTestChain(t *testing.T, enableOracle bool) (*core.Blockchain, *oracle.Oracle, config.Config, *zap.Logger) { net := netmode.UnitTestNet configPath := "../../../config" cfg, err := config.Load(configPath, net) @@ -33,9 +34,26 @@ func getUnitTestChain(t *testing.T) (*core.Blockchain, config.Config, *zap.Logge chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration, logger) require.NoError(t, err, "could not create chain") + var orc *oracle.Oracle + if enableOracle { + cfg.ApplicationConfiguration.Oracle.Enabled = true + cfg.ApplicationConfiguration.Oracle.UnlockWallet = config.Wallet{ + Path: "../../services/oracle/testdata/oracle1.json", + Password: "one", + } + orc, err = oracle.NewOracle(oracle.Config{ + Log: logger, + Network: netmode.UnitTestNet, + MainCfg: cfg.ApplicationConfiguration.Oracle, + Chain: chain, + }) + require.NoError(t, err) + chain.SetOracle(orc) + } + go chain.Run() - return chain, cfg, logger + return chain, orc, cfg, logger } func getTestBlocks(t *testing.T) []*block.Block { @@ -61,13 +79,13 @@ func getTestBlocks(t *testing.T) []*block.Block { return blocks } -func initClearServerWithInMemoryChain(t *testing.T) (*core.Blockchain, *Server, *httptest.Server) { - chain, cfg, logger := getUnitTestChain(t) +func initClearServerWithOracle(t *testing.T, needOracle bool) (*core.Blockchain, *Server, *httptest.Server) { + chain, orc, cfg, logger := getUnitTestChain(t, needOracle) serverConfig := network.NewServerConfig(cfg) server, err := network.NewServer(serverConfig, chain, logger) require.NoError(t, err) - rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, logger) + rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger) errCh := make(chan error, 2) rpcServer.Start(errCh) @@ -77,6 +95,10 @@ func initClearServerWithInMemoryChain(t *testing.T) (*core.Blockchain, *Server, return chain, &rpcServer, srv } +func initClearServerWithInMemoryChain(t *testing.T) (*core.Blockchain, *Server, *httptest.Server) { + return initClearServerWithOracle(t, false) +} + func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, *Server, *httptest.Server) { chain, rpcServer, srv := initClearServerWithInMemoryChain(t) diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index f88d5a985..a2da8e512 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -25,8 +25,10 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/fee" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/io" + rpc2 "github.com/nspcc-dev/neo-go/pkg/services/oracle/broadcaster" "github.com/nspcc-dev/neo-go/pkg/rpc/response" "github.com/nspcc-dev/neo-go/pkg/rpc/response/result" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" @@ -885,6 +887,13 @@ var rpcTestCases = map[string][]rpcTestCase{ fail: true, }, }, + "submitoracleresponse": { + { + name: "no params", + params: `[]`, + fail: true, + }, + }, "validateaddress": { { name: "positive", @@ -920,6 +929,39 @@ func TestRPC(t *testing.T) { }) } +func TestSubmitOracle(t *testing.T) { + chain, rpcSrv, httpSrv := initClearServerWithOracle(t, true) + defer chain.Close() + defer rpcSrv.Shutdown() + + rpc := `{"jsonrpc": "2.0", "id": 1, "method": "submitoracleresponse", "params": %s}` + runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) { + return func(t *testing.T) { + ps := `[` + strings.Join(params, ",") + `]` + req := fmt.Sprintf(rpc, ps) + body := doRPCCallOverHTTP(req, httpSrv.URL, t) + checkErrGetResult(t, body, fail) + } + } + t.Run("MissingKey", runCase(t, true)) + t.Run("InvalidKey", runCase(t, true, `"1234"`)) + + priv, err := keys.NewPrivateKey() + require.NoError(t, err) + pubStr := `"` + base64.StdEncoding.EncodeToString(priv.PublicKey().Bytes()) + `"` + t.Run("InvalidReqID", runCase(t, true, pubStr, `"notanumber"`)) + t.Run("InvalidTxSignature", runCase(t, true, pubStr, `1`, `"qwerty"`)) + + txSig := priv.Sign([]byte{1, 2, 3}) + txSigStr := `"` + base64.StdEncoding.EncodeToString(txSig) + `"` + t.Run("MissingMsgSignature", runCase(t, true, pubStr, `1`, txSigStr)) + t.Run("InvalidMsgSignature", runCase(t, true, pubStr, `1`, txSigStr, `"0123"`)) + + msg := rpc2.GetMessage(priv.PublicKey().Bytes(), 1, txSig) + msgSigStr := `"` + base64.StdEncoding.EncodeToString(priv.Sign(msg)) + `"` + t.Run("Valid", runCase(t, false, pubStr, `1`, txSigStr, msgSigStr)) +} + // testRPCProtocol runs a full set of tests using given callback to make actual // calls. Some tests change the chain state, thus we reinitialize the chain from // scratch here. diff --git a/pkg/services/oracle/broadcaster/oracle.go b/pkg/services/oracle/broadcaster/oracle.go new file mode 100644 index 000000000..a4182d3d2 --- /dev/null +++ b/pkg/services/oracle/broadcaster/oracle.go @@ -0,0 +1,85 @@ +package broadcaster + +import ( + "context" + "encoding/base64" + "encoding/binary" + "time" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/rpc/client" + "github.com/nspcc-dev/neo-go/pkg/rpc/request" + "github.com/nspcc-dev/neo-go/pkg/services/oracle" + "go.uber.org/zap" +) + +type rpcBroascaster struct { + clients map[string]*client.Client + log *zap.Logger + + sendTimeout time.Duration +} + +const ( + defaultSendTimeout = time.Second * 4 +) + +// New returns new struct capable of broadcasting oracle responses. +func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster { + if cfg.ResponseTimeout == 0 { + cfg.ResponseTimeout = defaultSendTimeout + } + r := &rpcBroascaster{ + clients: make(map[string]*client.Client, len(cfg.Nodes)), + log: log, + sendTimeout: cfg.ResponseTimeout, + } + for i := range cfg.Nodes { + // We ignore error as not every node can be available on startup. + r.clients[cfg.Nodes[i]], _ = client.New(context.Background(), "http://"+cfg.Nodes[i], client.Options{ + DialTimeout: cfg.ResponseTimeout, + RequestTimeout: cfg.ResponseTimeout, + }) + } + return r +} + +// SendResponse implements interfaces.Broadcaster. +func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { + pub := priv.PublicKey() + data := GetMessage(pub.Bytes(), resp.ID, txSig) + msgSig := priv.Sign(data) + params := request.NewRawParams( + base64.StdEncoding.EncodeToString(pub.Bytes()), + resp.ID, + base64.StdEncoding.EncodeToString(txSig), + base64.StdEncoding.EncodeToString(msgSig), + ) + for addr, c := range r.clients { + if c == nil { + var err error + c, err = client.New(context.Background(), addr, client.Options{ + DialTimeout: r.sendTimeout, + RequestTimeout: r.sendTimeout, + }) + if err != nil { + r.log.Debug("can't connect to oracle node", zap.String("address", addr), zap.Error(err)) + continue + } + r.clients[addr] = c + } + err := c.SubmitRawOracleResponse(params) + r.log.Debug("error during oracle response submit", zap.String("address", addr), zap.Error(err)) + } +} + +// GetMessage returns data which is signed upon sending response by RPC. +func GetMessage(pubBytes []byte, reqID uint64, txSig []byte) []byte { + data := make([]byte, len(pubBytes)+8+len(txSig)) + copy(data, pubBytes) + binary.LittleEndian.PutUint64(data[len(pubBytes):], uint64(reqID)) + copy(data[len(pubBytes)+8:], txSig) + return data +} diff --git a/pkg/services/oracle/nodes.go b/pkg/services/oracle/nodes.go index ee0434868..87acd4a02 100644 --- a/pkg/services/oracle/nodes.go +++ b/pkg/services/oracle/nodes.go @@ -33,7 +33,7 @@ func (o *Oracle) UpdateOracleNodes(oracleNodes keys.PublicKeys) { if acc.PrivateKey() != nil { break } - err := acc.Decrypt(o.Wallet.Password) + err := acc.Decrypt(o.MainCfg.UnlockWallet.Password) if err != nil { o.Log.Error("can't unlock account", zap.String("address", address.Uint160ToString(acc.Contract.ScriptHash())), diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 64682c196..4a3fde10d 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -1,6 +1,7 @@ package oracle import ( + "errors" "net/http" "net/url" "sync" @@ -9,6 +10,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" @@ -31,6 +33,9 @@ type ( oracleNodes keys.PublicKeys oracleSignContract []byte + close chan struct{} + requestMap chan map[uint64]*state.OracleRequest + // respMtx protects responses map. respMtx sync.RWMutex responses map[uint64]*incompleteTx @@ -42,7 +47,7 @@ type ( Config struct { Log *zap.Logger Network netmode.Magic - Wallet config.Wallet + MainCfg config.OracleConfiguration Client HTTPClient Chain blockchainer.Blockchainer ResponseHandler Broadcaster @@ -81,19 +86,35 @@ func NewOracle(cfg Config) (*Oracle, error) { o := &Oracle{ Config: cfg, - responses: make(map[uint64]*incompleteTx), + close: make(chan struct{}), + requestMap: make(chan map[uint64]*state.OracleRequest, 1), + responses: make(map[uint64]*incompleteTx), + } + if o.MainCfg.RequestTimeout == 0 { + o.MainCfg.RequestTimeout = defaultRequestTimeout } var err error - w := cfg.Wallet + w := cfg.MainCfg.UnlockWallet if o.wallet, err = wallet.NewWalletFromFile(w.Path); err != nil { return nil, err } + haveAccount := false + for _, acc := range o.wallet.Accounts { + if err := acc.Decrypt(w.Password); err == nil { + haveAccount = true + break + } + } + if !haveAccount { + return nil, errors.New("no wallet account could be unlocked") + } + if o.Client == nil { var client http.Client client.Transport = &http.Transport{DisableKeepAlives: true} - client.Timeout = defaultRequestTimeout + client.Timeout = o.MainCfg.RequestTimeout o.Client = &client } if o.ResponseHandler == nil { @@ -108,6 +129,36 @@ func NewOracle(cfg Config) (*Oracle, error) { return o, nil } +// Shutdown shutdowns Oracle. +func (o *Oracle) Shutdown() { + close(o.close) +} + +// Run runs must be executed in a separate goroutine. +func (o *Oracle) Run() { + for { + select { + case <-o.close: + return + case reqs := <-o.requestMap: + o.ProcessRequestsInternal(reqs) + } + } +} + +func (o *Oracle) getOnTransaction() TxCallback { + o.mtx.RLock() + defer o.mtx.RUnlock() + return o.OnTransaction +} + +// SetOnTransaction sets callback to pool and broadcast tx. +func (o *Oracle) SetOnTransaction(cb TxCallback) { + o.mtx.Lock() + defer o.mtx.Unlock() + o.OnTransaction = cb +} + func (o *Oracle) getBroadcaster() Broadcaster { o.mtx.RLock() defer o.mtx.RUnlock() diff --git a/pkg/services/oracle/request.go b/pkg/services/oracle/request.go index a3b5f232b..06143341a 100644 --- a/pkg/services/oracle/request.go +++ b/pkg/services/oracle/request.go @@ -24,6 +24,26 @@ func (o *Oracle) RemoveRequests(ids []uint64) { // AddRequests saves all requests in-fly for further processing. func (o *Oracle) AddRequests(reqs map[uint64]*state.OracleRequest) { + if len(reqs) == 0 { + return + } + + select { + case o.requestMap <- reqs: + default: + select { + case old := <-o.requestMap: + for id, r := range old { + reqs[id] = r + } + default: + } + o.requestMap <- reqs + } +} + +// ProcessRequestsInternal processes provided requests synchronously. +func (o *Oracle) ProcessRequestsInternal(reqs map[uint64]*state.OracleRequest) { acc := o.getAccount() if acc == nil { return @@ -40,7 +60,7 @@ func (o *Oracle) AddRequests(reqs map[uint64]*state.OracleRequest) { func (o *Oracle) processRequest(priv *keys.PrivateKey, id uint64, req *state.OracleRequest) error { resp := &transaction.OracleResponse{ID: id} u, err := url.ParseRequestURI(req.URL) - if err == nil { + if err == nil && !o.MainCfg.AllowPrivateHost { err = o.URIValidator(u) } if err != nil { @@ -107,11 +127,15 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, id uint64, req *state.Ora incTx.addResponse(priv.PublicKey(), backupSig, true) readyTx, ready := incTx.finalize(o.getOracleNodes()) + if ready { + ready = !incTx.isSent + incTx.isSent = true + } incTx.Unlock() o.getBroadcaster().SendResponse(priv, resp, txSig) if ready { - o.OnTransaction(readyTx) + o.getOnTransaction()(readyTx) } return nil } diff --git a/pkg/services/oracle/response.go b/pkg/services/oracle/response.go index bb9f0896a..29c7da858 100644 --- a/pkg/services/oracle/response.go +++ b/pkg/services/oracle/response.go @@ -50,10 +50,14 @@ func (o *Oracle) AddResponse(pub *keys.PublicKey, reqID uint64, txSig []byte) { } incTx.addResponse(pub, txSig, isBackup) readyTx, ready := incTx.finalize(o.getOracleNodes()) + if ready { + ready = !incTx.isSent + incTx.isSent = true + } incTx.Unlock() if ready { - o.OnTransaction(readyTx) + o.getOnTransaction()(readyTx) } } diff --git a/pkg/services/oracle/transaction.go b/pkg/services/oracle/transaction.go index 7f64d5bd3..38aa30ea3 100644 --- a/pkg/services/oracle/transaction.go +++ b/pkg/services/oracle/transaction.go @@ -13,6 +13,8 @@ import ( type ( incompleteTx struct { sync.RWMutex + // isSent is true tx was already broadcasted. + isSent bool // tx is oracle response transaction. tx *transaction.Transaction // sigs contains signature from every oracle node.