diff --git a/cli/executor_test.go b/cli/executor_test.go index d1e4b1402..5ff7b2996 100644 --- a/cli/executor_test.go +++ b/cli/executor_test.go @@ -12,6 +12,7 @@ import ( "github.com/nspcc-dev/neo-go/cli/input" "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -82,8 +83,19 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch } serverConfig := network.NewServerConfig(cfg) - netSrv, err := network.NewServer(serverConfig, chain, zap.NewNop()) + netSrv, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), zap.NewNop()) require.NoError(t, err) + cons, err := consensus.NewService(consensus.Config{ + Logger: zap.NewNop(), + Broadcast: netSrv.BroadcastExtensible, + Chain: chain, + ProtocolConfiguration: chain.GetConfig(), + RequestTx: netSrv.RequestTx, + Wallet: serverConfig.Wallet, + TimePerBlock: serverConfig.TimePerBlock, + }) + require.NoError(t, err) + netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) go netSrv.Start(make(chan error, 1)) rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger) errCh := make(chan error, 2) diff --git a/cli/server/server.go b/cli/server/server.go index 1a37ca997..21156cec8 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -2,6 +2,7 @@ package server import ( "context" + "errors" "fmt" "os" "os/signal" @@ -9,14 +10,20 @@ import ( "github.com/nspcc-dev/neo-go/cli/options" "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/chaindump" + corestate "github.com/nspcc-dev/neo-go/pkg/core/stateroot" "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network" "github.com/nspcc-dev/neo-go/pkg/network/metrics" "github.com/nspcc-dev/neo-go/pkg/rpc/server" + "github.com/nspcc-dev/neo-go/pkg/services/notary" + "github.com/nspcc-dev/neo-go/pkg/services/oracle" + "github.com/nspcc-dev/neo-go/pkg/services/stateroot" "github.com/urfave/cli" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -316,6 +323,73 @@ func restoreDB(ctx *cli.Context) error { return nil } +func mkOracle(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*oracle.Oracle, error) { + if !config.OracleCfg.Enabled { + return nil, nil + } + orcCfg := oracle.Config{ + Log: log, + Network: config.Net, + MainCfg: config.OracleCfg, + Chain: chain, + OnTransaction: serv.RelayTxn, + } + orc, err := oracle.NewOracle(orcCfg) + if err != nil { + return nil, fmt.Errorf("can't initialize Oracle module: %w", err) + } + chain.SetOracle(orc) + serv.AddService(orc) + return orc, nil +} + +func mkConsensus(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (consensus.Service, error) { + if config.Wallet == nil { + return nil, nil + } + srv, err := consensus.NewService(consensus.Config{ + Logger: log, + Broadcast: serv.BroadcastExtensible, + Chain: chain, + ProtocolConfiguration: chain.GetConfig(), + RequestTx: serv.RequestTx, + Wallet: config.Wallet, + TimePerBlock: config.TimePerBlock, + }) + if err != nil { + return nil, fmt.Errorf("can't initialize Consensus module: %w", err) + } + + serv.AddExtensibleHPService(srv, consensus.Category, srv.OnPayload, srv.OnTransaction) + return srv, nil +} + +func mkP2PNotary(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*notary.Notary, error) { + if !config.P2PNotaryCfg.Enabled { + return nil, nil + } + if !chain.P2PSigExtensionsEnabled() { + return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enabled") + } + cfg := notary.Config{ + MainCfg: config.P2PNotaryCfg, + Chain: chain, + Log: log, + } + n, err := notary.NewNotary(cfg, serv.Net, serv.GetNotaryPool(), func(tx *transaction.Transaction) error { + if err := serv.RelayTxn(tx); err != nil { + return fmt.Errorf("can't relay completed notary transaction: hash %s, error: %w", tx.Hash().StringLE(), err) + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("failed to create Notary module: %w", err) + } + serv.AddService(n) + chain.SetNotary(n) + return n, nil +} + func startServer(ctx *cli.Context) error { cfg, err := getConfigFromContext(ctx) if err != nil { @@ -336,11 +410,30 @@ func startServer(ctx *cli.Context) error { return err } - serv, err := network.NewServer(serverConfig, chain, log) + serv, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), log) if err != nil { return cli.NewExitError(fmt.Errorf("failed to create network server: %w", err), 1) } - rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, serv.GetOracle(), log) + srMod := chain.GetStateModule().(*corestate.Module) // Take full responsibility here. + sr, err := stateroot.New(serverConfig.StateRootCfg, srMod, log, chain, serv.BroadcastExtensible) + if err != nil { + return cli.NewExitError(fmt.Errorf("can't initialize StateRoot service: %w", err), 1) + } + serv.AddExtensibleService(sr, stateroot.Category, sr.OnPayload) + + oracleSrv, err := mkOracle(serverConfig, chain, serv, log) + if err != nil { + return err + } + _, err = mkConsensus(serverConfig, chain, serv, log) + if err != nil { + return err + } + _, err = mkP2PNotary(serverConfig, chain, serv, log) + if err != nil { + return err + } + rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log) errChan := make(chan error) go serv.Start(errChan) @@ -369,7 +462,7 @@ Main: errChan <- fmt.Errorf("error while restarting rpc-server: %w", serverErr) break } - rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, serv.GetOracle(), log) + rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log) rpcServer.Start(errChan) } case <-grace.Done(): diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index d24004722..3573d1286 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -22,7 +22,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/rpc/response/result/subscriptions" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" - "github.com/nspcc-dev/neo-go/pkg/vm" uatomic "go.uber.org/atomic" ) @@ -41,7 +40,7 @@ type FakeChain struct { MaxVerificationGAS int64 NotaryContractScriptHash util.Uint160 NotaryDepositExpiration uint32 - PostBlock []func(blockchainer.Blockchainer, *mempool.Pool, *block.Block) + PostBlock []func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block) UtilityTokenBalance *big.Int } @@ -104,8 +103,8 @@ func (chain *FakeChain) IsTxStillRelevant(t *transaction.Transaction, txpool *me panic("TODO") } -// InitVerificationVM initializes VM for witness check. -func (chain *FakeChain) InitVerificationVM(v *vm.VM, getContract func(util.Uint160) (*state.Contract, error), hash util.Uint160, witness *transaction.Witness) error { +// InitVerificationContext initializes context for witness check. +func (chain *FakeChain) InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error { panic("TODO") } @@ -140,11 +139,6 @@ func (chain *FakeChain) GetNotaryBalance(acc util.Uint160) *big.Int { panic("TODO") } -// GetPolicer implements Blockchainer interface. -func (chain *FakeChain) GetPolicer() blockchainer.Policer { - return chain -} - // GetBaseExecFee implements Policer interface. func (chain *FakeChain) GetBaseExecFee() int64 { return interop.DefaultBaseExecFee @@ -164,12 +158,12 @@ func (chain *FakeChain) GetMaxVerificationGAS() int64 { } // PoolTxWithData implements Blockchainer interface. -func (chain *FakeChain) PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(bc blockchainer.Blockchainer, t *transaction.Transaction, data interface{}) error) error { +func (chain *FakeChain) PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data interface{}) error) error { return chain.poolTxWithData(t, data, mp) } // RegisterPostBlock implements Blockchainer interface. -func (chain *FakeChain) RegisterPostBlock(f func(blockchainer.Blockchainer, *mempool.Pool, *block.Block)) { +func (chain *FakeChain) RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block)) { chain.PostBlock = append(chain.PostBlock, f) } @@ -326,18 +320,13 @@ func (chain *FakeChain) GetStateModule() blockchainer.StateRoot { return nil } -// GetStateSyncModule implements Blockchainer interface. -func (chain *FakeChain) GetStateSyncModule() blockchainer.StateSync { - return &FakeStateSync{} -} - // GetStorageItem implements Blockchainer interface. func (chain *FakeChain) GetStorageItem(id int32, key []byte) state.StorageItem { panic("TODO") } // GetTestVM implements Blockchainer interface. -func (chain *FakeChain) GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) (*vm.VM, func()) { +func (chain *FakeChain) GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) *interop.Context { panic("TODO") } diff --git a/internal/testchain/transaction.go b/internal/testchain/transaction.go index 5797bee08..c4636f08d 100644 --- a/internal/testchain/transaction.go +++ b/internal/testchain/transaction.go @@ -120,7 +120,7 @@ func SignTxCommittee(bc blockchainer.Blockchainer, txs ...*transaction.Transacti func signTxGeneric(bc blockchainer.Blockchainer, sign func(hash.Hashable) []byte, verif []byte, txs ...*transaction.Transaction) { for _, tx := range txs { size := io.GetVarSize(tx) - netFee, sizeDelta := fee.Calculate(bc.GetPolicer().GetBaseExecFee(), verif) + netFee, sizeDelta := fee.Calculate(bc.GetBaseExecFee(), verif) tx.NetworkFee += netFee size += sizeDelta tx.NetworkFee += int64(size) * bc.FeePerByte() diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index c1dff3105..1edcb8a45 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -14,6 +14,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config/netmode" coreb "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" + "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -43,6 +44,22 @@ const nsInMs = 1000000 // Category is message category for extensible payloads. const Category = "dBFT" +// Ledger is the interface to Blockchain sufficient for Service. +type Ledger interface { + AddBlock(block *coreb.Block) error + ApplyPolicyToTxSet([]*transaction.Transaction) []*transaction.Transaction + GetMemPool() *mempool.Pool + GetNextBlockValidators() ([]*keys.PublicKey, error) + GetStateModule() blockchainer.StateRoot + GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) + GetValidators() ([]*keys.PublicKey, error) + PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error + SubscribeForBlocks(ch chan<- *coreb.Block) + UnsubscribeFromBlocks(ch chan<- *coreb.Block) + interop.Ledger + mempool.Feer +} + // Service represents consensus instance. type Service interface { // Start initializes dBFT and starts event loop for consensus service. @@ -52,7 +69,7 @@ type Service interface { Shutdown() // OnPayload is a callback to notify Service about new received payload. - OnPayload(p *npayload.Extensible) + OnPayload(p *npayload.Extensible) error // OnTransaction is a callback to notify Service about new received transaction. OnTransaction(tx *transaction.Transaction) } @@ -92,8 +109,8 @@ type Config struct { // Broadcast is a callback which is called to notify server // about new consensus payload to sent. Broadcast func(p *npayload.Extensible) - // Chain is a core.Blockchainer instance. - Chain blockchainer.Blockchainer + // Chain is a Ledger instance. + Chain Ledger // ProtocolConfiguration contains protocol settings. ProtocolConfiguration config.ProtocolConfiguration // RequestTx is a callback to which will be called @@ -129,10 +146,6 @@ func NewService(cfg Config) (Service, error) { finished: make(chan struct{}), } - if cfg.Wallet == nil { - return srv, nil - } - var err error if srv.wallet, err = wallet.NewWalletFromFile(cfg.Wallet.Path); err != nil { @@ -369,26 +382,27 @@ func (s *service) payloadFromExtensible(ep *npayload.Extensible) *Payload { } // OnPayload handles Payload receive. -func (s *service) OnPayload(cp *npayload.Extensible) { +func (s *service) OnPayload(cp *npayload.Extensible) error { log := s.log.With(zap.Stringer("hash", cp.Hash())) p := s.payloadFromExtensible(cp) // decode payload data into message if err := p.decodeData(); err != nil { log.Info("can't decode payload data", zap.Error(err)) - return + return nil } if !s.validatePayload(p) { log.Info("can't validate payload") - return + return nil } if s.dbft == nil || !s.started.Load() { log.Debug("dbft is inactive or not started yet") - return + return nil } s.messages <- *p + return nil } func (s *service) OnTransaction(tx *transaction.Transaction) { diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 50ceca6f2..db3601867 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -13,7 +13,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/core" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/fee" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/storage" @@ -351,7 +350,7 @@ func TestService_OnPayload(t *testing.T) { p.encodeData() // sender is invalid - srv.OnPayload(&p.Extensible) + require.NoError(t, srv.OnPayload(&p.Extensible)) shouldNotReceive(t, srv.messages) p = new(Payload) @@ -359,16 +358,17 @@ func TestService_OnPayload(t *testing.T) { p.Sender = priv.GetScriptHash() p.SetPayload(&prepareRequest{}) require.NoError(t, p.Sign(priv)) - srv.OnPayload(&p.Extensible) + require.NoError(t, srv.OnPayload(&p.Extensible)) shouldReceive(t, srv.messages) } func TestVerifyBlock(t *testing.T) { srv := newTestService(t) + bc := srv.Chain.(*core.Blockchain) srv.lastTimestamp = 1 t.Run("good empty", func(t *testing.T) { - b := testchain.NewBlock(t, srv.Chain, 1, 0) + b := testchain.NewBlock(t, bc, 1, 0) require.True(t, srv.verifyBlock(&neoBlock{Block: *b})) }) t.Run("good pooled tx", func(t *testing.T) { @@ -377,7 +377,7 @@ func TestVerifyBlock(t *testing.T) { addSender(t, tx) signTx(t, srv.Chain, tx) require.NoError(t, srv.Chain.PoolTx(tx)) - b := testchain.NewBlock(t, srv.Chain, 1, 0, tx) + b := testchain.NewBlock(t, bc, 1, 0, tx) require.True(t, srv.verifyBlock(&neoBlock{Block: *b})) }) t.Run("good non-pooled tx", func(t *testing.T) { @@ -385,7 +385,7 @@ func TestVerifyBlock(t *testing.T) { tx.ValidUntilBlock = 1 addSender(t, tx) signTx(t, srv.Chain, tx) - b := testchain.NewBlock(t, srv.Chain, 1, 0, tx) + b := testchain.NewBlock(t, bc, 1, 0, tx) require.True(t, srv.verifyBlock(&neoBlock{Block: *b})) }) t.Run("good conflicting tx", func(t *testing.T) { @@ -402,11 +402,11 @@ func TestVerifyBlock(t *testing.T) { signTx(t, srv.Chain, tx2) require.NoError(t, srv.Chain.PoolTx(tx1)) require.Error(t, srv.Chain.PoolTx(tx2)) - b := testchain.NewBlock(t, srv.Chain, 1, 0, tx2) + b := testchain.NewBlock(t, bc, 1, 0, tx2) require.True(t, srv.verifyBlock(&neoBlock{Block: *b})) }) t.Run("bad old", func(t *testing.T) { - b := testchain.NewBlock(t, srv.Chain, 1, 0) + b := testchain.NewBlock(t, bc, 1, 0) b.Index = srv.Chain.BlockHeight() require.False(t, srv.verifyBlock(&neoBlock{Block: *b})) }) @@ -417,11 +417,11 @@ func TestVerifyBlock(t *testing.T) { tx.ValidUntilBlock = 1 addSender(t, tx) signTx(t, srv.Chain, tx) - b := testchain.NewBlock(t, srv.Chain, 1, 0, tx) + b := testchain.NewBlock(t, bc, 1, 0, tx) require.False(t, srv.verifyBlock(&neoBlock{Block: *b})) }) t.Run("bad timestamp", func(t *testing.T) { - b := testchain.NewBlock(t, srv.Chain, 1, 0) + b := testchain.NewBlock(t, bc, 1, 0) b.Timestamp = srv.lastTimestamp - 1 require.False(t, srv.verifyBlock(&neoBlock{Block: *b})) }) @@ -431,7 +431,7 @@ func TestVerifyBlock(t *testing.T) { addSender(t, tx) signTx(t, srv.Chain, tx) tx.Scripts[0].InvocationScript[16] = ^tx.Scripts[0].InvocationScript[16] - b := testchain.NewBlock(t, srv.Chain, 1, 0, tx) + b := testchain.NewBlock(t, bc, 1, 0, tx) require.False(t, srv.verifyBlock(&neoBlock{Block: *b})) }) t.Run("bad big sys fee", func(t *testing.T) { @@ -442,7 +442,7 @@ func TestVerifyBlock(t *testing.T) { addSender(t, txes[i]) signTx(t, srv.Chain, txes[i]) } - b := testchain.NewBlock(t, srv.Chain, 1, 0, txes...) + b := testchain.NewBlock(t, bc, 1, 0, txes...) require.False(t, srv.verifyBlock(&neoBlock{Block: *b})) }) } @@ -532,7 +532,7 @@ func addSender(t *testing.T, txs ...*transaction.Transaction) { } } -func signTx(t *testing.T, bc blockchainer.Blockchainer, txs ...*transaction.Transaction) { +func signTx(t *testing.T, bc Ledger, txs ...*transaction.Transaction) { validators := make([]*keys.PublicKey, 4) privNetKeys := make([]*keys.PrivateKey, 4) for i := 0; i < 4; i++ { @@ -544,7 +544,7 @@ func signTx(t *testing.T, bc blockchainer.Blockchainer, txs ...*transaction.Tran require.NoError(t, err) for _, tx := range txs { size := io.GetVarSize(tx) - netFee, sizeDelta := fee.Calculate(bc.GetPolicer().GetBaseExecFee(), rawScript) + netFee, sizeDelta := fee.Calculate(bc.GetBaseExecFee(), rawScript) tx.NetworkFee += +netFee size += sizeDelta tx.NetworkFee += int64(size) * bc.FeePerByte() diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 300f6a993..ad382a49a 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -150,7 +150,7 @@ type Blockchain struct { // postBlock is a set of callback methods which should be run under the Blockchain lock after new block is persisted. // Block's transactions are passed via mempool. - postBlock []func(blockchainer.Blockchainer, *mempool.Pool, *block.Block) + postBlock []func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block) sbCommittee keys.PublicKeys @@ -267,7 +267,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L contracts: *native.NewContracts(cfg), } - bc.stateRoot = stateroot.NewModule(bc, bc.log, bc.dao.Store) + bc.stateRoot = stateroot.NewModule(bc.GetConfig(), bc.VerifyWitness, bc.log, bc.dao.Store) bc.contracts.Designate.StateRootService = bc.stateRoot if err := bc.init(); err != nil { @@ -929,8 +929,8 @@ func (bc *Blockchain) GetStateModule() blockchainer.StateRoot { } // GetStateSyncModule returns new state sync service instance. -func (bc *Blockchain) GetStateSyncModule() blockchainer.StateSync { - return statesync.NewModule(bc, bc.log, bc.dao, bc.jumpToState) +func (bc *Blockchain) GetStateSyncModule() *statesync.Module { + return statesync.NewModule(bc, bc.stateRoot, bc.log, bc.dao, bc.jumpToState) } // storeBlock performs chain update using the block given, it executes all @@ -1157,7 +1157,7 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error atomic.StoreUint32(&bc.blockHeight, block.Index) bc.memPool.RemoveStale(func(tx *transaction.Transaction) bool { return bc.IsTxStillRelevant(tx, txpool, false) }, bc) for _, f := range bc.postBlock { - f(bc, txpool, block) + f(bc.IsTxStillRelevant, txpool, block) } if err := bc.updateExtensibleWhitelist(block.Index); err != nil { bc.lock.Unlock() @@ -2095,12 +2095,12 @@ func (bc *Blockchain) PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) } // PoolTxWithData verifies and tries to add given transaction with additional data into the mempool. -func (bc *Blockchain) PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(bc blockchainer.Blockchainer, tx *transaction.Transaction, data interface{}) error) error { +func (bc *Blockchain) PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(tx *transaction.Transaction, data interface{}) error) error { bc.lock.RLock() defer bc.lock.RUnlock() if verificationFunction != nil { - err := verificationFunction(bc, t, data) + err := verificationFunction(t, data) if err != nil { return err } @@ -2140,14 +2140,14 @@ func (bc *Blockchain) GetEnrollments() ([]state.Validator, error) { return bc.contracts.NEO.GetCandidates(bc.dao) } -// GetTestVM returns a VM setup for a test run of some sort of code and finalizer function. -func (bc *Blockchain) GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) (*vm.VM, func()) { +// GetTestVM returns an interop context with VM set up for a test run. +func (bc *Blockchain) GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) *interop.Context { d := bc.dao.GetWrapped().(*dao.Simple) systemInterop := bc.newInteropContext(t, d, b, tx) vm := systemInterop.SpawnVM() vm.SetPriceGetter(systemInterop.GetPrice) vm.LoadToken = contract.LoadToken(systemInterop) - return vm, systemInterop.Finalize + return systemInterop } // Various witness verification errors. @@ -2162,8 +2162,8 @@ var ( ErrInvalidVerificationContract = errors.New("verification contract is missing `verify` method") ) -// InitVerificationVM initializes VM for witness check. -func (bc *Blockchain) InitVerificationVM(v *vm.VM, getContract func(util.Uint160) (*state.Contract, error), hash util.Uint160, witness *transaction.Witness) error { +// InitVerificationContext initializes context for witness check. +func (bc *Blockchain) InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error { if len(witness.VerificationScript) != 0 { if witness.ScriptHash() != hash { return ErrWitnessHashMismatch @@ -2175,9 +2175,9 @@ func (bc *Blockchain) InitVerificationVM(v *vm.VM, getContract func(util.Uint160 if err != nil { return fmt.Errorf("%w: %v", ErrInvalidVerification, err) } - v.LoadScriptWithHash(witness.VerificationScript, hash, callflag.ReadOnly) + ic.VM.LoadScriptWithHash(witness.VerificationScript, hash, callflag.ReadOnly) } else { - cs, err := getContract(hash) + cs, err := ic.GetContract(hash) if err != nil { return ErrUnknownVerificationContract } @@ -2191,7 +2191,8 @@ func (bc *Blockchain) InitVerificationVM(v *vm.VM, getContract func(util.Uint160 if md != nil { initOffset = md.Offset } - v.LoadNEFMethod(&cs.NEF, util.Uint160{}, hash, callflag.ReadOnly, + ic.Invocations[cs.Hash]++ + ic.VM.LoadNEFMethod(&cs.NEF, util.Uint160{}, hash, callflag.ReadOnly, true, verifyOffset, initOffset) } if len(witness.InvocationScript) != 0 { @@ -2199,7 +2200,7 @@ func (bc *Blockchain) InitVerificationVM(v *vm.VM, getContract func(util.Uint160 if err != nil { return fmt.Errorf("%w: %v", ErrInvalidInvocation, err) } - v.LoadScript(witness.InvocationScript) + ic.VM.LoadScript(witness.InvocationScript) } return nil } @@ -2223,7 +2224,7 @@ func (bc *Blockchain) verifyHashAgainstScript(hash util.Uint160, witness *transa vm.SetPriceGetter(interopCtx.GetPrice) vm.LoadToken = contract.LoadToken(interopCtx) vm.GasLimit = gas - if err := bc.InitVerificationVM(vm, interopCtx.GetContract, hash, witness); err != nil { + if err := bc.InitVerificationContext(interopCtx, hash, witness); err != nil { return 0, err } err := interopCtx.Exec() @@ -2331,17 +2332,10 @@ func (bc *Blockchain) P2PSigExtensionsEnabled() bool { // RegisterPostBlock appends provided function to the list of functions which should be run after new block // is stored. -func (bc *Blockchain) RegisterPostBlock(f func(blockchainer.Blockchainer, *mempool.Pool, *block.Block)) { +func (bc *Blockchain) RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block)) { bc.postBlock = append(bc.postBlock, f) } -// -- start Policer. - -// GetPolicer provides access to policy values via Policer interface. -func (bc *Blockchain) GetPolicer() blockchainer.Policer { - return bc -} - // GetBaseExecFee return execution price for `NOP`. func (bc *Blockchain) GetBaseExecFee() int64 { return bc.contracts.Policy.GetExecFeeFactorInternal(bc.dao) @@ -2359,5 +2353,3 @@ func (bc *Blockchain) GetStoragePrice() int64 { } return bc.contracts.Policy.GetStoragePriceInternal(bc.dao) } - -// -- end Policer. diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 21458d3ba..fee36ac0d 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -16,7 +16,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/chaindump" "github.com/nspcc-dev/neo-go/pkg/core/fee" "github.com/nspcc-dev/neo-go/pkg/core/interop/interopnames" @@ -1073,7 +1072,7 @@ func TestVerifyTx(t *testing.T) { } mp := mempool.New(10, 1, false) - verificationF := func(bc blockchainer.Blockchainer, tx *transaction.Transaction, data interface{}) error { + verificationF := func(tx *transaction.Transaction, data interface{}) error { if data.(int) > 5 { return errors.New("bad data") } diff --git a/pkg/core/blockchainer/blockchainer.go b/pkg/core/blockchainer/blockchainer.go index 1b12cd9c6..710d5dfb1 100644 --- a/pkg/core/blockchainer/blockchainer.go +++ b/pkg/core/blockchainer/blockchainer.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer/services" + "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -14,18 +15,19 @@ import ( "github.com/nspcc-dev/neo-go/pkg/rpc/response/result/subscriptions" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" - "github.com/nspcc-dev/neo-go/pkg/vm" ) // Blockchainer is an interface that abstract the implementation // of the blockchain. type Blockchainer interface { ApplyPolicyToTxSet([]*transaction.Transaction) []*transaction.Transaction + AddBlock(block *block.Block) error + AddHeaders(...*block.Header) error + BlockHeight() uint32 GetConfig() config.ProtocolConfiguration - Blockqueuer // Blockqueuer interface CalculateClaimable(h util.Uint160, endHeight uint32) (*big.Int, error) Close() - InitVerificationVM(v *vm.VM, getContract func(util.Uint160) (*state.Contract, error), hash util.Uint160, witness *transaction.Witness) error + InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error IsTxStillRelevant(t *transaction.Transaction, txpool *mempool.Pool, isPartialTx bool) bool HeaderHeight() uint32 GetBlock(hash util.Uint256) (*block.Block, error) @@ -53,22 +55,19 @@ type Blockchainer interface { GetTokenLastUpdated(acc util.Uint160) (map[int32]uint32, error) GetNotaryContractScriptHash() util.Uint160 GetNotaryBalance(acc util.Uint160) *big.Int - GetPolicer() Policer GetValidators() ([]*keys.PublicKey, error) GetStandByCommittee() keys.PublicKeys GetStandByValidators() keys.PublicKeys GetStateModule() StateRoot - GetStateSyncModule() StateSync GetStorageItem(id int32, key []byte) state.StorageItem GetStorageItems(id int32) ([]state.StorageItemWithKey, error) - GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) (*vm.VM, func()) + GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) *interop.Context GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) SetOracle(service services.Oracle) mempool.Feer // fee interface ManagementContractHash() util.Uint160 PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error - PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(bc Blockchainer, t *transaction.Transaction, data interface{}) error) error - RegisterPostBlock(f func(Blockchainer, *mempool.Pool, *block.Block)) + PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data interface{}) error) error SetNotary(mod services.Notary) SubscribeForBlocks(ch chan<- *block.Block) SubscribeForExecutions(ch chan<- *state.AppExecResult) @@ -81,4 +80,9 @@ type Blockchainer interface { UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) UnsubscribeFromNotifications(ch chan<- *subscriptions.NotificationEvent) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) + // Policer. + GetBaseExecFee() int64 + GetMaxVerificationGAS() int64 + GetStoragePrice() int64 + FeePerByte() int64 } diff --git a/pkg/core/blockchainer/blockqueuer.go b/pkg/core/blockchainer/blockqueuer.go deleted file mode 100644 index bae45281f..000000000 --- a/pkg/core/blockchainer/blockqueuer.go +++ /dev/null @@ -1,10 +0,0 @@ -package blockchainer - -import "github.com/nspcc-dev/neo-go/pkg/core/block" - -// Blockqueuer is an interface for blockqueue. -type Blockqueuer interface { - AddBlock(block *block.Block) error - AddHeaders(...*block.Header) error - BlockHeight() uint32 -} diff --git a/pkg/core/blockchainer/policer.go b/pkg/core/blockchainer/policer.go deleted file mode 100644 index 29c3a3ca5..000000000 --- a/pkg/core/blockchainer/policer.go +++ /dev/null @@ -1,9 +0,0 @@ -package blockchainer - -// Policer is an interface that abstracts the implementation of policy methods. -type Policer interface { - GetBaseExecFee() int64 - GetMaxVerificationGAS() int64 - GetStoragePrice() int64 - FeePerByte() int64 -} diff --git a/pkg/core/blockchainer/services/oracle.go b/pkg/core/blockchainer/services/oracle.go index 102ea390b..426e03967 100644 --- a/pkg/core/blockchainer/services/oracle.go +++ b/pkg/core/blockchainer/services/oracle.go @@ -16,8 +16,8 @@ type Oracle interface { UpdateOracleNodes(keys.PublicKeys) // UpdateNativeContract updates oracle contract native script and hash. UpdateNativeContract([]byte, []byte, util.Uint160, int) - // Run runs oracle module. Must be invoked in a separate goroutine. - Run() + // Start runs oracle module. + Start() // Shutdown shutdowns oracle module. Shutdown() } diff --git a/pkg/core/blockchainer/state_root.go b/pkg/core/blockchainer/state_root.go index 0ba680da0..0328c3a63 100644 --- a/pkg/core/blockchainer/state_root.go +++ b/pkg/core/blockchainer/state_root.go @@ -3,14 +3,11 @@ package blockchainer import ( "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/storage" - "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" ) // StateRoot represents local state root module. type StateRoot interface { - AddStateRoot(root *state.MPTRoot) error - CleanStorage() error CurrentLocalHeight() uint32 CurrentLocalStateRoot() util.Uint256 CurrentValidatedHeight() uint32 @@ -18,7 +15,4 @@ type StateRoot interface { GetState(root util.Uint256, key []byte) ([]byte, error) GetStateProof(root util.Uint256, key []byte) ([][]byte, error) GetStateRoot(height uint32) (*state.MPTRoot, error) - GetStateValidators(height uint32) keys.PublicKeys - SetUpdateValidatorsCallback(func(uint32, keys.PublicKeys)) - UpdateStateValidators(height uint32, pubs keys.PublicKeys) } diff --git a/pkg/core/chaindump/dump.go b/pkg/core/chaindump/dump.go index 6f5cd7259..54ae5d514 100644 --- a/pkg/core/chaindump/dump.go +++ b/pkg/core/chaindump/dump.go @@ -3,14 +3,23 @@ package chaindump import ( "fmt" + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/util" ) +// DumperRestorer in the interface to get/add blocks from/to. +type DumperRestorer interface { + AddBlock(block *block.Block) error + GetBlock(hash util.Uint256) (*block.Block, error) + GetConfig() config.ProtocolConfiguration + GetHeaderHash(int) util.Uint256 +} + // Dump writes count blocks from start to the provided writer. // Note: header needs to be written separately by client. -func Dump(bc blockchainer.Blockchainer, w *io.BinWriter, start, count uint32) error { +func Dump(bc DumperRestorer, w *io.BinWriter, start, count uint32) error { for i := start; i < start+count; i++ { bh := bc.GetHeaderHash(int(i)) b, err := bc.GetBlock(bh) @@ -31,7 +40,7 @@ func Dump(bc blockchainer.Blockchainer, w *io.BinWriter, start, count uint32) er // Restore restores blocks from provided reader. // f is called after addition of every block. -func Restore(bc blockchainer.Blockchainer, r *io.BinReader, skip, count uint32, f func(b *block.Block) error) error { +func Restore(bc DumperRestorer, r *io.BinReader, skip, count uint32, f func(b *block.Block) error) error { readBlock := func(r *io.BinReader) ([]byte, error) { var size = r.ReadU32LE() buf := make([]byte, size) diff --git a/pkg/core/helper_test.go b/pkg/core/helper_test.go index bc8bb785d..146860c51 100644 --- a/pkg/core/helper_test.go +++ b/pkg/core/helper_test.go @@ -635,12 +635,12 @@ func setTxSystemFee(bc *Blockchain, sysFee int64, tx *transaction.Transaction) { } ttx := *tx // prevent setting 'hash' field - v, f := bc.GetTestVM(trigger.Application, &ttx, b) - defer f() + ic := bc.GetTestVM(trigger.Application, &ttx, b) + defer ic.Finalize() - v.LoadWithFlags(tx.Script, callflag.All) - _ = v.Run() - tx.SystemFee = v.GasConsumed() + ic.VM.LoadWithFlags(tx.Script, callflag.All) + _ = ic.VM.Run() + tx.SystemFee = ic.VM.GasConsumed() } func signTxWithAccounts(chain *Blockchain, sysFee int64, tx *transaction.Transaction, accs ...*wallet.Account) { diff --git a/pkg/core/interop/context.go b/pkg/core/interop/context.go index daa202b94..93417ecc7 100644 --- a/pkg/core/interop/context.go +++ b/pkg/core/interop/context.go @@ -8,13 +8,14 @@ import ( "sort" "strings" + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/interop/interopnames" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag" "github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest" @@ -33,9 +34,22 @@ const ( DefaultBaseExecFee = 30 ) +// Ledger is the interface to Blockchain required for Context functionality. +type Ledger interface { + BlockHeight() uint32 + CurrentBlockHash() util.Uint256 + GetBaseExecFee() int64 + GetBlock(hash util.Uint256) (*block.Block, error) + GetConfig() config.ProtocolConfiguration + GetHeaderHash(int) util.Uint256 + GetStandByCommittee() keys.PublicKeys + GetStandByValidators() keys.PublicKeys + GetStoragePrice() int64 +} + // Context represents context in which interops are executed. type Context struct { - Chain blockchainer.Blockchainer + Chain Ledger Container hash.Hashable Network uint32 Natives []Contract @@ -56,14 +70,14 @@ type Context struct { } // NewContext returns new interop context. -func NewContext(trigger trigger.Type, bc blockchainer.Blockchainer, d dao.DAO, +func NewContext(trigger trigger.Type, bc Ledger, d dao.DAO, getContract func(dao.DAO, util.Uint160) (*state.Contract, error), natives []Contract, block *block.Block, tx *transaction.Transaction, log *zap.Logger) *Context { baseExecFee := int64(DefaultBaseExecFee) dao := d.GetWrapped() if bc != nil && (block == nil || block.Index != 0) { - baseExecFee = bc.GetPolicer().GetBaseExecFee() + baseExecFee = bc.GetBaseExecFee() } return &Context{ Chain: bc, diff --git a/pkg/core/interop_system.go b/pkg/core/interop_system.go index 7908e17d4..d5d3d70a3 100644 --- a/pkg/core/interop_system.go +++ b/pkg/core/interop_system.go @@ -125,7 +125,7 @@ func putWithContext(ic *interop.Context, stc *StorageContext, key []byte, value sizeInc = (len(si)-1)/4 + 1 + len(value) - len(si) } } - if !ic.VM.AddGas(int64(sizeInc) * ic.Chain.GetPolicer().GetStoragePrice()) { + if !ic.VM.AddGas(int64(sizeInc) * ic.Chain.GetStoragePrice()) { return errGasLimitExceeded } return ic.DAO.PutStorageItem(stc.ID, key, value) diff --git a/pkg/core/native/designate.go b/pkg/core/native/designate.go index ccf5767b8..08ed87013 100644 --- a/pkg/core/native/designate.go +++ b/pkg/core/native/designate.go @@ -8,7 +8,6 @@ import ( "sort" "sync/atomic" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer/services" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/interop" @@ -16,6 +15,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/native/nativenames" "github.com/nspcc-dev/neo-go/pkg/core/native/noderoles" "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/stateroot" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/smartcontract" @@ -43,7 +43,7 @@ type Designate struct { // NotaryService represents Notary node module. NotaryService atomic.Value // StateRootService represents StateRoot node module. - StateRootService blockchainer.StateRoot + StateRootService *stateroot.Module } type roleData struct { diff --git a/pkg/core/native/interop.go b/pkg/core/native/interop.go index cf12a419b..077a18aab 100644 --- a/pkg/core/native/interop.go +++ b/pkg/core/native/interop.go @@ -41,8 +41,8 @@ func Call(ic *interop.Context) error { return fmt.Errorf("missing call flags for native %d `%s` operation call: %05b vs %05b", version, m.MD.Name, ic.VM.Context().GetCallFlags(), m.RequiredFlags) } - invokeFee := m.CPUFee*ic.Chain.GetPolicer().GetBaseExecFee() + - m.StorageFee*ic.Chain.GetPolicer().GetStoragePrice() + invokeFee := m.CPUFee*ic.Chain.GetBaseExecFee() + + m.StorageFee*ic.Chain.GetStoragePrice() if !ic.VM.AddGas(invokeFee) { return errors.New("gas limit exceeded") } diff --git a/pkg/core/native/ledger.go b/pkg/core/native/ledger.go index 2d958d629..496ccc640 100644 --- a/pkg/core/native/ledger.go +++ b/pkg/core/native/ledger.go @@ -6,7 +6,6 @@ import ( "math/big" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/native/nativenames" @@ -145,17 +144,17 @@ func (l *Ledger) getTransactionFromBlock(ic *interop.Context, params []stackitem // isTraceableBlock defines whether we're able to give information about // the block with index specified. -func isTraceableBlock(bc blockchainer.Blockchainer, index uint32) bool { +func isTraceableBlock(bc interop.Ledger, index uint32) bool { height := bc.BlockHeight() MaxTraceableBlocks := bc.GetConfig().MaxTraceableBlocks return index <= height && index+MaxTraceableBlocks > height } // getBlockHashFromItem converts given stackitem.Item to block hash using given -// Blockchainer if needed. Interop functions accept both block numbers and +// Ledger if needed. Interop functions accept both block numbers and // block hashes as parameters, thus this function is needed. It's supposed to // be called within VM context, so it panics if anything goes wrong. -func getBlockHashFromItem(bc blockchainer.Blockchainer, item stackitem.Item) util.Uint256 { +func getBlockHashFromItem(bc interop.Ledger, item stackitem.Item) util.Uint256 { bigindex, err := item.TryInteger() if err == nil && bigindex.IsUint64() { index := bigindex.Uint64() diff --git a/pkg/core/native/management.go b/pkg/core/native/management.go index 2180f7aee..0213fcb06 100644 --- a/pkg/core/native/management.go +++ b/pkg/core/native/management.go @@ -198,7 +198,7 @@ func (m *Management) getNefAndManifestFromItems(ic *interop.Context, args []stac return nil, nil, fmt.Errorf("invalid manifest: %w", err) } - gas := ic.Chain.GetPolicer().GetStoragePrice() * int64(len(nefBytes)+len(manifestBytes)) + gas := ic.Chain.GetStoragePrice() * int64(len(nefBytes)+len(manifestBytes)) if isDeploy { fee := m.GetMinimumDeploymentFee(ic.DAO) if fee > gas { diff --git a/pkg/core/native/native_neo.go b/pkg/core/native/native_neo.go index 9d5b61514..200911ccc 100644 --- a/pkg/core/native/native_neo.go +++ b/pkg/core/native/native_neo.go @@ -10,7 +10,6 @@ import ( "strings" "sync/atomic" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/interop/runtime" @@ -244,7 +243,7 @@ func (n *NEO) Initialize(ic *interop.Context) error { // InitializeCache initializes all NEO cache with the proper values from storage. // Cache initialisation should be done apart from Initialize because Initialize is // called only when deploying native contracts. -func (n *NEO) InitializeCache(bc blockchainer.Blockchainer, d dao.DAO) error { +func (n *NEO) InitializeCache(bc interop.Ledger, d dao.DAO) error { var committee = keysWithVotes{} si := d.GetStorageItem(n.ID, prefixCommittee) if err := committee.DecodeBytes(si); err != nil { @@ -264,7 +263,7 @@ func (n *NEO) InitializeCache(bc blockchainer.Blockchainer, d dao.DAO) error { return nil } -func (n *NEO) updateCache(cvs keysWithVotes, bc blockchainer.Blockchainer) error { +func (n *NEO) updateCache(cvs keysWithVotes, bc interop.Ledger) error { n.committee.Store(cvs) var committee = n.GetCommitteeMembers() @@ -300,7 +299,7 @@ func (n *NEO) updateCommittee(ic *interop.Context) error { } // ShouldUpdateCommittee returns true if committee is updated at block h. -func ShouldUpdateCommittee(h uint32, bc blockchainer.Blockchainer) bool { +func ShouldUpdateCommittee(h uint32, bc interop.Ledger) bool { cfg := bc.GetConfig() r := len(cfg.StandbyCommittee) return h%uint32(r) == 0 @@ -936,7 +935,7 @@ func (n *NEO) getAccountState(ic *interop.Context, args []stackitem.Item) stacki } // ComputeNextBlockValidators returns an actual list of current validators. -func (n *NEO) ComputeNextBlockValidators(bc blockchainer.Blockchainer, d dao.DAO) (keys.PublicKeys, error) { +func (n *NEO) ComputeNextBlockValidators(bc interop.Ledger, d dao.DAO) (keys.PublicKeys, error) { if vals := n.validators.Load().(keys.PublicKeys); vals != nil { return vals.Copy(), nil } @@ -993,7 +992,7 @@ func toKeysWithVotes(pubs keys.PublicKeys) keysWithVotes { } // computeCommitteeMembers returns public keys of nodes in committee. -func (n *NEO) computeCommitteeMembers(bc blockchainer.Blockchainer, d dao.DAO) (keys.PublicKeys, keysWithVotes, error) { +func (n *NEO) computeCommitteeMembers(bc interop.Ledger, d dao.DAO) (keys.PublicKeys, keysWithVotes, error) { key := []byte{prefixVotersCount} si := d.GetStorageItem(n.ID, key) if si == nil { diff --git a/pkg/core/notary_test.go b/pkg/core/notary_test.go index 67b5bbd3d..236bae50b 100644 --- a/pkg/core/notary_test.go +++ b/pkg/core/notary_test.go @@ -13,7 +13,6 @@ import ( "github.com/nspcc-dev/neo-go/internal/testchain" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/native/noderoles" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -135,14 +134,14 @@ func TestNotary(t *testing.T) { require.NoError(t, err) bc.SetNotary(ntr1) - bc.RegisterPostBlock(func(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) { + bc.RegisterPostBlock(func(f func(*transaction.Transaction, *mempool.Pool, bool) bool, pool *mempool.Pool, b *block.Block) { ntr1.PostPersist() }) mp1.RunSubscriptions() - go ntr1.Run() + ntr1.Start() t.Cleanup(func() { - ntr1.Stop() + ntr1.Shutdown() mp1.StopSubscriptions() }) diff --git a/pkg/core/oracle_test.go b/pkg/core/oracle_test.go index c07b0b545..22f575fa6 100644 --- a/pkg/core/oracle_test.go +++ b/pkg/core/oracle_test.go @@ -445,14 +445,14 @@ func TestOracleFull(t *testing.T) { bc := initTestChain(t, nil, nil) acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle2.json", "two") mp := bc.GetMemPool() - orc.OnTransaction = func(tx *transaction.Transaction) { _ = mp.Add(tx, bc) } + orc.OnTransaction = func(tx *transaction.Transaction) error { return mp.Add(tx, bc) } bc.SetOracle(orc) cs := getOracleContractState(t, util.Uint160{}, 42) require.NoError(t, bc.contracts.Management.PutContractState(bc.dao, cs)) go bc.Run() - go orc.Run() + orc.Start() t.Cleanup(orc.Shutdown) bc.setNodesByRole(t, true, noderoles.Oracle, keys.PublicKeys{acc.PrivateKey().PublicKey()}) @@ -470,7 +470,7 @@ func TestNotYetRunningOracle(t *testing.T) { bc := initTestChain(t, nil, nil) acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle2.json", "two") mp := bc.GetMemPool() - orc.OnTransaction = func(tx *transaction.Transaction) { _ = mp.Add(tx, bc) } + orc.OnTransaction = func(tx *transaction.Transaction) error { return mp.Add(tx, bc) } bc.SetOracle(orc) cs := getOracleContractState(t, util.Uint160{}, 42) @@ -498,7 +498,7 @@ func TestNotYetRunningOracle(t *testing.T) { ids = []uint64{3} orc.RemoveRequests(ids) // 3 removed from pending -> 2, 4 in pending. - go orc.Run() + orc.Start() t.Cleanup(orc.Shutdown) require.Eventually(t, func() bool { return mp.Count() == 2 }, @@ -541,8 +541,9 @@ type responseWithSig struct { } func saveTxToChan(ch chan *transaction.Transaction) oracle.TxCallback { - return func(tx *transaction.Transaction) { + return func(tx *transaction.Transaction) error { ch <- tx + return nil } } diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index 86d3e7c54..fc6e68baf 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -6,11 +6,13 @@ import ( "fmt" "sync" + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" @@ -18,13 +20,17 @@ import ( ) type ( + // VerifierFunc is a function that allows to check witness of account + // for Hashable item with GAS limit. + VerifierFunc func(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) // Module represents module for local processing of state roots. Module struct { - Store *storage.MemCachedStore - network netmode.Magic - mpt *mpt.Trie - bc blockchainer.Blockchainer - log *zap.Logger + Store *storage.MemCachedStore + network netmode.Magic + srInHead bool + mpt *mpt.Trie + verifier VerifierFunc + log *zap.Logger currentLocal atomic.Value localHeight atomic.Uint32 @@ -45,12 +51,13 @@ type ( ) // NewModule returns new instance of stateroot module. -func NewModule(bc blockchainer.Blockchainer, log *zap.Logger, s *storage.MemCachedStore) *Module { +func NewModule(cfg config.ProtocolConfiguration, verif VerifierFunc, log *zap.Logger, s *storage.MemCachedStore) *Module { return &Module{ - network: bc.GetConfig().Magic, - bc: bc, - log: log, - Store: s, + network: cfg.Magic, + srInHead: cfg.StateRootInHeader, + verifier: verif, + log: log, + Store: s, } } @@ -191,7 +198,7 @@ func (s *Module) UpdateCurrentLocal(mpt *mpt.Trie, sr *state.MPTRoot) { s.mpt = mpt s.currentLocal.Store(sr.Root) s.localHeight.Store(sr.Index) - if s.bc.GetConfig().StateRootInHeader { + if s.srInHead { s.validatedHeight.Store(sr.Index) updateStateHeightMetric(sr.Index) } @@ -216,6 +223,6 @@ func (s *Module) verifyWitness(r *state.MPTRoot) error { s.mtx.Lock() h := s.getKeyCacheForHeight(r.Index).validatorsHash s.mtx.Unlock() - _, err := s.bc.VerifyWitness(h, r, &r.Witness[0], maxVerificationGAS) + _, err := s.verifier(h, r, &r.Witness[0], maxVerificationGAS) return err } diff --git a/pkg/core/stateroot/store.go b/pkg/core/stateroot/store.go index ee5a52f78..f059d1c31 100644 --- a/pkg/core/stateroot/store.go +++ b/pkg/core/stateroot/store.go @@ -83,7 +83,7 @@ func (s *Module) AddStateRoot(sr *state.MPTRoot) error { return err } s.validatedHeight.Store(sr.Index) - if !s.bc.GetConfig().StateRootInHeader { + if !s.srInHead { updateStateHeightMetric(sr.Index) } return nil diff --git a/pkg/core/stateroot_test.go b/pkg/core/stateroot_test.go index cb66c078e..29777f457 100644 --- a/pkg/core/stateroot_test.go +++ b/pkg/core/stateroot_test.go @@ -80,32 +80,32 @@ func TestStateRoot(t *testing.T) { tmpDir := t.TempDir() w := createAndWriteWallet(t, accs[0], filepath.Join(tmpDir, "w"), "pass") cfg := createStateRootConfig(w.Path(), "pass") - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc, nil) + srv, err := stateroot.New(cfg, bc.stateRoot, zaptest.NewLogger(t), bc, nil) require.NoError(t, err) - require.EqualValues(t, 0, srv.CurrentValidatedHeight()) - r, err := srv.GetStateRoot(bc.BlockHeight()) + require.EqualValues(t, 0, bc.stateRoot.CurrentValidatedHeight()) + r, err := bc.stateRoot.GetStateRoot(bc.BlockHeight()) require.NoError(t, err) - require.Equal(t, r.Root, srv.CurrentLocalStateRoot()) + require.Equal(t, r.Root, bc.stateRoot.CurrentLocalStateRoot()) t.Run("invalid message", func(t *testing.T) { require.Error(t, srv.OnPayload(&payload.Extensible{Data: []byte{42}})) - require.EqualValues(t, 0, srv.CurrentValidatedHeight()) + require.EqualValues(t, 0, bc.stateRoot.CurrentValidatedHeight()) }) t.Run("drop zero index", func(t *testing.T) { - r, err := srv.GetStateRoot(0) + r, err := bc.stateRoot.GetStateRoot(0) require.NoError(t, err) data, err := testserdes.EncodeBinary(stateroot.NewMessage(stateroot.RootT, r)) require.NoError(t, err) require.NoError(t, srv.OnPayload(&payload.Extensible{Data: data})) - require.EqualValues(t, 0, srv.CurrentValidatedHeight()) + require.EqualValues(t, 0, bc.stateRoot.CurrentValidatedHeight()) }) t.Run("invalid height", func(t *testing.T) { - r, err := srv.GetStateRoot(1) + r, err := bc.stateRoot.GetStateRoot(1) require.NoError(t, err) r.Index = 10 data := testSignStateRoot(t, r, pubs, accs...) require.Error(t, srv.OnPayload(&payload.Extensible{Data: data})) - require.EqualValues(t, 0, srv.CurrentValidatedHeight()) + require.EqualValues(t, 0, bc.stateRoot.CurrentValidatedHeight()) }) t.Run("invalid signer", func(t *testing.T) { accInv, err := wallet.NewAccount() @@ -113,21 +113,21 @@ func TestStateRoot(t *testing.T) { pubs := keys.PublicKeys{accInv.PrivateKey().PublicKey()} require.NoError(t, accInv.ConvertMultisig(1, pubs)) transferTokenFromMultisigAccount(t, bc, accInv.Contract.ScriptHash(), bc.contracts.GAS.Hash, 1_0000_0000) - r, err := srv.GetStateRoot(1) + r, err := bc.stateRoot.GetStateRoot(1) require.NoError(t, err) data := testSignStateRoot(t, r, pubs, accInv) err = srv.OnPayload(&payload.Extensible{Data: data}) require.True(t, errors.Is(err, ErrWitnessHashMismatch), "got: %v", err) - require.EqualValues(t, 0, srv.CurrentValidatedHeight()) + require.EqualValues(t, 0, bc.stateRoot.CurrentValidatedHeight()) }) - r, err = srv.GetStateRoot(updateIndex + 1) + r, err = bc.stateRoot.GetStateRoot(updateIndex + 1) require.NoError(t, err) data := testSignStateRoot(t, r, pubs, accs...) require.NoError(t, srv.OnPayload(&payload.Extensible{Data: data})) - require.EqualValues(t, 2, srv.CurrentValidatedHeight()) + require.EqualValues(t, 2, bc.stateRoot.CurrentValidatedHeight()) - r, err = srv.GetStateRoot(updateIndex + 1) + r, err = bc.stateRoot.GetStateRoot(updateIndex + 1) require.NoError(t, err) require.NotEqual(t, 0, len(r.Witness)) require.Equal(t, h, r.Witness[0].ScriptHash()) @@ -148,14 +148,14 @@ func TestStateRootInitNonZeroHeight(t *testing.T) { tmpDir := t.TempDir() w := createAndWriteWallet(t, accs[0], filepath.Join(tmpDir, "w"), "pass") cfg := createStateRootConfig(w.Path(), "pass") - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc, nil) + srv, err := stateroot.New(cfg, bc.stateRoot, zaptest.NewLogger(t), bc, nil) require.NoError(t, err) - r, err := srv.GetStateRoot(2) + r, err := bc.stateRoot.GetStateRoot(2) require.NoError(t, err) data := testSignStateRoot(t, r, pubs, accs...) require.NoError(t, srv.OnPayload(&payload.Extensible{Data: data})) - require.EqualValues(t, 2, srv.CurrentValidatedHeight()) - root = srv.CurrentLocalStateRoot() + require.EqualValues(t, 2, bc.stateRoot.CurrentValidatedHeight()) + root = bc.stateRoot.CurrentLocalStateRoot() }) bc2 := newTestChainWithCustomCfgAndStore(t, st, nil) @@ -194,12 +194,12 @@ func TestStateRootFull(t *testing.T) { var lastValidated atomic.Value var lastHeight atomic.Uint32 - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc, func(ep *payload.Extensible) { + srv, err := stateroot.New(cfg, bc.stateRoot, zaptest.NewLogger(t), bc, func(ep *payload.Extensible) { lastHeight.Store(ep.ValidBlockStart) lastValidated.Store(ep) }) require.NoError(t, err) - srv.Run() + srv.Start() t.Cleanup(srv.Shutdown) bc.setNodesByRole(t, true, noderoles.StateValidator, pubs) @@ -211,7 +211,7 @@ func TestStateRootFull(t *testing.T) { require.Eventually(t, func() bool { return lastHeight.Load() == 3 }, time.Second, time.Millisecond) checkVoteBroadcasted(t, bc, lastValidated.Load().(*payload.Extensible), 3, 1) - r, err := srv.GetStateRoot(2) + r, err := bc.stateRoot.GetStateRoot(2) require.NoError(t, err) require.NoError(t, srv.AddSignature(2, 0, accs[0].PrivateKey().SignHashable(uint32(netmode.UnitTestNet), r))) require.NotNil(t, lastValidated.Load().(*payload.Extensible)) @@ -220,7 +220,7 @@ func TestStateRootFull(t *testing.T) { require.NoError(t, testserdes.DecodeBinary(lastValidated.Load().(*payload.Extensible).Data, msg)) require.NotEqual(t, stateroot.RootT, msg.Type) // not a sender for this root - r, err = srv.GetStateRoot(3) + r, err = bc.stateRoot.GetStateRoot(3) require.NoError(t, err) require.Error(t, srv.AddSignature(2, 0, accs[0].PrivateKey().SignHashable(uint32(netmode.UnitTestNet), r))) require.NoError(t, srv.AddSignature(3, 0, accs[0].PrivateKey().SignHashable(uint32(netmode.UnitTestNet), r))) diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index ebf888e2a..e664fe9e2 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -23,10 +23,11 @@ import ( "fmt" "sync" + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/core/stateroot" "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/util" @@ -59,6 +60,16 @@ const ( blocksSynced ) +// Ledger is the interface required from Blockchain for Module to operate. +type Ledger interface { + AddHeaders(...*block.Header) error + BlockHeight() uint32 + GetConfig() config.ProtocolConfiguration + GetHeader(hash util.Uint256) (*block.Header, error) + GetHeaderHash(int) util.Uint256 + HeaderHeight() uint32 +} + // Module represents state sync module and aimed to gather state-related data to // perform an atomic state jump. type Module struct { @@ -74,9 +85,10 @@ type Module struct { // blockHeight is the index of the latest stored block. blockHeight uint32 - dao *dao.Simple - bc blockchainer.Blockchainer - mptpool *Pool + dao *dao.Simple + bc Ledger + stateMod *stateroot.Module + mptpool *Pool billet *mpt.Billet @@ -84,17 +96,19 @@ type Module struct { } // NewModule returns new instance of statesync module. -func NewModule(bc blockchainer.Blockchainer, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module { +func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module { if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().RemoveUntraceableBlocks) { return &Module{ dao: s, bc: bc, + stateMod: stateMod, syncStage: inactive, } } return &Module{ dao: s, bc: bc, + stateMod: stateMod, log: log, syncInterval: uint32(bc.GetConfig().StateSyncInterval), mptpool: NewPool(), @@ -146,7 +160,7 @@ func (s *Module) Init(currChainHeight uint32) error { // current chain's state until new state is completely fetched, outdated state-related data // will be removed from storage during (*Blockchain).jumpToState(...) execution. // All we need to do right now is to remove genesis-related MPT nodes. - err = s.bc.GetStateModule().CleanStorage() + err = s.stateMod.CleanStorage() if err != nil { return fmt.Errorf("failed to remove outdated MPT data from storage: %w", err) } @@ -201,7 +215,7 @@ func (s *Module) defineSyncStage() error { if s.blockHeight > s.syncPoint { s.syncStage |= mptSynced s.log.Info("MPT is in sync", - zap.Uint32("stateroot height", s.bc.GetStateModule().CurrentLocalHeight())) + zap.Uint32("stateroot height", s.stateMod.CurrentLocalHeight())) } else if s.syncStage&headersSynced != 0 { header, err := s.bc.GetHeader(s.bc.GetHeaderHash(int(s.syncPoint + 1))) if err != nil { diff --git a/pkg/neotest/basic.go b/pkg/neotest/basic.go index dc217f01e..750d72288 100644 --- a/pkg/neotest/basic.go +++ b/pkg/neotest/basic.go @@ -255,7 +255,7 @@ func addSystemFee(bc blockchainer.Blockchainer, tx *transaction.Transaction, sys } func addNetworkFee(bc blockchainer.Blockchainer, tx *transaction.Transaction, signers ...Signer) { - baseFee := bc.GetPolicer().GetBaseExecFee() + baseFee := bc.GetBaseExecFee() size := io.GetVarSize(tx) for _, sgr := range signers { netFee, sizeDelta := fee.Calculate(baseFee, sgr.Script()) @@ -335,12 +335,12 @@ func TestInvoke(bc blockchainer.Blockchainer, tx *transaction.Transaction) (*vm. // `GetTestVM` as well as `Run` can use transaction hash which will set cached value. // This is unwanted behaviour so we explicitly copy transaction to perform execution. ttx := *tx - v, f := bc.GetTestVM(trigger.Application, &ttx, b) - defer f() + ic := bc.GetTestVM(trigger.Application, &ttx, b) + defer ic.Finalize() - v.LoadWithFlags(tx.Script, callflag.All) - err = v.Run() - return v, err + ic.VM.LoadWithFlags(tx.Script, callflag.All) + err = ic.VM.Run() + return ic.VM, err } // GetTransaction returns transaction and its height by the specified hash. diff --git a/pkg/neotest/client.go b/pkg/neotest/client.go index 330cf8a3e..7767b02c8 100644 --- a/pkg/neotest/client.go +++ b/pkg/neotest/client.go @@ -41,12 +41,12 @@ func (e *Executor) ValidatorInvoker(h util.Uint160) *ContractInvoker { func (c *ContractInvoker) TestInvoke(t *testing.T, method string, args ...interface{}) (*vm.Stack, error) { tx := c.PrepareInvokeNoSign(t, method, args...) b := c.NewUnsignedBlock(t, tx) - v, f := c.Chain.GetTestVM(trigger.Application, tx, b) - t.Cleanup(f) + ic := c.Chain.GetTestVM(trigger.Application, tx, b) + t.Cleanup(ic.Finalize) - v.LoadWithFlags(tx.Script, callflag.All) - err := v.Run() - return v.Estack(), err + ic.VM.LoadWithFlags(tx.Script, callflag.All) + err := ic.VM.Run() + return ic.VM.Estack(), err } // WithSigners creates new client with the provided signer. diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index 3b92e62f6..e63947d9e 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -4,17 +4,23 @@ import ( "sync" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "go.uber.org/atomic" "go.uber.org/zap" ) +// Blockqueuer is the interface for block queue. +type Blockqueuer interface { + AddBlock(block *block.Block) error + AddHeaders(...*block.Header) error + BlockHeight() uint32 +} + type blockQueue struct { log *zap.Logger queueLock sync.Mutex queue []*block.Block checkBlocks chan struct{} - chain blockchainer.Blockqueuer + chain Blockqueuer relayF func(*block.Block) discarded *atomic.Bool len int @@ -26,7 +32,7 @@ const ( blockCacheSize = 2000 ) -func newBlockQueue(capacity int, bc blockchainer.Blockqueuer, log *zap.Logger, relayer func(*block.Block)) *blockQueue { +func newBlockQueue(capacity int, bc Blockqueuer, log *zap.Logger, relayer func(*block.Block)) *blockQueue { if log == nil { return nil } diff --git a/pkg/network/extpool/pool.go b/pkg/network/extpool/pool.go index 3044c461a..b6c4cd3e8 100644 --- a/pkg/network/extpool/pool.go +++ b/pkg/network/extpool/pool.go @@ -5,11 +5,19 @@ import ( "errors" "sync" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/util" ) +// Ledger is enough of Blockchain to satisfy Pool. +type Ledger interface { + BlockHeight() uint32 + IsExtensibleAllowed(util.Uint160) bool + VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) +} + // Pool represents pool of extensible payloads. type Pool struct { lock sync.RWMutex @@ -17,11 +25,11 @@ type Pool struct { senders map[util.Uint160]*list.List // singleCap represents maximum number of payloads from the single sender. singleCap int - chain blockchainer.Blockchainer + chain Ledger } // New returns new payload pool using provided chain. -func New(bc blockchainer.Blockchainer, capacity int) *Pool { +func New(bc Ledger, capacity int) *Pool { if capacity <= 0 { panic("invalid capacity") } diff --git a/pkg/network/extpool/pool_test.go b/pkg/network/extpool/pool_test.go index 754f4f493..49486caed 100644 --- a/pkg/network/extpool/pool_test.go +++ b/pkg/network/extpool/pool_test.go @@ -4,7 +4,6 @@ import ( "errors" "testing" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/network/payload" @@ -116,7 +115,7 @@ func (p *Pool) testAdd(t *testing.T, expectedOk bool, expectedErr error, ep *pay } type testChain struct { - blockchainer.Blockchainer + Ledger height uint32 verifyWitness func(util.Uint160) bool isAllowed func(util.Uint160) bool diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 23c399c91..cc2e8ee53 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -10,6 +10,7 @@ import ( "github.com/nspcc-dev/neo-go/internal/fakechain" "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/payload" @@ -192,9 +193,13 @@ func newTestServer(t *testing.T, serverConfig ServerConfig) *Server { } func newTestServerWithCustomCfg(t *testing.T, serverConfig ServerConfig, protocolCfg func(*config.ProtocolConfiguration)) *Server { - s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), zaptest.NewLogger(t), - newFakeTransp, newFakeConsensus, newTestDiscovery) + s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), new(fakechain.FakeStateSync), zaptest.NewLogger(t), + newFakeTransp, newTestDiscovery) require.NoError(t, err) + if serverConfig.Wallet != nil { + cons := new(fakeConsensus) + s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) + } t.Cleanup(s.discovery.Close) return s } diff --git a/pkg/network/notary_feer.go b/pkg/network/notary_feer.go index 97e179234..1fbc82111 100644 --- a/pkg/network/notary_feer.go +++ b/pkg/network/notary_feer.go @@ -3,13 +3,12 @@ package network import ( "math/big" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/util" ) // NotaryFeer implements mempool.Feer interface for Notary balance handling. type NotaryFeer struct { - bc blockchainer.Blockchainer + bc Ledger } // FeePerByte implements mempool.Feer interface. @@ -33,7 +32,7 @@ func (f NotaryFeer) P2PSigExtensionsEnabled() bool { } // NewNotaryFeer returns new NotaryFeer instance. -func NewNotaryFeer(bc blockchainer.Blockchainer) NotaryFeer { +func NewNotaryFeer(bc Ledger) NotaryFeer { return NotaryFeer{ bc: bc, } diff --git a/pkg/network/server.go b/pkg/network/server.go index 3eb6e06f7..98815b8af 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "fmt" + "math/big" mrand "math/rand" "net" "sort" @@ -12,10 +13,9 @@ import ( "sync" "time" + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" - "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/mpt" @@ -24,9 +24,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" - "github.com/nspcc-dev/neo-go/pkg/services/notary" - "github.com/nspcc-dev/neo-go/pkg/services/oracle" - "github.com/nspcc-dev/neo-go/pkg/services/stateroot" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" "go.uber.org/zap" @@ -52,6 +49,37 @@ var ( ) type ( + // Ledger is everything Server needs from the blockchain. + Ledger interface { + extpool.Ledger + mempool.Feer + Blockqueuer + GetBlock(hash util.Uint256) (*block.Block, error) + GetConfig() config.ProtocolConfiguration + GetHeader(hash util.Uint256) (*block.Header, error) + GetHeaderHash(int) util.Uint256 + GetMaxVerificationGAS() int64 + GetMemPool() *mempool.Pool + GetNotaryBalance(acc util.Uint160) *big.Int + GetNotaryContractScriptHash() util.Uint160 + GetNotaryDepositExpiration(acc util.Uint160) uint32 + GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) + HasBlock(util.Uint256) bool + HeaderHeight() uint32 + P2PSigExtensionsEnabled() bool + PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error + PoolTxWithData(t *transaction.Transaction, data interface{}, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data interface{}) error) error + RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block)) + SubscribeForBlocks(ch chan<- *block.Block) + UnsubscribeFromBlocks(ch chan<- *block.Block) + } + + // Service is a service abstraction (oracle, state root, consensus, etc). + Service interface { + Start() + Shutdown() + } + // Server represents the local Node in the network. Its transport could // be of any kind. Server struct { @@ -68,15 +96,17 @@ type ( transport Transporter discovery Discoverer - chain blockchainer.Blockchainer + chain Ledger bQueue *blockQueue bSyncQueue *blockQueue - consensus consensus.Service mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer - notaryModule *notary.Notary + services []Service + extensHandlers map[string]func(*payload.Extensible) error + extensHighPrio string + txCallback func(*transaction.Transaction) txInLock sync.Mutex txInMap map[util.Uint256]struct{} @@ -97,9 +127,7 @@ type ( syncReached *atomic.Bool - oracle *oracle.Oracle - stateRoot stateroot.Service - stateSync blockchainer.StateSync + stateSync StateSync log *zap.Logger } @@ -117,15 +145,14 @@ func randomID() uint32 { } // NewServer returns a new Server, initialized with the given configuration. -func NewServer(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Logger) (*Server, error) { - return newServerFromConstructors(config, chain, log, func(s *Server) Transporter { +func NewServer(config ServerConfig, chain Ledger, stSync StateSync, log *zap.Logger) (*Server, error) { + return newServerFromConstructors(config, chain, stSync, log, func(s *Server) Transporter { return NewTCPTransport(s, net.JoinHostPort(s.ServerConfig.Address, strconv.Itoa(int(s.ServerConfig.Port))), s.log) - }, consensus.NewService, newDefaultDiscovery) + }, newDefaultDiscovery) } -func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Logger, +func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSync, log *zap.Logger, newTransport func(*Server) Transporter, - newConsensus func(consensus.Config) (consensus.Service, error), newDiscovery func([]string, time.Duration, Transporter) Discoverer, ) (*Server, error) { if log == nil { @@ -154,91 +181,23 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), log: log, transactions: make(chan *transaction.Transaction, 64), + extensHandlers: make(map[string]func(*payload.Extensible) error), + stateSync: stSync, } if chain.P2PSigExtensionsEnabled() { s.notaryFeer = NewNotaryFeer(chain) s.notaryRequestPool = mempool.New(chain.GetConfig().P2PNotaryRequestPayloadPoolSize, 1, true) - chain.RegisterPostBlock(func(bc blockchainer.Blockchainer, txpool *mempool.Pool, _ *block.Block) { + chain.RegisterPostBlock(func(isRelevant func(*transaction.Transaction, *mempool.Pool, bool) bool, txpool *mempool.Pool, _ *block.Block) { s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool { - return bc.IsTxStillRelevant(t, txpool, true) + return isRelevant(t, txpool, true) }, s.notaryFeer) }) - if config.P2PNotaryCfg.Enabled { - cfg := notary.Config{ - MainCfg: config.P2PNotaryCfg, - Chain: chain, - Log: log, - } - n, err := notary.NewNotary(cfg, s.network, s.notaryRequestPool, func(tx *transaction.Transaction) error { - if err := s.RelayTxn(tx); err != nil { - return fmt.Errorf("can't relay completed notary transaction: hash %s, error: %w", tx.Hash().StringLE(), err) - } - return nil - }) - if err != nil { - return nil, fmt.Errorf("failed to create Notary module: %w", err) - } - s.notaryModule = n - chain.SetNotary(n) - } - } else if config.P2PNotaryCfg.Enabled { - return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enabled") } s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { s.tryStartServices() }) - if config.StateRootCfg.Enabled && chain.GetConfig().StateRootInHeader { - return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled") - } - - sr, err := stateroot.New(config.StateRootCfg, s.log, chain, s.handleNewPayload) - if err != nil { - return nil, fmt.Errorf("can't initialize StateRoot service: %w", err) - } - s.stateRoot = sr - - sSync := chain.GetStateSyncModule() - s.stateSync = sSync - s.bSyncQueue = newBlockQueue(maxBlockBatch, sSync, log, nil) - - if config.OracleCfg.Enabled { - orcCfg := oracle.Config{ - Log: log, - Network: config.Net, - MainCfg: config.OracleCfg, - Chain: chain, - } - orc, err := oracle.NewOracle(orcCfg) - if err != nil { - return nil, fmt.Errorf("can't initialize Oracle module: %w", err) - } - orc.SetOnTransaction(func(tx *transaction.Transaction) { - if err := s.RelayTxn(tx); err != nil { - orc.Log.Error("can't pool oracle tx", - zap.String("hash", tx.Hash().StringLE()), - zap.Error(err)) - } - }) - s.oracle = orc - chain.SetOracle(orc) - } - - srv, err := newConsensus(consensus.Config{ - Logger: log, - Broadcast: s.handleNewPayload, - Chain: chain, - ProtocolConfiguration: chain.GetConfig(), - RequestTx: s.requestTx, - Wallet: config.Wallet, - - TimePerBlock: config.TimePerBlock, - }) - if err != nil { - return nil, err - } - - s.consensus = srv + s.bSyncQueue = newBlockQueue(maxBlockBatch, s.stateSync, log, nil) if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", @@ -299,20 +258,13 @@ func (s *Server) Shutdown() { s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.transport.Close() s.discovery.Close() - s.consensus.Shutdown() for _, p := range s.getPeers(nil) { p.Disconnect(errServerShutdown) } s.bQueue.discard() s.bSyncQueue.discard() - if s.StateRootCfg.Enabled { - s.stateRoot.Shutdown() - } - if s.oracle != nil { - s.oracle.Shutdown() - } - if s.notaryModule != nil { - s.notaryModule.Stop() + for _, svc := range s.services { + svc.Shutdown() } if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.StopSubscriptions() @@ -320,14 +272,27 @@ func (s *Server) Shutdown() { close(s.quit) } -// GetOracle returns oracle module instance. -func (s *Server) GetOracle() *oracle.Oracle { - return s.oracle +// AddService allows to add a service to be started/stopped by Server. +func (s *Server) AddService(svc Service) { + s.services = append(s.services, svc) } -// GetStateRoot returns state root service instance. -func (s *Server) GetStateRoot() stateroot.Service { - return s.stateRoot +// AddExtensibleService register a service that handles extensible payload of some kind. +func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) { + s.extensHandlers[category] = handler + s.AddService(svc) +} + +// AddExtensibleHPService registers a high-priority service that handles extensible payload of some kind. +func (s *Server) AddExtensibleHPService(svc Service, category string, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) { + s.txCallback = txCallback + s.extensHighPrio = category + s.AddExtensibleService(svc, category, handler) +} + +// GetNotaryPool allows to retrieve notary pool, if it's configured. +func (s *Server) GetNotaryPool() *mempool.Pool { + return s.notaryRequestPool } // UnconnectedPeers returns a list of peers that are in the discovery peer list @@ -460,20 +425,11 @@ func (s *Server) tryStartServices() { if s.IsInSync() && s.syncReached.CAS(false, true) { s.log.Info("node reached synchronized state, starting services") - if s.Wallet != nil { - s.consensus.Start() - } - if s.StateRootCfg.Enabled { - s.stateRoot.Run() - } - if s.oracle != nil { - go s.oracle.Run() - } if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber. } - if s.notaryModule != nil { - go s.notaryModule.Run() + for _, svc := range s.services { + svc.Start() } } } @@ -674,7 +630,7 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error { return nil } var ( - bq blockchainer.Blockqueuer = s.chain + bq Blockqueuer = s.chain requestMPTNodes bool ) if s.stateSync.IsActive() { @@ -974,25 +930,26 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { if !ok { // payload is already in cache return nil } - switch e.Category { - case consensus.Category: - s.consensus.OnPayload(e) - case stateroot.Category: - err := s.stateRoot.OnPayload(e) + handler := s.extensHandlers[e.Category] + if handler != nil { + err = handler(e) if err != nil { return err } - default: - return errors.New("invalid category") } + s.advertiseExtensible(e) + return nil +} +func (s *Server) advertiseExtensible(e *payload.Extensible) { msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()})) - if e.Category == consensus.Category { + if e.Category == s.extensHighPrio { + // It's high priority because it directly affects consensus process, + // even though it's just an inv. s.broadcastHPMessage(msg) } else { s.broadcastMessage(msg) } - return nil } // handleTxCmd processes received transaction. @@ -1008,8 +965,10 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { } s.txInMap[tx.Hash()] = struct{}{} s.txInLock.Unlock() + if s.txCallback != nil { + s.txCallback(tx) + } if s.verifyAndPoolTX(tx) == nil { - s.consensus.OnTransaction(tx) s.broadcastTX(tx, nil) } s.txInLock.Lock() @@ -1041,21 +1000,21 @@ func (s *Server) RelayP2PNotaryRequest(r *payload.P2PNotaryRequest) error { // verifyAndPoolNotaryRequest verifies NotaryRequest payload and adds it to the payload mempool. func (s *Server) verifyAndPoolNotaryRequest(r *payload.P2PNotaryRequest) error { - return s.chain.PoolTxWithData(r.FallbackTransaction, r, s.notaryRequestPool, s.notaryFeer, verifyNotaryRequest) + return s.chain.PoolTxWithData(r.FallbackTransaction, r, s.notaryRequestPool, s.notaryFeer, s.verifyNotaryRequest) } // verifyNotaryRequest is a function for state-dependant P2PNotaryRequest payload verification which is executed before ordinary blockchain's verification. -func verifyNotaryRequest(bc blockchainer.Blockchainer, _ *transaction.Transaction, data interface{}) error { +func (s *Server) verifyNotaryRequest(_ *transaction.Transaction, data interface{}) error { r := data.(*payload.P2PNotaryRequest) payer := r.FallbackTransaction.Signers[1].Account - if _, err := bc.VerifyWitness(payer, r, &r.Witness, bc.GetPolicer().GetMaxVerificationGAS()); err != nil { + if _, err := s.chain.VerifyWitness(payer, r, &r.Witness, s.chain.GetMaxVerificationGAS()); err != nil { return fmt.Errorf("bad P2PNotaryRequest payload witness: %w", err) } - notaryHash := bc.GetNotaryContractScriptHash() + notaryHash := s.chain.GetNotaryContractScriptHash() if r.FallbackTransaction.Sender() != notaryHash { return errors.New("P2PNotary contract should be a sender of the fallback transaction") } - depositExpiration := bc.GetNotaryDepositExpiration(payer) + depositExpiration := s.chain.GetNotaryDepositExpiration(payer) if r.FallbackTransaction.ValidUntilBlock >= depositExpiration { return fmt.Errorf("fallback transaction is valid after deposit is unlocked: ValidUntilBlock is %d, deposit lock expires at %d", r.FallbackTransaction.ValidUntilBlock, depositExpiration) } @@ -1110,7 +1069,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error { // 1. Block range is divided into chunks of payload.MaxHashesCount. // 2. Send requests for chunk in increasing order. // 3. After all requests were sent, request random height. -func (s *Server) requestBlocks(bq blockchainer.Blockqueuer, p Peer) error { +func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error { pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, pl)) } @@ -1277,25 +1236,21 @@ func (s *Server) tryInitStateSync() { } } } -func (s *Server) handleNewPayload(p *payload.Extensible) { + +// BroadcastExtensible add locally-generated Extensible payload to the pool +// and advertises it to peers. +func (s *Server) BroadcastExtensible(p *payload.Extensible) { _, err := s.extensiblePool.Add(p) if err != nil { s.log.Error("created payload is not valid", zap.Error(err)) return } - msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{p.Hash()})) - switch p.Category { - case consensus.Category: - // It's high priority because it directly affects consensus process, - // even though it's just an inv. - s.broadcastHPMessage(msg) - default: - s.broadcastMessage(msg) - } + s.advertiseExtensible(p) } -func (s *Server) requestTx(hashes ...util.Uint256) { +// RequestTx asks for given transactions from Server peers using GetData message. +func (s *Server) RequestTx(hashes ...util.Uint256) { if len(hashes) == 0 { return } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 6cc83d3fc..6a6d290da 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -38,12 +38,12 @@ type fakeConsensus struct { var _ consensus.Service = (*fakeConsensus)(nil) -func newFakeConsensus(c consensus.Config) (consensus.Service, error) { - return new(fakeConsensus), nil +func (f *fakeConsensus) Start() { f.started.Store(true) } +func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } +func (f *fakeConsensus) OnPayload(p *payload.Extensible) error { + f.payloads = append(f.payloads, p) + return nil } -func (f *fakeConsensus) Start() { f.started.Store(true) } -func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } -func (f *fakeConsensus) OnPayload(p *payload.Extensible) { f.payloads = append(f.payloads, p) } func (f *fakeConsensus) OnTransaction(tx *transaction.Transaction) { f.txs = append(f.txs, tx) } func (f *fakeConsensus) GetPayload(h util.Uint256) *payload.Extensible { panic("implement me") } @@ -52,7 +52,7 @@ func TestNewServer(t *testing.T) { P2PStateExchangeExtensions: true, StateRootInHeader: true, }} - s, err := newServerFromConstructors(ServerConfig{}, bc, nil, newFakeTransp, newFakeConsensus, newTestDiscovery) + s, err := newServerFromConstructors(ServerConfig{}, bc, new(fakechain.FakeStateSync), nil, newFakeTransp, newTestDiscovery) require.Error(t, err) t.Run("set defaults", func(t *testing.T) { @@ -76,13 +76,6 @@ func TestNewServer(t *testing.T) { require.Equal(t, 2, s.ServerConfig.MaxPeers) require.Equal(t, 3, s.ServerConfig.AttemptConnPeers) }) - t.Run("consensus error is not dropped", func(t *testing.T) { - errConsensus := errors.New("can't create consensus") - _, err = newServerFromConstructors(ServerConfig{MinPeers: -1}, bc, zaptest.NewLogger(t), newFakeTransp, - func(consensus.Config) (consensus.Service, error) { return nil, errConsensus }, - newTestDiscovery) - require.True(t, errors.Is(err, errConsensus), "got: %#v", err) - }) } func startWithChannel(s *Server) chan error { @@ -104,13 +97,12 @@ func TestServerStartAndShutdown(t *testing.T) { require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10) assert.True(t, s.transport.(*fakeTransp).started.Load()) - assert.False(t, s.consensus.(*fakeConsensus).started.Load()) + assert.Nil(t, s.txCallback) s.Shutdown() <-ch require.True(t, s.transport.(*fakeTransp).closed.Load()) - require.True(t, s.consensus.(*fakeConsensus).stopped.Load()) err, ok := p.droppedWith.Load().(error) require.True(t, ok) require.True(t, errors.Is(err, errServerShutdown)) @@ -122,12 +114,12 @@ func TestServerStartAndShutdown(t *testing.T) { p := newLocalPeer(t, s) s.register <- p - assert.True(t, s.consensus.(*fakeConsensus).started.Load()) + assert.True(t, s.services[0].(*fakeConsensus).started.Load()) s.Shutdown() <-ch - require.True(t, s.consensus.(*fakeConsensus).stopped.Load()) + require.True(t, s.services[0].(*fakeConsensus).stopped.Load()) }) } @@ -416,7 +408,8 @@ func TestBlock(t *testing.T) { } func TestConsensus(t *testing.T) { - s := startTestServer(t) + s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)}) + startWithCleanup(t, s) atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4) p := newLocalPeer(t, s) @@ -438,13 +431,13 @@ func TestConsensus(t *testing.T) { s.chain.(*fakechain.FakeChain).VerifyWitnessF = func() (int64, error) { return 0, nil } require.NoError(t, s.handleMessage(p, msg)) - require.Contains(t, s.consensus.(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) + require.Contains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) t.Run("small ValidUntilBlockEnd", func(t *testing.T) { t.Run("current height", func(t *testing.T) { msg := newConsensusMessage(0, s.chain.BlockHeight()) require.NoError(t, s.handleMessage(p, msg)) - require.NotContains(t, s.consensus.(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) + require.NotContains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) }) t.Run("invalid", func(t *testing.T) { msg := newConsensusMessage(0, s.chain.BlockHeight()-1) @@ -455,17 +448,11 @@ func TestConsensus(t *testing.T) { msg := newConsensusMessage(s.chain.BlockHeight()+1, s.chain.BlockHeight()+2) require.Error(t, s.handleMessage(p, msg)) }) - t.Run("invalid category", func(t *testing.T) { - pl := payload.NewExtensible() - pl.Category = "invalid" - pl.ValidBlockEnd = s.chain.BlockHeight() + 1 - msg := NewMessage(CMDExtensible, pl) - require.Error(t, s.handleMessage(p, msg)) - }) } func TestTransaction(t *testing.T) { - s := startTestServer(t) + s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)}) + startWithCleanup(t, s) t.Run("good", func(t *testing.T) { tx := newDummyTx() @@ -481,15 +468,13 @@ func TestTransaction(t *testing.T) { s.register <- p s.testHandleMessage(t, nil, CMDTX, tx) - require.Contains(t, s.consensus.(*fakeConsensus).txs, tx) + require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) }) t.Run("bad", func(t *testing.T) { tx := newDummyTx() s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds } s.testHandleMessage(t, nil, CMDTX, tx) - for _, ftx := range s.consensus.(*fakeConsensus).txs { - require.NotEqual(t, ftx, tx) - } + require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) // Consensus receives everything. }) } @@ -907,13 +892,13 @@ func TestRequestTx(t *testing.T) { t.Run("no hashes, no message", func(t *testing.T) { actual = nil - s.requestTx() + s.RequestTx() require.Nil(t, actual) }) t.Run("good, small", func(t *testing.T) { actual = nil expected := []util.Uint256{random.Uint256(), random.Uint256()} - s.requestTx(expected...) + s.RequestTx(expected...) require.Equal(t, expected, actual) }) t.Run("good, exactly one chunk", func(t *testing.T) { @@ -922,7 +907,7 @@ func TestRequestTx(t *testing.T) { for i := range expected { expected[i] = random.Uint256() } - s.requestTx(expected...) + s.RequestTx(expected...) require.Equal(t, expected, actual) }) t.Run("good, multiple chunks", func(t *testing.T) { @@ -931,7 +916,7 @@ func TestRequestTx(t *testing.T) { for i := range expected { expected[i] = random.Uint256() } - s.requestTx(expected...) + s.RequestTx(expected...) require.Equal(t, expected, actual) }) } @@ -1022,6 +1007,9 @@ func TestVerifyNotaryRequest(t *testing.T) { bc := fakechain.NewFakeChain() bc.MaxVerificationGAS = 10 bc.NotaryContractScriptHash = util.Uint160{1, 2, 3} + s, err := newServerFromConstructors(ServerConfig{}, bc, new(fakechain.FakeStateSync), zaptest.NewLogger(t), newFakeTransp, newTestDiscovery) + require.NoError(t, err) + t.Cleanup(s.Shutdown) newNotaryRequest := func() *payload.P2PNotaryRequest { return &payload.P2PNotaryRequest{ MainTransaction: &transaction.Transaction{Script: []byte{0, 1, 2}}, @@ -1035,26 +1023,26 @@ func TestVerifyNotaryRequest(t *testing.T) { t.Run("bad payload witness", func(t *testing.T) { bc.VerifyWitnessF = func() (int64, error) { return 0, errors.New("bad witness") } - require.Error(t, verifyNotaryRequest(bc, nil, newNotaryRequest())) + require.Error(t, s.verifyNotaryRequest(nil, newNotaryRequest())) }) t.Run("bad fallback sender", func(t *testing.T) { bc.VerifyWitnessF = func() (int64, error) { return 0, nil } r := newNotaryRequest() r.FallbackTransaction.Signers[0] = transaction.Signer{Account: util.Uint160{7, 8, 9}} - require.Error(t, verifyNotaryRequest(bc, nil, r)) + require.Error(t, s.verifyNotaryRequest(nil, r)) }) t.Run("expired deposit", func(t *testing.T) { r := newNotaryRequest() bc.NotaryDepositExpiration = r.FallbackTransaction.ValidUntilBlock - require.Error(t, verifyNotaryRequest(bc, nil, r)) + require.Error(t, s.verifyNotaryRequest(nil, r)) }) t.Run("good", func(t *testing.T) { r := newNotaryRequest() bc.NotaryDepositExpiration = r.FallbackTransaction.ValidUntilBlock + 1 - require.NoError(t, verifyNotaryRequest(bc, nil, r)) + require.NoError(t, s.verifyNotaryRequest(nil, r)) }) } diff --git a/pkg/core/blockchainer/state_sync.go b/pkg/network/state_sync.go similarity index 88% rename from pkg/core/blockchainer/state_sync.go rename to pkg/network/state_sync.go index a8ff919d9..8e6392e7a 100644 --- a/pkg/core/blockchainer/state_sync.go +++ b/pkg/network/state_sync.go @@ -1,4 +1,4 @@ -package blockchainer +package network import ( "github.com/nspcc-dev/neo-go/pkg/core/mpt" @@ -8,7 +8,7 @@ import ( // StateSync represents state sync module. type StateSync interface { AddMPTNodes([][]byte) error - Blockqueuer // Blockqueuer interface + Blockqueuer Init(currChainHeight uint32) error IsActive() bool IsInitialized() bool diff --git a/pkg/rpc/response/result/block.go b/pkg/rpc/response/result/block.go index cd3c4ca1b..418067a7b 100644 --- a/pkg/rpc/response/result/block.go +++ b/pkg/rpc/response/result/block.go @@ -5,12 +5,16 @@ import ( "errors" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/util" ) type ( + // LedgerAux is a set of methods needed to construct some outputs. + LedgerAux interface { + BlockHeight() uint32 + GetHeaderHash(int) util.Uint256 + } // Block wrapper used for the representation of // block.Block / block.Base on the RPC Server. Block struct { @@ -28,7 +32,7 @@ type ( ) // NewBlock creates a new Block wrapper. -func NewBlock(b *block.Block, chain blockchainer.Blockchainer) Block { +func NewBlock(b *block.Block, chain LedgerAux) Block { res := Block{ Block: *b, BlockMetadata: BlockMetadata{ diff --git a/pkg/rpc/response/result/block_header.go b/pkg/rpc/response/result/block_header.go index c68e7b38a..2c3bd8ae8 100644 --- a/pkg/rpc/response/result/block_header.go +++ b/pkg/rpc/response/result/block_header.go @@ -5,7 +5,6 @@ import ( "errors" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/util" ) @@ -20,7 +19,7 @@ type ( ) // NewHeader creates a new Header wrapper. -func NewHeader(h *block.Header, chain blockchainer.Blockchainer) Header { +func NewHeader(h *block.Header, chain LedgerAux) Header { res := Header{ Header: *h, BlockMetadata: BlockMetadata{ diff --git a/pkg/rpc/response/result/tx_raw_output.go b/pkg/rpc/response/result/tx_raw_output.go index 2d14220cc..f2e74bbee 100644 --- a/pkg/rpc/response/result/tx_raw_output.go +++ b/pkg/rpc/response/result/tx_raw_output.go @@ -5,7 +5,6 @@ import ( "errors" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/util" @@ -27,7 +26,7 @@ type TransactionMetadata struct { } // NewTransactionOutputRaw returns a new ransactionOutputRaw object. -func NewTransactionOutputRaw(tx *transaction.Transaction, header *block.Header, appExecResult *state.AppExecResult, chain blockchainer.Blockchainer) TransactionOutputRaw { +func NewTransactionOutputRaw(tx *transaction.Transaction, header *block.Header, appExecResult *state.AppExecResult, chain LedgerAux) TransactionOutputRaw { result := TransactionOutputRaw{ Transaction: *tx, } diff --git a/pkg/rpc/server/client_test.go b/pkg/rpc/server/client_test.go index e24647cfe..7b92eccac 100644 --- a/pkg/rpc/server/client_test.go +++ b/pkg/rpc/server/client_test.go @@ -715,9 +715,9 @@ func TestCreateNEP17TransferTx(t *testing.T) { require.NoError(t, err) require.NoError(t, acc.SignTx(testchain.Network(), tx)) require.NoError(t, chain.VerifyTx(tx)) - v, _ := chain.GetTestVM(trigger.Application, tx, nil) - v.LoadScriptWithFlags(tx.Script, callflag.All) - require.NoError(t, v.Run()) + ic := chain.GetTestVM(trigger.Application, tx, nil) + ic.VM.LoadScriptWithFlags(tx.Script, callflag.All) + require.NoError(t, ic.VM.Run()) }) t.Run("none scope", func(t *testing.T) { _, err := c.CreateNEP17TransferTx(acc, util.Uint160{}, gasContractHash, 1000, 0, nil, []client.SignerAccount{{ @@ -739,9 +739,9 @@ func TestCreateNEP17TransferTx(t *testing.T) { require.NoError(t, err) require.NoError(t, acc.SignTx(testchain.Network(), tx)) require.NoError(t, chain.VerifyTx(tx)) - v, _ := chain.GetTestVM(trigger.Application, tx, nil) - v.LoadScriptWithFlags(tx.Script, callflag.All) - require.NoError(t, v.Run()) + ic := chain.GetTestVM(trigger.Application, tx, nil) + ic.VM.LoadScriptWithFlags(tx.Script, callflag.All) + require.NoError(t, ic.VM.Run()) }) } diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 76cf975c5..6ea513586 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -620,13 +620,13 @@ func (s *Server) calculateNetworkFee(reqParams request.Params) (interface{}, *re } if ef == 0 { - ef = s.chain.GetPolicer().GetBaseExecFee() + ef = s.chain.GetBaseExecFee() } fee, sizeDelta := fee.Calculate(ef, verificationScript) netFee += fee size += sizeDelta } - fee := s.chain.GetPolicer().FeePerByte() + fee := s.chain.FeePerByte() netFee += int64(size) * fee return result.NetworkFee{Value: netFee}, nil } @@ -853,19 +853,19 @@ func (s *Server) invokeReadOnly(bw *io.BufBinWriter, h util.Uint160, method stri if err != nil { return nil, nil, err } - v, finalize := s.chain.GetTestVM(trigger.Application, tx, b) - v.GasLimit = core.HeaderVerificationGasLimit - v.LoadScriptWithFlags(script, callflag.All) - err = v.Run() + ic := s.chain.GetTestVM(trigger.Application, tx, b) + ic.VM.GasLimit = core.HeaderVerificationGasLimit + ic.VM.LoadScriptWithFlags(script, callflag.All) + err = ic.VM.Run() if err != nil { - finalize() + ic.Finalize() return nil, nil, fmt.Errorf("failed to run `%s` for %s: %w", method, h.StringLE(), err) } - if v.Estack().Len() != 1 { - finalize() - return nil, nil, fmt.Errorf("invalid `%s` return values count: expected 1, got %d", method, v.Estack().Len()) + if ic.VM.Estack().Len() != 1 { + ic.Finalize() + return nil, nil, fmt.Errorf("invalid `%s` return values count: expected 1, got %d", method, ic.VM.Estack().Len()) } - return v.Estack().Pop().Item(), finalize, nil + return ic.VM.Estack().Pop().Item(), ic.Finalize, nil } func (s *Server) getTokenBalance(h util.Uint160, acc util.Uint160, id []byte, bw *io.BufBinWriter) (*big.Int, error) { @@ -1690,38 +1690,32 @@ func (s *Server) runScriptInVM(t trigger.Type, script []byte, contractScriptHash if err != nil { return nil, response.NewInternalServerError("can't create fake block", err) } - vm, finalize := s.chain.GetTestVM(t, tx, b) + ic := s.chain.GetTestVM(t, tx, b) if verbose { - vm.EnableInvocationTree() + ic.VM.EnableInvocationTree() } - vm.GasLimit = int64(s.config.MaxGasInvoke) + ic.VM.GasLimit = int64(s.config.MaxGasInvoke) if t == trigger.Verification { // We need this special case because witnesses verification is not the simple System.Contract.Call, // and we need to define exactly the amount of gas consumed for a contract witness verification. - gasPolicy := s.chain.GetPolicer().GetMaxVerificationGAS() - if vm.GasLimit > gasPolicy { - vm.GasLimit = gasPolicy + gasPolicy := s.chain.GetMaxVerificationGAS() + if ic.VM.GasLimit > gasPolicy { + ic.VM.GasLimit = gasPolicy } - err := s.chain.InitVerificationVM(vm, func(h util.Uint160) (*state.Contract, error) { - res := s.chain.GetContractState(h) - if res == nil { - return nil, fmt.Errorf("unknown contract: %s", h.StringBE()) - } - return res, nil - }, contractScriptHash, &transaction.Witness{InvocationScript: script, VerificationScript: []byte{}}) + err := s.chain.InitVerificationContext(ic, contractScriptHash, &transaction.Witness{InvocationScript: script, VerificationScript: []byte{}}) if err != nil { return nil, response.NewInternalServerError("can't prepare verification VM", err) } } else { - vm.LoadScriptWithFlags(script, callflag.All) + ic.VM.LoadScriptWithFlags(script, callflag.All) } - err = vm.Run() + err = ic.VM.Run() var faultException string if err != nil { faultException = err.Error() } - return result.NewInvoke(vm, finalize, script, faultException, s.config.MaxIteratorResultItems), nil + return result.NewInvoke(ic.VM, ic.Finalize, script, faultException, s.config.MaxIteratorResultItems), nil } // submitBlock broadcasts a raw block over the NEO network. diff --git a/pkg/rpc/server/server_helper_test.go b/pkg/rpc/server/server_helper_test.go index a746ca5e8..0baa86411 100644 --- a/pkg/rpc/server/server_helper_test.go +++ b/pkg/rpc/server/server_helper_test.go @@ -102,7 +102,7 @@ func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool) serverConfig := network.NewServerConfig(cfg) serverConfig.Port = 0 - server, err := network.NewServer(serverConfig, chain, logger) + server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger) require.NoError(t, err) rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger) errCh := make(chan error, 2) diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index a073db5ad..0b69169a4 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -2325,7 +2325,7 @@ func BenchmarkHandleIn(b *testing.B) { serverConfig := network.NewServerConfig(cfg) serverConfig.LogLevel = zapcore.FatalLevel - server, err := network.NewServer(serverConfig, chain, logger) + server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger) require.NoError(b, err) rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger) defer chain.Close() diff --git a/pkg/services/notary/node_test.go b/pkg/services/notary/node_test.go index 4e69eb246..979c3e848 100644 --- a/pkg/services/notary/node_test.go +++ b/pkg/services/notary/node_test.go @@ -6,7 +6,6 @@ import ( "github.com/nspcc-dev/neo-go/internal/fakechain" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/wallet" @@ -14,7 +13,7 @@ import ( "go.uber.org/zap/zaptest" ) -func getTestNotary(t *testing.T, bc blockchainer.Blockchainer, walletPath, pass string) (*wallet.Account, *Notary, *mempool.Pool) { +func getTestNotary(t *testing.T, bc Ledger, walletPath, pass string) (*wallet.Account, *Notary, *mempool.Pool) { mainCfg := config.P2PNotary{ Enabled: true, UnlockWallet: config.Wallet{ diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index 9fd69fe22..5f6a8e747 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -11,7 +11,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -27,6 +26,16 @@ import ( ) type ( + // Ledger is the interface to Blockchain sufficient for Notary. + Ledger interface { + BlockHeight() uint32 + GetMaxVerificationGAS() int64 + GetNotaryContractScriptHash() util.Uint160 + SubscribeForBlocks(ch chan<- *block.Block) + UnsubscribeFromBlocks(ch chan<- *block.Block) + VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error) + } + // Notary represents Notary module. Notary struct { Config Config @@ -60,7 +69,7 @@ type ( // Config represents external configuration for Notary module. Config struct { MainCfg config.P2PNotary - Chain blockchainer.Blockchainer + Chain Ledger Log *zap.Logger } ) @@ -143,12 +152,16 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu }, nil } -// Run runs Notary module and should be called in a separate goroutine. -func (n *Notary) Run() { +// Start runs Notary module in a separate goroutine. +func (n *Notary) Start() { n.Config.Log.Info("starting notary service") n.Config.Chain.SubscribeForBlocks(n.blocksCh) n.mp.SubscribeForTransactions(n.reqCh) go n.newTxCallbackLoop() + go n.mainLoop() +} + +func (n *Notary) mainLoop() { for { select { case <-n.stopCh: @@ -171,8 +184,8 @@ func (n *Notary) Run() { } } -// Stop shutdowns Notary module. -func (n *Notary) Stop() { +// Shutdown stops Notary module. +func (n *Notary) Shutdown() { close(n.stopCh) } @@ -227,7 +240,7 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) { switch r.witnessInfo[i].typ { case Contract: // Need to check even if r.main.Scripts[i].InvocationScript is already filled in. - _, err := n.Config.Chain.VerifyWitness(r.main.Signers[i].Account, r.main, &w, n.Config.Chain.GetPolicer().GetMaxVerificationGAS()) + _, err := n.Config.Chain.VerifyWitness(r.main.Signers[i].Account, r.main, &w, n.Config.Chain.GetMaxVerificationGAS()) if err != nil { continue } diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index a9fcad05d..b63c96ebe 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -9,10 +9,12 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/wallet" @@ -20,6 +22,17 @@ import ( ) type ( + // Ledger is the interface to Blockchain sufficient for Oracle. + Ledger interface { + BlockHeight() uint32 + FeePerByte() int64 + GetBaseExecFee() int64 + GetConfig() config.ProtocolConfiguration + GetMaxVerificationGAS() int64 + GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) *interop.Context + GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error) + } + // Oracle represents oracle module capable of talking // with the external world. Oracle struct { @@ -64,7 +77,7 @@ type ( Network netmode.Magic MainCfg config.OracleConfiguration Client HTTPClient - Chain blockchainer.Blockchainer + Chain Ledger ResponseHandler Broadcaster OnTransaction TxCallback URIValidator URIValidator @@ -85,7 +98,7 @@ type ( defaultResponseHandler struct{} // TxCallback executes on new transactions when they are ready to be pooled. - TxCallback = func(tx *transaction.Transaction) + TxCallback = func(tx *transaction.Transaction) error // URIValidator is used to check if provided URL is valid. URIValidator = func(*url.URL) error ) @@ -156,7 +169,7 @@ func NewOracle(cfg Config) (*Oracle, error) { o.ResponseHandler = defaultResponseHandler{} } if o.OnTransaction == nil { - o.OnTransaction = func(*transaction.Transaction) {} + o.OnTransaction = func(*transaction.Transaction) error { return nil } } if o.URIValidator == nil { o.URIValidator = defaultURIValidator @@ -170,15 +183,18 @@ func (o *Oracle) Shutdown() { o.getBroadcaster().Shutdown() } -// Run runs must be executed in a separate goroutine. -func (o *Oracle) Run() { +// Start runs the oracle service in a separate goroutine. +func (o *Oracle) Start() { o.respMtx.Lock() if o.running { o.respMtx.Unlock() return } o.Log.Info("starting oracle service") + go o.start() +} +func (o *Oracle) start() { o.requestMap <- o.pending // Guaranteed to not block, only AddRequests sends to it. o.pending = nil o.running = true @@ -236,17 +252,12 @@ func (o *Oracle) UpdateNativeContract(script, resp []byte, h util.Uint160, verif o.verifyOffset = verifyOffset } -func (o *Oracle) getOnTransaction() TxCallback { - o.mtx.RLock() - defer o.mtx.RUnlock() - return o.OnTransaction -} - -// SetOnTransaction sets callback to pool and broadcast tx. -func (o *Oracle) SetOnTransaction(cb TxCallback) { - o.mtx.Lock() - defer o.mtx.Unlock() - o.OnTransaction = cb +func (o *Oracle) sendTx(tx *transaction.Transaction) { + if err := o.OnTransaction(tx); err != nil { + o.Log.Error("can't pool oracle tx", + zap.String("hash", tx.Hash().StringLE()), + zap.Error(err)) + } } func (o *Oracle) getBroadcaster() Broadcaster { diff --git a/pkg/services/oracle/request.go b/pkg/services/oracle/request.go index fea5c61b3..360d84276 100644 --- a/pkg/services/oracle/request.go +++ b/pkg/services/oracle/request.go @@ -240,7 +240,7 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, req request) error { o.getBroadcaster().SendResponse(priv, resp, txSig) if ready { - o.getOnTransaction()(readyTx) + o.sendTx(readyTx) } return nil } @@ -253,7 +253,7 @@ func (o *Oracle) processFailedRequest(priv *keys.PrivateKey, req request) { return } else if incTx.isSent { // Tx was sent but not yet persisted. Try to pool it again. - o.getOnTransaction()(incTx.tx) + o.sendTx(incTx.tx) return } @@ -271,7 +271,7 @@ func (o *Oracle) processFailedRequest(priv *keys.PrivateKey, req request) { o.getBroadcaster().SendResponse(priv, getFailedResponse(req.ID), txSig) if ready { - o.getOnTransaction()(readyTx) + o.sendTx(readyTx) } } diff --git a/pkg/services/oracle/response.go b/pkg/services/oracle/response.go index 2c4be8262..779290b8a 100644 --- a/pkg/services/oracle/response.go +++ b/pkg/services/oracle/response.go @@ -6,13 +6,13 @@ import ( gio "io" "github.com/nspcc-dev/neo-go/pkg/core/fee" + "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" - "github.com/nspcc-dev/neo-go/pkg/vm" "go.uber.org/zap" ) @@ -59,7 +59,7 @@ func (o *Oracle) AddResponse(pub *keys.PublicKey, reqID uint64, txSig []byte) { incTx.Unlock() if ready { - o.getOnTransaction()(readyTx) + o.sendTx(readyTx) } } @@ -115,7 +115,7 @@ func (o *Oracle) CreateResponseTx(gasForResponse int64, vub uint32, resp *transa } tx.NetworkFee += gasConsumed - netFee, sizeDelta := fee.Calculate(o.Chain.GetPolicer().GetBaseExecFee(), tx.Scripts[1].VerificationScript) + netFee, sizeDelta := fee.Calculate(o.Chain.GetBaseExecFee(), tx.Scripts[1].VerificationScript) tx.NetworkFee += netFee size += sizeDelta @@ -138,24 +138,24 @@ func (o *Oracle) testVerify(tx *transaction.Transaction) (int64, bool) { // method caches transaction hash, but tx building is not yet completed and hash will be changed. // So make a copy of tx to avoid wrong hash caching. cp := *tx - v, finalize := o.Chain.GetTestVM(trigger.Verification, &cp, nil) - v.GasLimit = o.Chain.GetPolicer().GetMaxVerificationGAS() - v.LoadScriptWithHash(o.oracleScript, o.oracleHash, callflag.ReadOnly) - v.Context().Jump(o.verifyOffset) + ic := o.Chain.GetTestVM(trigger.Verification, &cp, nil) + ic.VM.GasLimit = o.Chain.GetMaxVerificationGAS() + ic.VM.LoadScriptWithHash(o.oracleScript, o.oracleHash, callflag.ReadOnly) + ic.VM.Context().Jump(o.verifyOffset) - ok := isVerifyOk(v, finalize) - return v.GasConsumed(), ok + ok := isVerifyOk(ic) + return ic.VM.GasConsumed(), ok } -func isVerifyOk(v *vm.VM, finalize func()) bool { - defer finalize() - if err := v.Run(); err != nil { +func isVerifyOk(ic *interop.Context) bool { + defer ic.Finalize() + if err := ic.VM.Run(); err != nil { return false } - if v.Estack().Len() != 1 { + if ic.VM.Estack().Len() != 1 { return false } - ok, err := v.Estack().Pop().Item().TryBool() + ok, err := ic.VM.Estack().Pop().Item().TryBool() return err == nil && ok } diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 6716d4b68..8da2ae13d 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -8,7 +8,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/stateroot" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -19,19 +18,26 @@ import ( ) type ( + // Ledger is the interface to Blockchain sufficient for Service. + Ledger interface { + GetConfig() config.ProtocolConfiguration + HeaderHeight() uint32 + SubscribeForBlocks(ch chan<- *block.Block) + UnsubscribeFromBlocks(ch chan<- *block.Block) + } + // Service represents state root service. Service interface { - blockchainer.StateRoot OnPayload(p *payload.Extensible) error AddSignature(height uint32, validatorIndex int32, sig []byte) error GetConfig() config.StateRoot - Run() + Start() Shutdown() } service struct { - blockchainer.StateRoot - chain blockchainer.Blockchainer + *stateroot.Module + chain Ledger MainCfg config.StateRoot Network netmode.Magic @@ -60,10 +66,10 @@ const ( ) // New returns new state root service instance using underlying module. -func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer, cb RelayCallback) (Service, error) { +func New(cfg config.StateRoot, sm *stateroot.Module, log *zap.Logger, bc Ledger, cb RelayCallback) (Service, error) { bcConf := bc.GetConfig() s := &service{ - StateRoot: bc.GetStateModule(), + Module: sm, Network: bcConf.Magic, chain: bc, log: log, @@ -77,6 +83,9 @@ func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer, cb s.MainCfg = cfg if cfg.Enabled { + if bcConf.StateRootInHeader { + return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled") + } var err error w := cfg.UnlockWallet if s.wallet, err = wallet.NewWalletFromFile(w.Path); err != nil { diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index e36ba9bd5..8c934590f 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -17,8 +17,8 @@ const ( firstVoteResendDelay = 3 * time.Second ) -// Run runs service instance in a separate goroutine. -func (s *service) Run() { +// Start runs service instance in a separate goroutine. +func (s *service) Start() { s.log.Info("starting state validation service") s.chain.SubscribeForBlocks(s.blockCh) go s.run()