From 717be43a5d1d69d443fee4c2a29bb1845bf615ea Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Tue, 2 Feb 2021 10:45:02 +0300 Subject: [PATCH 1/6] oracle: split broadcaster into parts 1. Generic parallel sending part can be reused by state service. 2. Specific oracle marshaling is implemented on top of (1). --- .../helpers/rpcbroadcaster/broadcaster.go | 55 ++++++++++++++++++ .../rpcbroadcaster}/client.go | 21 ++++--- pkg/services/oracle/broadcaster/oracle.go | 56 ++++--------------- 3 files changed, 81 insertions(+), 51 deletions(-) create mode 100644 pkg/services/helpers/rpcbroadcaster/broadcaster.go rename pkg/services/{oracle/broadcaster => helpers/rpcbroadcaster}/client.go (63%) diff --git a/pkg/services/helpers/rpcbroadcaster/broadcaster.go b/pkg/services/helpers/rpcbroadcaster/broadcaster.go new file mode 100644 index 000000000..d1126cfc7 --- /dev/null +++ b/pkg/services/helpers/rpcbroadcaster/broadcaster.go @@ -0,0 +1,55 @@ +package rpcbroadcaster + +import ( + "time" + + "github.com/nspcc-dev/neo-go/pkg/rpc/request" + "go.uber.org/zap" +) + +// RPCBroadcaster represent generic RPC broadcaster. +type RPCBroadcaster struct { + Clients map[string]*RPCClient + Log *zap.Logger + Responses chan request.RawParams + + close chan struct{} + sendTimeout time.Duration +} + +// NewRPCBroadcaster returns new RPC broadcaster instance. +func NewRPCBroadcaster(log *zap.Logger, sendTimeout time.Duration) *RPCBroadcaster { + return &RPCBroadcaster{ + Clients: make(map[string]*RPCClient), + Log: log, + close: make(chan struct{}), + Responses: make(chan request.RawParams), + sendTimeout: sendTimeout, + } +} + +// Run implements oracle.Broadcaster. +func (r *RPCBroadcaster) Run() { + for _, c := range r.Clients { + go c.run() + } + for { + select { + case <-r.close: + return + case ps := <-r.Responses: + for _, c := range r.Clients { + select { + case c.responses <- ps: + default: + c.log.Error("can't send response, channel is full") + } + } + } + } +} + +// Shutdown implements oracle.Broadcaster. +func (r *RPCBroadcaster) Shutdown() { + close(r.close) +} diff --git a/pkg/services/oracle/broadcaster/client.go b/pkg/services/helpers/rpcbroadcaster/client.go similarity index 63% rename from pkg/services/oracle/broadcaster/client.go rename to pkg/services/helpers/rpcbroadcaster/client.go index bfc7accbe..95cbcffa9 100644 --- a/pkg/services/oracle/broadcaster/client.go +++ b/pkg/services/helpers/rpcbroadcaster/client.go @@ -1,4 +1,4 @@ -package broadcaster +package rpcbroadcaster import ( "context" @@ -9,26 +9,33 @@ import ( "go.uber.org/zap" ) -type oracleClient struct { +// RPCClient represent rpc client for a single node. +type RPCClient struct { client *client.Client addr string close chan struct{} responses chan request.RawParams log *zap.Logger sendTimeout time.Duration + method SendMethod } -func (r *rpcBroascaster) newOracleClient(addr string, timeout time.Duration, ch chan request.RawParams) *oracleClient { - return &oracleClient{ +// SendMethod represents rpc method for sending data to other nodes. +type SendMethod func(*client.Client, request.RawParams) error + +// NewRPCClient returns new rpc client for provided address and method. +func (r *RPCBroadcaster) NewRPCClient(addr string, method SendMethod, timeout time.Duration, ch chan request.RawParams) *RPCClient { + return &RPCClient{ addr: addr, close: r.close, responses: ch, - log: r.log.With(zap.String("address", addr)), + log: r.Log.With(zap.String("address", addr)), sendTimeout: timeout, + method: method, } } -func (c *oracleClient) run() { +func (c *RPCClient) run() { // We ignore error as not every node can be available on startup. c.client, _ = client.New(context.Background(), "http://"+c.addr, client.Options{ DialTimeout: c.sendTimeout, @@ -49,7 +56,7 @@ func (c *oracleClient) run() { continue } } - err := c.client.SubmitRawOracleResponse(ps) + err := c.method(c.client, ps) if err != nil { c.log.Error("error while submitting oracle response", zap.Error(err)) } diff --git a/pkg/services/oracle/broadcaster/oracle.go b/pkg/services/oracle/broadcaster/oracle.go index 2d8baafd2..db9aec096 100644 --- a/pkg/services/oracle/broadcaster/oracle.go +++ b/pkg/services/oracle/broadcaster/oracle.go @@ -8,72 +8,40 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/rpc/client" "github.com/nspcc-dev/neo-go/pkg/rpc/request" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/rpcbroadcaster" "github.com/nspcc-dev/neo-go/pkg/services/oracle" "go.uber.org/zap" ) -type rpcBroascaster struct { - clients map[string]*oracleClient - log *zap.Logger - - close chan struct{} - responses chan request.RawParams - sendTimeout time.Duration -} - const ( defaultSendTimeout = time.Second * 4 defaultChanCapacity = 16 ) +type oracleBroadcaster struct { + rpcbroadcaster.RPCBroadcaster +} + // New returns new struct capable of broadcasting oracle responses. func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster { if cfg.ResponseTimeout == 0 { cfg.ResponseTimeout = defaultSendTimeout } - r := &rpcBroascaster{ - clients: make(map[string]*oracleClient, len(cfg.Nodes)), - log: log, - close: make(chan struct{}), - responses: make(chan request.RawParams), - sendTimeout: cfg.ResponseTimeout, + r := &oracleBroadcaster{ + RPCBroadcaster: *rpcbroadcaster.NewRPCBroadcaster(log, cfg.ResponseTimeout), } for i := range cfg.Nodes { - r.clients[cfg.Nodes[i]] = r.newOracleClient(cfg.Nodes[i], cfg.ResponseTimeout, make(chan request.RawParams, defaultChanCapacity)) + r.Clients[cfg.Nodes[i]] = r.NewRPCClient(cfg.Nodes[i], (*client.Client).SubmitRawOracleResponse, + cfg.ResponseTimeout, make(chan request.RawParams, defaultChanCapacity)) } return r } -// Run implements oracle.Broadcaster. -func (r *rpcBroascaster) Run() { - for _, c := range r.clients { - go c.run() - } - for { - select { - case <-r.close: - return - case ps := <-r.responses: - for _, c := range r.clients { - select { - case c.responses <- ps: - default: - c.log.Error("can't send response, channel is full") - } - } - } - } -} - -// Shutdown implements oracle.Broadcaster. -func (r *rpcBroascaster) Shutdown() { - close(r.close) -} - // SendResponse implements interfaces.Broadcaster. -func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { +func (r *oracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { pub := priv.PublicKey() data := GetMessage(pub.Bytes(), resp.ID, txSig) msgSig := priv.Sign(data) @@ -83,7 +51,7 @@ func (r *rpcBroascaster) SendResponse(priv *keys.PrivateKey, resp *transaction.O base64.StdEncoding.EncodeToString(txSig), base64.StdEncoding.EncodeToString(msgSig), ) - r.responses <- params + r.Responses <- params } // GetMessage returns data which is signed upon sending response by RPC. From 7a176727caec5a9f872e6146713a595257eedd85 Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Tue, 2 Feb 2021 12:34:15 +0300 Subject: [PATCH 2/6] smartcontract: add `GetMajorityNodeCount()` --- pkg/smartcontract/contract.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/smartcontract/contract.go b/pkg/smartcontract/contract.go index e413c7c07..7168e83e4 100644 --- a/pkg/smartcontract/contract.go +++ b/pkg/smartcontract/contract.go @@ -49,7 +49,7 @@ func CreateDefaultMultiSigRedeemScript(publicKeys keys.PublicKeys) ([]byte, erro // using publicKeys length with m set to majority. func CreateMajorityMultiSigRedeemScript(publicKeys keys.PublicKeys) ([]byte, error) { n := len(publicKeys) - m := n - (n-1)/2 + m := GetMajorityHonestNodeCount(n) return CreateMultiSigRedeemScript(m, publicKeys) } @@ -58,3 +58,9 @@ func CreateMajorityMultiSigRedeemScript(publicKeys keys.PublicKeys) ([]byte, err func GetDefaultHonestNodeCount(n int) int { return n - (n-1)/3 } + +// GetMajorityHonestNodeCount returns minimum number of honest nodes +// required for majority-style agreement. +func GetMajorityHonestNodeCount(n int) int { + return n - (n-1)/2 +} From bf20db09e05fae4d7e3ef142b823bab8df32ce7e Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Fri, 29 Jan 2021 17:33:24 +0300 Subject: [PATCH 3/6] stateroot: move state-root related logic to core/stateroot --- internal/fakechain/fakechain.go | 14 +-- pkg/consensus/consensus.go | 6 +- pkg/consensus/consensus_test.go | 2 +- pkg/core/blockchain.go | 129 +++----------------------- pkg/core/blockchain_test.go | 2 +- pkg/core/blockchainer/blockchainer.go | 4 +- pkg/core/blockchainer/state_root.go | 13 +++ pkg/core/dao/dao.go | 82 +--------------- pkg/core/helper_test.go | 2 +- pkg/core/native_management_test.go | 4 +- pkg/core/prometheus.go | 12 --- pkg/core/state/mpt_root.go | 113 +++------------------- pkg/core/state/mpt_root_test.go | 54 ++--------- pkg/core/stateroot/module.go | 98 +++++++++++++++++++ pkg/core/stateroot/prometheus.go | 20 ++++ pkg/core/stateroot/store.go | 56 +++++++++++ pkg/rpc/server/server.go | 8 +- pkg/rpc/server/server_test.go | 6 +- scripts/gendump/main.go | 2 +- 19 files changed, 245 insertions(+), 382 deletions(-) create mode 100644 pkg/core/blockchainer/state_root.go create mode 100644 pkg/core/stateroot/module.go create mode 100644 pkg/core/stateroot/prometheus.go create mode 100644 pkg/core/stateroot/store.go diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index 9d89e91d2..fae3804a7 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -177,11 +177,6 @@ func (chain *FakeChain) AddBlock(block *block.Block) error { return nil } -// AddStateRoot implements Blockchainer interface. -func (chain *FakeChain) AddStateRoot(r *state.MPTRoot) error { - panic("TODO") -} - // BlockHeight implements Feer interface. func (chain *FakeChain) BlockHeight() uint32 { return atomic.LoadUint32(&chain.Blockheight) @@ -279,13 +274,8 @@ func (chain *FakeChain) GetEnrollments() ([]state.Validator, error) { panic("TODO") } -// GetStateProof implements Blockchainer interface. -func (chain *FakeChain) GetStateProof(util.Uint256, []byte) ([][]byte, error) { - panic("TODO") -} - -// GetStateRoot implements Blockchainer interface. -func (chain *FakeChain) GetStateRoot(height uint32) (*state.MPTRootState, error) { +// GetStateModule implements Blockchainer interface. +func (chain *FakeChain) GetStateModule() blockchainer.StateRoot { panic("TODO") } diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 57de2c3ba..03451d438 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -236,7 +236,7 @@ func (s *service) newPrepareRequest() payload.PrepareRequest { r := new(prepareRequest) if s.stateRootEnabled { r.stateRootEnabled = true - if sr, err := s.Chain.GetStateRoot(s.dbft.BlockIndex - 1); err == nil { + if sr, err := s.Chain.GetStateModule().GetStateRoot(s.dbft.BlockIndex - 1); err == nil { r.stateRoot = sr.Root } else { panic(err) @@ -483,7 +483,7 @@ func (s *service) verifyRequest(p payload.ConsensusPayload) error { return errInvalidVersion } if s.stateRootEnabled { - sr, err := s.Chain.GetStateRoot(s.dbft.BlockIndex - 1) + sr, err := s.Chain.GetStateModule().GetStateRoot(s.dbft.BlockIndex - 1) if err != nil { return err } else if sr.Root != req.stateRoot { @@ -637,7 +637,7 @@ func (s *service) newBlockFromContext(ctx *dbft.Context) block.Block { block.Block.Timestamp = ctx.Timestamp / nsInMs block.Block.Index = ctx.BlockIndex if s.stateRootEnabled { - sr, err := s.Chain.GetStateRoot(ctx.BlockIndex - 1) + sr, err := s.Chain.GetStateModule().GetStateRoot(ctx.BlockIndex - 1) if err != nil { return nil } diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index fa59e0248..fa65fba1b 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -321,7 +321,7 @@ func TestService_PrepareRequest(t *testing.T) { prevHash: prevHash, }) - sr, err := srv.Chain.GetStateRoot(srv.dbft.BlockIndex - 1) + sr, err := srv.Chain.GetStateModule().GetStateRoot(srv.dbft.BlockIndex - 1) require.NoError(t, err) checkRequest(t, nil, &prepareRequest{ stateRootEnabled: true, diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 988b864f4..998992a12 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -19,9 +19,9 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/interop/contract" "github.com/nspcc-dev/neo-go/pkg/core/mempool" - "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/stateroot" "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto" @@ -134,6 +134,8 @@ type Blockchain struct { extensible atomic.Value + stateRoot *stateroot.Module + // Notification subsystem. events chan bcEvent subCh chan interface{} @@ -193,6 +195,8 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L contracts: *native.NewContracts(cfg.P2PSigExtensions), } + bc.stateRoot = stateroot.NewModule(bc, bc.log, bc.dao.Store) + if err := bc.init(); err != nil { return nil, err } @@ -237,7 +241,7 @@ func (bc *Blockchain) init() error { if err != nil { return err } - if err := bc.dao.InitMPT(0, bc.config.KeepOnlyLatestState); err != nil { + if err := bc.stateRoot.Init(0, bc.config.KeepOnlyLatestState); err != nil { return fmt.Errorf("can't init MPT: %w", err) } return bc.storeBlock(genesisBlock, nil) @@ -257,7 +261,7 @@ func (bc *Blockchain) init() error { } bc.blockHeight = bHeight bc.persistedHeight = bHeight - if err = bc.dao.InitMPT(bHeight, bc.config.KeepOnlyLatestState); err != nil { + if err = bc.stateRoot.Init(bHeight, bc.config.KeepOnlyLatestState); err != nil { return fmt.Errorf("can't init MPT at height %d: %w", bHeight, err) } @@ -479,7 +483,7 @@ func (bc *Blockchain) AddBlock(block *block.Block) error { ErrHdrStateRootSetting, bc.config.StateRootInHeader, block.StateRootEnabled) } if bc.config.StateRootInHeader { - if sr := bc.dao.MPT.StateRoot(); block.PrevStateRoot != sr { + if sr := bc.stateRoot.CurrentLocalStateRoot(); block.PrevStateRoot != sr { return fmt.Errorf("%w: %s != %s", ErrHdrInvalidStateRoot, block.PrevStateRoot.StringLE(), sr.StringLE()) } @@ -606,15 +610,9 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error { return nil } -// GetStateProof returns proof of having key in the MPT with the specified root. -func (bc *Blockchain) GetStateProof(root util.Uint256, key []byte) ([][]byte, error) { - tr := mpt.NewTrie(mpt.NewHashNode(root), false, storage.NewMemCachedStore(bc.dao.Store)) - return tr.GetProof(key) -} - -// GetStateRoot returns state root for a given height. -func (bc *Blockchain) GetStateRoot(height uint32) (*state.MPTRootState, error) { - return bc.dao.GetStateRoot(height) +// GetStateModule returns state root service instance. +func (bc *Blockchain) GetStateModule() blockchainer.StateRoot { + return bc.stateRoot } // storeBlock performs chain update using the block given, it executes all @@ -718,33 +716,14 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error writeBuf.Reset() d := cache.DAO.(*dao.Simple) - if err := d.UpdateMPT(); err != nil { + b := d.GetMPTBatch() + if err := bc.stateRoot.AddMPTBatch(block.Index, b); err != nil { // Here MPT can be left in a half-applied state. // However if this error occurs, this is a bug somewhere in code // because changes applied are the ones from HALTed transactions. return fmt.Errorf("error while trying to apply MPT changes: %w", err) } - root := d.MPT.StateRoot() - var prevHash util.Uint256 - if block.Index > 0 { - prev, err := bc.dao.GetStateRoot(block.Index - 1) - if err != nil { - return fmt.Errorf("can't get previous state root: %w", err) - } - prevHash = hash.DoubleSha256(prev.GetSignedPart()) - } - err = bc.AddStateRoot(&state.MPTRoot{ - MPTRootBase: state.MPTRootBase{ - Index: block.Index, - PrevHash: prevHash, - Root: root, - }, - }) - if err != nil { - return err - } - if bc.config.SaveStorageBatch { bc.lastBatch = cache.DAO.GetBatch() } @@ -767,13 +746,7 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error bc.lock.Unlock() return err } - bc.dao.MPT.Flush() - // Every persist cycle we also compact our in-memory MPT. - persistedHeight := atomic.LoadUint32(&bc.persistedHeight) - if persistedHeight == block.Index-1 { - // 10 is good and roughly estimated to fit remaining trie into 1M of memory. - bc.dao.MPT.Collapse(10) - } + bc.topBlock.Store(block) atomic.StoreUint32(&bc.blockHeight, block.Index) bc.memPool.RemoveStale(func(tx *transaction.Transaction) bool { return bc.IsTxStillRelevant(tx, txpool, false) }, bc) @@ -1582,79 +1555,6 @@ func (bc *Blockchain) IsTxStillRelevant(t *transaction.Transaction, txpool *memp } -// AddStateRoot add new (possibly unverified) state root to the blockchain. -func (bc *Blockchain) AddStateRoot(r *state.MPTRoot) error { - our, err := bc.GetStateRoot(r.Index) - if err == nil { - if our.Flag == state.Verified { - return bc.updateStateHeight(r.Index) - } else if r.Witness == nil && our.Witness != nil { - r.Witness = our.Witness - } - } - if err := bc.verifyStateRoot(r); err != nil { - return fmt.Errorf("invalid state root: %w", err) - } - if r.Index > bc.BlockHeight() { // just put it into the store for future checks - return bc.dao.PutStateRoot(&state.MPTRootState{ - MPTRoot: *r, - Flag: state.Unverified, - }) - } - - flag := state.Unverified - if r.Witness != nil { - if err := bc.verifyStateRootWitness(r); err != nil { - return fmt.Errorf("can't verify signature: %w", err) - } - flag = state.Verified - } - err = bc.dao.PutStateRoot(&state.MPTRootState{ - MPTRoot: *r, - Flag: flag, - }) - if err != nil { - return err - } - return bc.updateStateHeight(r.Index) -} - -func (bc *Blockchain) updateStateHeight(newHeight uint32) error { - h, err := bc.dao.GetCurrentStateRootHeight() - if err != nil { - return fmt.Errorf("can't get current state root height: %w", err) - } else if newHeight == h+1 { - updateStateHeightMetric(newHeight) - return bc.dao.PutCurrentStateRootHeight(h + 1) - } - return nil -} - -// verifyStateRoot checks if state root is valid. -func (bc *Blockchain) verifyStateRoot(r *state.MPTRoot) error { - if r.Index == 0 { - return nil - } - prev, err := bc.GetStateRoot(r.Index - 1) - if err != nil { - return errors.New("can't get previous state root") - } else if !r.PrevHash.Equals(hash.DoubleSha256(prev.GetSignedPart())) { - return errors.New("previous hash mismatch") - } else if prev.Version != r.Version { - return errors.New("version mismatch") - } - return nil -} - -// verifyStateRootWitness verifies that state root signature is correct. -func (bc *Blockchain) verifyStateRootWitness(r *state.MPTRoot) error { - b, err := bc.GetBlock(bc.GetHeaderHash(int(r.Index))) - if err != nil { - return err - } - return bc.VerifyWitness(b.NextConsensus, r, r.Witness, bc.contracts.Policy.GetMaxVerificationGas(bc.dao)) -} - // VerifyTx verifies whether transaction is bonafide or not relative to the // current blockchain state. Note that this verification is completely isolated // from the main node's mempool. @@ -1731,7 +1631,6 @@ func (bc *Blockchain) GetEnrollments() ([]state.Validator, error) { // GetTestVM returns a VM and a Store setup for a test run of some sort of code. func (bc *Blockchain) GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) *vm.VM { d := bc.dao.GetWrapped().(*dao.Simple) - d.MPT = nil systemInterop := bc.newInteropContext(t, d, b, tx) vm := systemInterop.SpawnVM() vm.SetPriceGetter(systemInterop.GetPrice) diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 35cfa850a..5503e2667 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -132,7 +132,7 @@ func TestAddBlockStateRoot(t *testing.T) { c.ProtocolConfiguration.StateRootInHeader = true }) - sr, err := bc.GetStateRoot(bc.BlockHeight()) + sr, err := bc.GetStateModule().GetStateRoot(bc.BlockHeight()) require.NoError(t, err) tx := newNEP17Transfer(bc.contracts.NEO.Hash, neoOwner, util.Uint160{}, 1) diff --git a/pkg/core/blockchainer/blockchainer.go b/pkg/core/blockchainer/blockchainer.go index be17fd192..bdfdc06d8 100644 --- a/pkg/core/blockchainer/blockchainer.go +++ b/pkg/core/blockchainer/blockchainer.go @@ -23,7 +23,6 @@ type Blockchainer interface { GetConfig() config.ProtocolConfiguration AddHeaders(...*block.Header) error AddBlock(*block.Block) error - AddStateRoot(r *state.MPTRoot) error CalculateClaimable(h util.Uint160, endHeight uint32) (*big.Int, error) Close() IsTxStillRelevant(t *transaction.Transaction, txpool *mempool.Pool, isPartialTx bool) bool @@ -54,8 +53,7 @@ type Blockchainer interface { GetValidators() ([]*keys.PublicKey, error) GetStandByCommittee() keys.PublicKeys GetStandByValidators() keys.PublicKeys - GetStateProof(root util.Uint256, key []byte) ([][]byte, error) - GetStateRoot(height uint32) (*state.MPTRootState, error) + GetStateModule() StateRoot GetStorageItem(id int32, key []byte) state.StorageItem GetStorageItems(id int32) (map[string]state.StorageItem, error) GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) *vm.VM diff --git a/pkg/core/blockchainer/state_root.go b/pkg/core/blockchainer/state_root.go new file mode 100644 index 000000000..57c76c69f --- /dev/null +++ b/pkg/core/blockchainer/state_root.go @@ -0,0 +1,13 @@ +package blockchainer + +import ( + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/util" +) + +// StateRoot represents local state root module. +type StateRoot interface { + CurrentLocalStateRoot() util.Uint256 + GetStateProof(root util.Uint256, key []byte) ([][]byte, error) + GetStateRoot(height uint32) (*state.MPTRoot, error) +} diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index af40a37f6..c8390aaf0 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/binary" "errors" - "fmt" iocore "io" "sort" @@ -42,12 +41,9 @@ type DAO interface { GetContractScriptHash(id int32) (util.Uint160, error) GetCurrentBlockHeight() (uint32, error) GetCurrentHeaderHeight() (i uint32, h util.Uint256, err error) - GetCurrentStateRootHeight() (uint32, error) GetHeaderHashes() ([]util.Uint256, error) GetNEP17Balances(acc util.Uint160) (*state.NEP17Balances, error) GetNEP17TransferLog(acc util.Uint160, index uint32) (*state.NEP17TransferLog, error) - GetStateRoot(height uint32) (*state.MPTRootState, error) - PutStateRoot(root *state.MPTRootState) error GetStorageItem(id int32, key []byte) state.StorageItem GetStorageItems(id int32) (map[string]state.StorageItem, error) GetStorageItemsWithPrefix(id int32, prefix []byte) (map[string]state.StorageItem, error) @@ -72,7 +68,6 @@ type DAO interface { // Simple is memCached wrapper around DB, simple DAO implementation. type Simple struct { - MPT *mpt.Trie Store *storage.MemCachedStore network netmode.Magic // stateRootInHeader specifies if block header contains state root. @@ -94,7 +89,6 @@ func (dao *Simple) GetBatch() *storage.MemBatch { // MemCachedStore around the current DAO Store. func (dao *Simple) GetWrapped() DAO { d := NewSimple(dao.Store, dao.network, dao.stateRootInHeader) - d.MPT = dao.MPT return d } @@ -289,75 +283,6 @@ func (dao *Simple) PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWrit // -- start storage item. -func makeStateRootKey(height uint32) []byte { - key := make([]byte, 5) - key[0] = byte(storage.DataMPT) - binary.LittleEndian.PutUint32(key[1:], height) - return key -} - -// InitMPT initializes MPT at the given height. -func (dao *Simple) InitMPT(height uint32, enableRefCount bool) error { - var gcKey = []byte{byte(storage.DataMPT), 1} - if height == 0 { - dao.MPT = mpt.NewTrie(nil, enableRefCount, dao.Store) - var val byte - if enableRefCount { - val = 1 - } - return dao.Store.Put(gcKey, []byte{val}) - } - var hasRefCount bool - if v, err := dao.Store.Get(gcKey); err == nil { - hasRefCount = v[0] != 0 - } - if hasRefCount != enableRefCount { - return fmt.Errorf("KeepOnlyLatestState setting mismatch: old=%v, new=%v", hasRefCount, enableRefCount) - } - r, err := dao.GetStateRoot(height) - if err != nil { - return err - } - dao.MPT = mpt.NewTrie(mpt.NewHashNode(r.Root), enableRefCount, dao.Store) - return nil -} - -// GetCurrentStateRootHeight returns current state root height. -func (dao *Simple) GetCurrentStateRootHeight() (uint32, error) { - key := []byte{byte(storage.DataMPT)} - val, err := dao.Store.Get(key) - if err != nil { - if err == storage.ErrKeyNotFound { - err = nil - } - return 0, err - } - return binary.LittleEndian.Uint32(val), nil -} - -// PutCurrentStateRootHeight updates current state root height. -func (dao *Simple) PutCurrentStateRootHeight(height uint32) error { - key := []byte{byte(storage.DataMPT)} - val := make([]byte, 4) - binary.LittleEndian.PutUint32(val, height) - return dao.Store.Put(key, val) -} - -// GetStateRoot returns state root of a given height. -func (dao *Simple) GetStateRoot(height uint32) (*state.MPTRootState, error) { - r := new(state.MPTRootState) - err := dao.GetAndDecode(r, makeStateRootKey(height)) - if err != nil { - return nil, err - } - return r, nil -} - -// PutStateRoot puts state root of a given height into the store. -func (dao *Simple) PutStateRoot(r *state.MPTRootState) error { - return dao.Put(r, makeStateRootKey(r.Index)) -} - // GetStorageItem returns StorageItem if it exists in the given store. func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem { b, err := dao.Store.Get(makeStorageItemKey(id, key)) @@ -672,12 +597,11 @@ func (dao *Simple) Persist() (int, error) { return dao.Store.Persist() } -// UpdateMPT updates MPT using storage items from the underlying memcached store. -func (dao *Simple) UpdateMPT() error { +// GetMPTBatch storage changes to be applied to MPT. +func (dao *Simple) GetMPTBatch() mpt.Batch { var b mpt.Batch dao.Store.MemoryStore.SeekAll([]byte{byte(storage.STStorage)}, func(k, v []byte) { b.Add(k[1:], v) }) - _, err := dao.MPT.PutBatch(b) - return err + return b } diff --git a/pkg/core/helper_test.go b/pkg/core/helper_test.go index 40b64f9b5..74b858b8d 100644 --- a/pkg/core/helper_test.go +++ b/pkg/core/helper_test.go @@ -87,7 +87,7 @@ func (bc *Blockchain) newBlock(txs ...*transaction.Transaction) *block.Block { } } if bc.config.StateRootInHeader { - sr, err := bc.GetStateRoot(bc.BlockHeight()) + sr, err := bc.GetStateModule().GetStateRoot(bc.BlockHeight()) if err != nil { panic(err) } diff --git a/pkg/core/native_management_test.go b/pkg/core/native_management_test.go index ceaf4b204..2a61a168f 100644 --- a/pkg/core/native_management_test.go +++ b/pkg/core/native_management_test.go @@ -10,6 +10,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/chaindump" "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/stateroot" "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/io" @@ -555,7 +556,8 @@ func TestContractDestroy(t *testing.T) { require.NoError(t, err) err = bc.dao.PutStorageItem(cs1.ID, []byte{1, 2, 3}, state.StorageItem{3, 2, 1}) require.NoError(t, err) - require.NoError(t, bc.dao.UpdateMPT()) + b := bc.dao.GetMPTBatch() + require.NoError(t, bc.GetStateModule().(*stateroot.Module).AddMPTBatch(bc.BlockHeight(), b)) t.Run("no contract", func(t *testing.T) { res, err := invokeContractMethod(bc, 1_00000000, mgmtHash, "destroy") diff --git a/pkg/core/prometheus.go b/pkg/core/prometheus.go index c849e3459..b81fb847d 100644 --- a/pkg/core/prometheus.go +++ b/pkg/core/prometheus.go @@ -30,14 +30,6 @@ var ( Namespace: "neogo", }, ) - //stateHeight prometheus metric. - stateHeight = prometheus.NewGauge( - prometheus.GaugeOpts{ - Help: "Current verified state height", - Name: "current_state_height", - Namespace: "neogo", - }, - ) ) func init() { @@ -59,7 +51,3 @@ func updateHeaderHeightMetric(hHeight int) { func updateBlockHeightMetric(bHeight uint32) { blockHeight.Set(float64(bHeight)) } - -func updateStateHeightMetric(sHeight uint32) { - stateHeight.Set(float64(sHeight)) -} diff --git a/pkg/core/state/mpt_root.go b/pkg/core/state/mpt_root.go index f2c890781..5fd716ff4 100644 --- a/pkg/core/state/mpt_root.go +++ b/pkg/core/state/mpt_root.go @@ -1,101 +1,54 @@ package state import ( - "encoding/json" - "errors" - "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/util" ) -// MPTRootBase represents storage state root. -type MPTRootBase struct { - Version byte `json:"version"` - Index uint32 `json:"index"` - PrevHash util.Uint256 `json:"prehash"` - Root util.Uint256 `json:"stateroot"` -} - // MPTRoot represents storage state root together with sign info. type MPTRoot struct { - MPTRootBase + Version byte `json:"version"` + Index uint32 `json:"index"` + Root util.Uint256 `json:"stateroot"` Witness *transaction.Witness `json:"witness,omitempty"` } -// MPTRootStateFlag represents verification state of the state root. -type MPTRootStateFlag byte - -// Possible verification states of MPTRoot. -const ( - Unverified MPTRootStateFlag = 0x00 - Verified MPTRootStateFlag = 0x01 - Invalid MPTRootStateFlag = 0x03 -) - -// MPTRootState represents state root together with its verification state. -type MPTRootState struct { - MPTRoot `json:"stateroot"` - Flag MPTRootStateFlag `json:"flag"` -} - -// EncodeBinary implements io.Serializable. -func (s *MPTRootState) EncodeBinary(w *io.BinWriter) { - w.WriteB(byte(s.Flag)) - s.MPTRoot.EncodeBinary(w) -} - -// DecodeBinary implements io.Serializable. -func (s *MPTRootState) DecodeBinary(r *io.BinReader) { - s.Flag = MPTRootStateFlag(r.ReadB()) - s.MPTRoot.DecodeBinary(r) -} - // GetSignedPart returns part of MPTRootBase which needs to be signed. -func (s *MPTRootBase) GetSignedPart() []byte { +func (s *MPTRoot) GetSignedPart() []byte { buf := io.NewBufBinWriter() - s.EncodeBinary(buf.BinWriter) + s.EncodeBinaryUnsigned(buf.BinWriter) return buf.Bytes() } // GetSignedHash returns hash of MPTRootBase which needs to be signed. -func (s *MPTRootBase) GetSignedHash() util.Uint256 { - buf := io.NewBufBinWriter() - s.EncodeBinary(buf.BinWriter) - return hash.Sha256(buf.Bytes()) -} - -// Equals checks if s == other. -func (s *MPTRootBase) Equals(other *MPTRootBase) bool { - return s.Version == other.Version && s.Index == other.Index && - s.PrevHash.Equals(other.PrevHash) && s.Root.Equals(other.Root) +func (s *MPTRoot) GetSignedHash() util.Uint256 { + return hash.Sha256(s.GetSignedPart()) } // Hash returns hash of s. -func (s *MPTRootBase) Hash() util.Uint256 { +func (s *MPTRoot) Hash() util.Uint256 { return hash.DoubleSha256(s.GetSignedPart()) } -// DecodeBinary implements io.Serializable. -func (s *MPTRootBase) DecodeBinary(r *io.BinReader) { +// DecodeBinaryUnsigned decodes hashable part of state root. +func (s *MPTRoot) DecodeBinaryUnsigned(r *io.BinReader) { s.Version = r.ReadB() s.Index = r.ReadU32LE() - s.PrevHash.DecodeBinary(r) s.Root.DecodeBinary(r) } -// EncodeBinary implements io.Serializable. -func (s *MPTRootBase) EncodeBinary(w *io.BinWriter) { +// EncodeBinaryUnsigned encodes hashable part of state root.. +func (s *MPTRoot) EncodeBinaryUnsigned(w *io.BinWriter) { w.WriteB(s.Version) w.WriteU32LE(s.Index) - s.PrevHash.EncodeBinary(w) s.Root.EncodeBinary(w) } // DecodeBinary implements io.Serializable. func (s *MPTRoot) DecodeBinary(r *io.BinReader) { - s.MPTRootBase.DecodeBinary(r) + s.DecodeBinaryUnsigned(r) var ws []transaction.Witness r.ReadArray(&ws, 1) @@ -106,48 +59,10 @@ func (s *MPTRoot) DecodeBinary(r *io.BinReader) { // EncodeBinary implements io.Serializable. func (s *MPTRoot) EncodeBinary(w *io.BinWriter) { - s.MPTRootBase.EncodeBinary(w) + s.EncodeBinaryUnsigned(w) if s.Witness == nil { w.WriteVarUint(0) } else { w.WriteArray([]*transaction.Witness{s.Witness}) } } - -// String implements fmt.Stringer. -func (f MPTRootStateFlag) String() string { - switch f { - case Unverified: - return "Unverified" - case Verified: - return "Verified" - case Invalid: - return "Invalid" - default: - return "" - } -} - -// MarshalJSON implements json.Marshaler. -func (f MPTRootStateFlag) MarshalJSON() ([]byte, error) { - return []byte(`"` + f.String() + `"`), nil -} - -// UnmarshalJSON implements json.Unmarshaler. -func (f *MPTRootStateFlag) UnmarshalJSON(data []byte) error { - var s string - if err := json.Unmarshal(data, &s); err != nil { - return err - } - switch s { - case "Unverified": - *f = Unverified - case "Verified": - *f = Verified - case "Invalid": - *f = Invalid - default: - return errors.New("unknown flag") - } - return nil -} diff --git a/pkg/core/state/mpt_root_test.go b/pkg/core/state/mpt_root_test.go index da9ff9c46..eaf5c45dd 100644 --- a/pkg/core/state/mpt_root_test.go +++ b/pkg/core/state/mpt_root_test.go @@ -14,12 +14,9 @@ import ( func testStateRoot() *MPTRoot { return &MPTRoot{ - MPTRootBase: MPTRootBase{ - Version: byte(rand.Uint32()), - Index: rand.Uint32(), - PrevHash: random.Uint256(), - Root: random.Uint256(), - }, + Version: byte(rand.Uint32()), + Index: rand.Uint32(), + Root: random.Uint256(), } } @@ -36,64 +33,27 @@ func TestStateRoot_Serializable(t *testing.T) { }) } -func TestStateRootEquals(t *testing.T) { - r1 := testStateRoot() - r2 := *r1 - require.True(t, r1.Equals(&r2.MPTRootBase)) - - r2.MPTRootBase.Index++ - require.False(t, r1.Equals(&r2.MPTRootBase)) -} - -func TestMPTRootState_Serializable(t *testing.T) { - rs := &MPTRootState{ - MPTRoot: *testStateRoot(), - Flag: 0x04, - } - rs.MPTRoot.Witness = &transaction.Witness{ - InvocationScript: random.Bytes(10), - VerificationScript: random.Bytes(11), - } - testserdes.EncodeDecodeBinary(t, rs, new(MPTRootState)) -} - -func TestMPTRootStateUnverifiedByDefault(t *testing.T) { - var r MPTRootState - require.Equal(t, Unverified, r.Flag) -} - func TestMPTRoot_MarshalJSON(t *testing.T) { t.Run("Good", func(t *testing.T) { r := testStateRoot() - rs := &MPTRootState{ - MPTRoot: *r, - Flag: Verified, - } - testserdes.MarshalUnmarshalJSON(t, rs, new(MPTRootState)) + testserdes.MarshalUnmarshalJSON(t, r, new(MPTRoot)) }) t.Run("Compatibility", func(t *testing.T) { js := []byte(`{ - "flag": "Unverified", - "stateroot": { "version": 1, "index": 3000000, - "prehash": "0x4f30f43af8dd2262fc331c45bfcd9066ebbacda204e6e81371cbd884fe7d6c90", "stateroot": "0xb2fd7e368a848ef70d27cf44940a35237333ed05f1d971c9408f0eb285e0b6f3" - }}`) + }`) - rs := new(MPTRootState) + rs := new(MPTRoot) require.NoError(t, json.Unmarshal(js, &rs)) require.EqualValues(t, 1, rs.Version) require.EqualValues(t, 3000000, rs.Index) require.Nil(t, rs.Witness) - u, err := util.Uint256DecodeStringLE("4f30f43af8dd2262fc331c45bfcd9066ebbacda204e6e81371cbd884fe7d6c90") - require.NoError(t, err) - require.Equal(t, u, rs.PrevHash) - - u, err = util.Uint256DecodeStringLE("b2fd7e368a848ef70d27cf44940a35237333ed05f1d971c9408f0eb285e0b6f3") + u, err := util.Uint256DecodeStringLE("b2fd7e368a848ef70d27cf44940a35237333ed05f1d971c9408f0eb285e0b6f3") require.NoError(t, err) require.Equal(t, u, rs.Root) }) diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go new file mode 100644 index 000000000..9bf646fca --- /dev/null +++ b/pkg/core/stateroot/module.go @@ -0,0 +1,98 @@ +package stateroot + +import ( + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" + "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/util" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +type ( + // Module represents module for local processing of state roots. + Module struct { + Store *storage.MemCachedStore + mpt *mpt.Trie + bc blockchainer.Blockchainer + log *zap.Logger + + localHeight atomic.Uint32 + validatedHeight atomic.Uint32 + currentLocal atomic.Value + } +) + +// NewModule returns new instance of stateroot module. +func NewModule(bc blockchainer.Blockchainer, log *zap.Logger, s *storage.MemCachedStore) *Module { + return &Module{ + bc: bc, + log: log, + Store: s, + } +} + +// GetStateProof returns proof of having key in the MPT with the specified root. +func (s *Module) GetStateProof(root util.Uint256, key []byte) ([][]byte, error) { + tr := mpt.NewTrie(mpt.NewHashNode(root), false, storage.NewMemCachedStore(s.Store)) + return tr.GetProof(key) +} + +// GetStateRoot returns state root for a given height. +func (s *Module) GetStateRoot(height uint32) (*state.MPTRoot, error) { + return s.getStateRoot(makeStateRootKey(height)) +} + +// CurrentLocalStateRoot returns hash of the local state root. +func (s *Module) CurrentLocalStateRoot() util.Uint256 { + return s.currentLocal.Load().(util.Uint256) +} + +// Init initializes state root module at the given height. +func (s *Module) Init(height uint32, enableRefCount bool) error { + var gcKey = []byte{byte(storage.DataMPT), prefixGC} + if height == 0 { + s.mpt = mpt.NewTrie(nil, enableRefCount, s.Store) + var val byte + if enableRefCount { + val = 1 + } + s.currentLocal.Store(util.Uint256{}) + return s.Store.Put(gcKey, []byte{val}) + } + var hasRefCount bool + if v, err := s.Store.Get(gcKey); err == nil { + hasRefCount = v[0] != 0 + } + if hasRefCount != enableRefCount { + return fmt.Errorf("KeepOnlyLatestState setting mismatch: old=%v, new=%v", hasRefCount, enableRefCount) + } + r, err := s.getStateRoot(makeStateRootKey(height)) + if err != nil { + return err + } + s.currentLocal.Store(r.Root) + s.localHeight.Store(r.Index) + s.mpt = mpt.NewTrie(mpt.NewHashNode(r.Root), enableRefCount, s.Store) + return nil +} + +// AddMPTBatch updates using provided batch. +func (s *Module) AddMPTBatch(index uint32, b mpt.Batch) error { + if _, err := s.mpt.PutBatch(b); err != nil { + return err + } + s.mpt.Flush() + err := s.addLocalStateRoot(&state.MPTRoot{ + Index: index, + Root: s.mpt.StateRoot(), + }) + if err != nil { + return err + } + _, err = s.Store.Persist() + return err +} diff --git a/pkg/core/stateroot/prometheus.go b/pkg/core/stateroot/prometheus.go new file mode 100644 index 000000000..cd6346404 --- /dev/null +++ b/pkg/core/stateroot/prometheus.go @@ -0,0 +1,20 @@ +package stateroot + +import "github.com/prometheus/client_golang/prometheus" + +// stateHeight prometheus metric. +var stateHeight = prometheus.NewGauge( + prometheus.GaugeOpts{ + Help: "Current verified state height", + Name: "current_state_height", + Namespace: "neogo", + }, +) + +func init() { + prometheus.MustRegister(stateHeight) +} + +func updateStateHeightMetric(sHeight uint32) { + stateHeight.Set(float64(sHeight)) +} diff --git a/pkg/core/stateroot/store.go b/pkg/core/stateroot/store.go new file mode 100644 index 000000000..cc43bd0cf --- /dev/null +++ b/pkg/core/stateroot/store.go @@ -0,0 +1,56 @@ +package stateroot + +import ( + "encoding/binary" + + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/io" +) + +const ( + prefixGC = 0x01 + prefixLocal = 0x02 +) + +func (s *Module) addLocalStateRoot(sr *state.MPTRoot) error { + key := makeStateRootKey(sr.Index) + if err := s.putStateRoot(key, sr); err != nil { + return err + } + + data := make([]byte, 4) + binary.LittleEndian.PutUint32(data, sr.Index) + if err := s.Store.Put([]byte{byte(storage.DataMPT), prefixLocal}, data); err != nil { + return err + } + s.currentLocal.Store(sr.Root) + s.localHeight.Store(sr.Index) + updateStateHeightMetric(sr.Index) + return nil +} + +func (s *Module) putStateRoot(key []byte, sr *state.MPTRoot) error { + w := io.NewBufBinWriter() + sr.EncodeBinary(w.BinWriter) + return s.Store.Put(key, w.Bytes()) +} + +func (s *Module) getStateRoot(key []byte) (*state.MPTRoot, error) { + data, err := s.Store.Get(key) + if err != nil { + return nil, err + } + + sr := new(state.MPTRoot) + r := io.NewBinReaderFromBuf(data) + sr.DecodeBinary(r) + return sr, r.Err +} + +func makeStateRootKey(index uint32) []byte { + key := make([]byte, 5) + key[0] = byte(storage.DataMPT) + binary.BigEndian.PutUint32(key, index) + return key +} diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index db22543d7..fbc1d4a99 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -833,7 +833,7 @@ func (s *Server) getProof(ps request.Params) (interface{}, *response.Error) { return nil, response.ErrInvalidParams } skey := makeStorageKey(cs.ID, key) - proof, err := s.chain.GetStateProof(root, skey) + proof, err := s.chain.GetStateModule().GetStateProof(root, skey) return &result.GetProof{ Result: result.ProofWithKey{ Key: skey, @@ -884,16 +884,16 @@ func (s *Server) getStateRoot(ps request.Params) (interface{}, *response.Error) if p == nil { return nil, response.NewRPCError("Invalid parameter.", "", nil) } - var rt *state.MPTRootState + var rt *state.MPTRoot var h util.Uint256 height, err := p.GetInt() if err == nil { - rt, err = s.chain.GetStateRoot(uint32(height)) + rt, err = s.chain.GetStateModule().GetStateRoot(uint32(height)) } else if h, err = p.GetUint256(); err == nil { var hdr *block.Header hdr, err = s.chain.GetHeader(h) if err == nil { - rt, err = s.chain.GetStateRoot(hdr.Index) + rt, err = s.chain.GetStateModule().GetStateRoot(hdr.Index) } } if err != nil { diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index c643aa790..c7f430f3c 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -1287,7 +1287,7 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) [] }) }) t.Run("getproof", func(t *testing.T) { - r, err := chain.GetStateRoot(3) + r, err := chain.GetStateModule().GetStateRoot(3) require.NoError(t, err) rpc := fmt.Sprintf(`{"jsonrpc": "2.0", "id": 1, "method": "getproof", "params": ["%s", "%s", "%x"]}`, @@ -1316,11 +1316,11 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) [] body := doRPCCall(rpc, httpSrv.URL, t) rawRes := checkErrGetResult(t, body, false) - res := new(state.MPTRootState) + res := new(state.MPTRoot) require.NoError(t, json.Unmarshal(rawRes, res)) require.NotEqual(t, util.Uint256{}, res.Root) // be sure this test uses valid height - expected, err := e.chain.GetStateRoot(5) + expected, err := e.chain.GetStateModule().GetStateRoot(5) require.NoError(t, err) require.Equal(t, expected, res) } diff --git a/scripts/gendump/main.go b/scripts/gendump/main.go index 8b93aea5a..977cae05a 100644 --- a/scripts/gendump/main.go +++ b/scripts/gendump/main.go @@ -175,7 +175,7 @@ func newBlock(bc *core.Blockchain, lastBlock *block.Block, script []byte, txs .. Transactions: txs, } if bc.GetConfig().StateRootInHeader { - sr, err := bc.GetStateRoot(bc.BlockHeight()) + sr, err := bc.GetStateModule().GetStateRoot(bc.BlockHeight()) if err != nil { return nil, err } From ac227a80fee98e68f1313ff1652bac34288a0d6b Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Mon, 1 Feb 2021 19:00:07 +0300 Subject: [PATCH 4/6] stateroot: use RoleStateValidator for verification --- internal/fakechain/fakechain.go | 2 +- pkg/core/blockchain.go | 1 + pkg/core/blockchainer/state_root.go | 4 + pkg/core/native/designate.go | 11 +- pkg/core/stateroot/module.go | 45 +++++++- pkg/core/stateroot/store.go | 39 ++++++- pkg/core/stateroot/validators.go | 37 +++++++ pkg/core/stateroot_test.go | 153 ++++++++++++++++++++++++++++ pkg/network/server.go | 21 +++- pkg/rpc/server/server.go | 2 +- pkg/services/stateroot/message.go | 50 +++++++++ pkg/services/stateroot/service.go | 51 ++++++++++ 12 files changed, 405 insertions(+), 11 deletions(-) create mode 100644 pkg/core/stateroot/validators.go create mode 100644 pkg/core/stateroot_test.go create mode 100644 pkg/services/stateroot/message.go create mode 100644 pkg/services/stateroot/service.go diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index fae3804a7..f449b36e8 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -276,7 +276,7 @@ func (chain *FakeChain) GetEnrollments() ([]state.Validator, error) { // GetStateModule implements Blockchainer interface. func (chain *FakeChain) GetStateModule() blockchainer.StateRoot { - panic("TODO") + return nil } // GetStorageItem implements Blockchainer interface. diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 998992a12..73dcfe664 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -196,6 +196,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L } bc.stateRoot = stateroot.NewModule(bc, bc.log, bc.dao.Store) + bc.contracts.Designate.StateRootService = bc.stateRoot if err := bc.init(); err != nil { return nil, err diff --git a/pkg/core/blockchainer/state_root.go b/pkg/core/blockchainer/state_root.go index 57c76c69f..e1086edee 100644 --- a/pkg/core/blockchainer/state_root.go +++ b/pkg/core/blockchainer/state_root.go @@ -2,12 +2,16 @@ package blockchainer import ( "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" ) // StateRoot represents local state root module. type StateRoot interface { + AddStateRoot(root *state.MPTRoot) error CurrentLocalStateRoot() util.Uint256 + CurrentValidatedHeight() uint32 GetStateProof(root util.Uint256, key []byte) ([][]byte, error) GetStateRoot(height uint32) (*state.MPTRoot, error) + UpdateStateValidators(height uint32, pubs keys.PublicKeys) } diff --git a/pkg/core/native/designate.go b/pkg/core/native/designate.go index c7d225861..44e40978d 100644 --- a/pkg/core/native/designate.go +++ b/pkg/core/native/designate.go @@ -7,6 +7,7 @@ import ( "sort" "sync/atomic" + "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer/services" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/interop" @@ -40,6 +41,8 @@ type Designate struct { OracleService atomic.Value // NotaryService represents Notary node module. NotaryService atomic.Value + // StateRootService represents StateRoot node module. + StateRootService blockchainer.StateRoot } type roleData struct { @@ -172,12 +175,10 @@ func (s *Designate) hashFromNodes(r Role, nodes keys.PublicKeys) util.Uint160 { } var script []byte switch r { - case RoleOracle, RoleNeoFSAlphabet: - script, _ = smartcontract.CreateDefaultMultiSigRedeemScript(nodes.Copy()) case RoleP2PNotary: script, _ = smartcontract.CreateMultiSigRedeemScript(1, nodes.Copy()) default: - script, _ = smartcontract.CreateMajorityMultiSigRedeemScript(nodes.Copy()) + script, _ = smartcontract.CreateDefaultMultiSigRedeemScript(nodes.Copy()) } return hash.Hash160(script) } @@ -201,6 +202,10 @@ func (s *Designate) updateCachedRoleData(v *atomic.Value, d dao.DAO, r Role) err if ntr, _ := s.NotaryService.Load().(services.Notary); ntr != nil { ntr.UpdateNotaryNodes(nodeKeys.Copy()) } + case RoleStateValidator: + if s.StateRootService != nil { + s.StateRootService.UpdateStateValidators(height, nodeKeys.Copy()) + } } return nil } diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index 9bf646fca..477e040cb 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -1,12 +1,16 @@ package stateroot import ( + "encoding/binary" + "errors" "fmt" + "sync" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" "go.uber.org/zap" @@ -20,9 +24,19 @@ type ( bc blockchainer.Blockchainer log *zap.Logger + currentLocal atomic.Value localHeight atomic.Uint32 validatedHeight atomic.Uint32 - currentLocal atomic.Value + + mtx sync.RWMutex + keys []keyCache + } + + keyCache struct { + height uint32 + validatorsKeys keys.PublicKeys + validatorsHash util.Uint160 + validatorsScript []byte } ) @@ -51,8 +65,18 @@ func (s *Module) CurrentLocalStateRoot() util.Uint256 { return s.currentLocal.Load().(util.Uint256) } +// CurrentValidatedHeight returns current state root validated height. +func (s *Module) CurrentValidatedHeight() uint32 { + return s.validatedHeight.Load() +} + // Init initializes state root module at the given height. func (s *Module) Init(height uint32, enableRefCount bool) error { + data, err := s.Store.Get([]byte{byte(storage.DataMPT), prefixValidated}) + if err == nil { + s.validatedHeight.Store(binary.LittleEndian.Uint32(data)) + } + var gcKey = []byte{byte(storage.DataMPT), prefixGC} if height == 0 { s.mpt = mpt.NewTrie(nil, enableRefCount, s.Store) @@ -96,3 +120,22 @@ func (s *Module) AddMPTBatch(index uint32, b mpt.Batch) error { _, err = s.Store.Persist() return err } + +// VerifyStateRoot checks if state root is valid. +func (s *Module) VerifyStateRoot(r *state.MPTRoot) error { + _, err := s.getStateRoot(makeStateRootKey(r.Index - 1)) + if err != nil { + return errors.New("can't get previous state root") + } + return s.verifyWitness(r) +} + +const maxVerificationGAS = 1_00000000 + +// verifyWitness verifies state root witness. +func (s *Module) verifyWitness(r *state.MPTRoot) error { + s.mtx.Lock() + h := s.getKeyCacheForHeight(r.Index).validatorsHash + s.mtx.Unlock() + return s.bc.VerifyWitness(h, r, r.Witness, maxVerificationGAS) +} diff --git a/pkg/core/stateroot/store.go b/pkg/core/stateroot/store.go index cc43bd0cf..b2ab7d210 100644 --- a/pkg/core/stateroot/store.go +++ b/pkg/core/stateroot/store.go @@ -9,8 +9,9 @@ import ( ) const ( - prefixGC = 0x01 - prefixLocal = 0x02 + prefixGC = 0x01 + prefixLocal = 0x02 + prefixValidated = 0x03 ) func (s *Module) addLocalStateRoot(sr *state.MPTRoot) error { @@ -26,7 +27,10 @@ func (s *Module) addLocalStateRoot(sr *state.MPTRoot) error { } s.currentLocal.Store(sr.Root) s.localHeight.Store(sr.Index) - updateStateHeightMetric(sr.Index) + if s.bc.GetConfig().StateRootInHeader { + s.validatedHeight.Store(sr.Index) + updateStateHeightMetric(sr.Index) + } return nil } @@ -54,3 +58,32 @@ func makeStateRootKey(index uint32) []byte { binary.BigEndian.PutUint32(key, index) return key } + +// AddStateRoot adds validated state root provided by network. +func (s *Module) AddStateRoot(sr *state.MPTRoot) error { + if err := s.VerifyStateRoot(sr); err != nil { + return err + } + key := makeStateRootKey(sr.Index) + local, err := s.getStateRoot(key) + if err != nil { + return err + } + if local.Witness != nil { + return nil + } + if err := s.putStateRoot(key, sr); err != nil { + return err + } + + data := make([]byte, 4) + binary.LittleEndian.PutUint32(data, sr.Index) + if err := s.Store.Put([]byte{byte(storage.DataMPT), prefixValidated}, data); err != nil { + return err + } + s.validatedHeight.Store(sr.Index) + if !s.bc.GetConfig().StateRootInHeader { + updateStateHeightMetric(sr.Index) + } + return nil +} diff --git a/pkg/core/stateroot/validators.go b/pkg/core/stateroot/validators.go new file mode 100644 index 000000000..58348091a --- /dev/null +++ b/pkg/core/stateroot/validators.go @@ -0,0 +1,37 @@ +package stateroot + +import ( + "sort" + + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/smartcontract" +) + +// UpdateStateValidators updates list of state validator keys. +func (s *Module) UpdateStateValidators(height uint32, pubs keys.PublicKeys) { + script, _ := smartcontract.CreateDefaultMultiSigRedeemScript(pubs) + h := hash.Hash160(script) + + s.mtx.Lock() + kc := s.getKeyCacheForHeight(height) + if kc.validatorsHash != h { + s.keys = append(s.keys, keyCache{ + height: height, + validatorsKeys: pubs, + validatorsHash: h, + validatorsScript: script, + }) + } + s.mtx.Unlock() +} + +func (s *Module) getKeyCacheForHeight(h uint32) keyCache { + index := sort.Search(len(s.keys), func(i int) bool { + return s.keys[i].height >= h + }) + if index == len(s.keys) { + return keyCache{} + } + return s.keys[index] +} diff --git a/pkg/core/stateroot_test.go b/pkg/core/stateroot_test.go new file mode 100644 index 000000000..163ae2445 --- /dev/null +++ b/pkg/core/stateroot_test.go @@ -0,0 +1,153 @@ +package core + +import ( + "errors" + "sort" + "testing" + + "github.com/nspcc-dev/neo-go/internal/testserdes" + "github.com/nspcc-dev/neo-go/pkg/core/native" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/stateroot" + "github.com/nspcc-dev/neo-go/pkg/smartcontract" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/vm/emit" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/stretchr/testify/require" +) + +func testSignStateRoot(t *testing.T, r *state.MPTRoot, pubs keys.PublicKeys, accs ...*wallet.Account) []byte { + n := smartcontract.GetMajorityHonestNodeCount(len(accs)) + w := io.NewBufBinWriter() + for i := 0; i < n; i++ { + sig := accs[i].PrivateKey().SignHash(r.GetSignedHash()) + emit.Bytes(w.BinWriter, sig) + } + require.NoError(t, w.Err) + + script, err := smartcontract.CreateMajorityMultiSigRedeemScript(pubs.Copy()) + require.NoError(t, err) + r.Witness = &transaction.Witness{ + VerificationScript: script, + InvocationScript: w.Bytes(), + } + data, err := testserdes.EncodeBinary(stateroot.NewMessage(stateroot.RootT, r)) + require.NoError(t, err) + return data +} + +func newMajorityMultisigWithGAS(t *testing.T, n int) (util.Uint160, keys.PublicKeys, []*wallet.Account) { + accs := make([]*wallet.Account, n) + for i := range accs { + acc, err := wallet.NewAccount() + require.NoError(t, err) + accs[i] = acc + } + sort.Slice(accs, func(i, j int) bool { + pi := accs[i].PrivateKey().PublicKey() + pj := accs[j].PrivateKey().PublicKey() + return pi.Cmp(pj) == -1 + }) + pubs := make(keys.PublicKeys, n) + for i := range pubs { + pubs[i] = accs[i].PrivateKey().PublicKey() + } + script, err := smartcontract.CreateMajorityMultiSigRedeemScript(pubs) + require.NoError(t, err) + return hash.Hash160(script), pubs, accs +} + +func TestStateRoot(t *testing.T) { + bc := newTestChain(t) + + h, pubs, accs := newMajorityMultisigWithGAS(t, 2) + bc.setNodesByRole(t, true, native.RoleStateValidator, pubs) + updateIndex := bc.BlockHeight() + transferTokenFromMultisigAccount(t, bc, h, bc.contracts.GAS.Hash, 1_0000_0000) + + srv, err := stateroot.New(bc.GetStateModule()) + require.NoError(t, err) + require.EqualValues(t, 0, srv.CurrentValidatedHeight()) + r, err := srv.GetStateRoot(bc.BlockHeight()) + require.NoError(t, err) + require.Equal(t, r.Root, srv.CurrentLocalStateRoot()) + + t.Run("invalid message", func(t *testing.T) { + require.Error(t, srv.OnPayload(&payload.Extensible{Data: []byte{42}})) + require.EqualValues(t, 0, srv.CurrentValidatedHeight()) + }) + t.Run("drop zero index", func(t *testing.T) { + r, err := srv.GetStateRoot(0) + require.NoError(t, err) + data, err := testserdes.EncodeBinary(stateroot.NewMessage(stateroot.RootT, r)) + require.NoError(t, err) + require.NoError(t, srv.OnPayload(&payload.Extensible{Data: data})) + require.EqualValues(t, 0, srv.CurrentValidatedHeight()) + }) + t.Run("invalid height", func(t *testing.T) { + r, err := srv.GetStateRoot(1) + require.NoError(t, err) + r.Index = 10 + data := testSignStateRoot(t, r, pubs, accs...) + require.Error(t, srv.OnPayload(&payload.Extensible{Data: data})) + require.EqualValues(t, 0, srv.CurrentValidatedHeight()) + }) + t.Run("invalid signer", func(t *testing.T) { + accInv, err := wallet.NewAccount() + require.NoError(t, err) + pubs := keys.PublicKeys{accInv.PrivateKey().PublicKey()} + require.NoError(t, accInv.ConvertMultisig(1, pubs)) + transferTokenFromMultisigAccount(t, bc, accInv.Contract.ScriptHash(), bc.contracts.GAS.Hash, 1_0000_0000) + r, err := srv.GetStateRoot(1) + require.NoError(t, err) + data := testSignStateRoot(t, r, pubs, accInv) + err = srv.OnPayload(&payload.Extensible{Data: data}) + require.True(t, errors.Is(err, ErrWitnessHashMismatch), "got: %v", err) + require.EqualValues(t, 0, srv.CurrentValidatedHeight()) + }) + + r, err = srv.GetStateRoot(updateIndex + 1) + require.NoError(t, err) + data := testSignStateRoot(t, r, pubs, accs...) + require.NoError(t, srv.OnPayload(&payload.Extensible{Data: data})) + require.EqualValues(t, 2, srv.CurrentValidatedHeight()) + + r, err = srv.GetStateRoot(updateIndex + 1) + require.NoError(t, err) + require.NotNil(t, r.Witness) + require.Equal(t, h, r.Witness.ScriptHash()) +} + +func TestStateRootInitNonZeroHeight(t *testing.T) { + st := memoryStore{storage.NewMemoryStore()} + h, pubs, accs := newMajorityMultisigWithGAS(t, 2) + + var root util.Uint256 + t.Run("init", func(t *testing.T) { // this is in a separate test to do proper cleanup + bc := newTestChainWithCustomCfgAndStore(t, st, nil) + bc.setNodesByRole(t, true, native.RoleStateValidator, pubs) + transferTokenFromMultisigAccount(t, bc, h, bc.contracts.GAS.Hash, 1_0000_0000) + + _, err := persistBlock(bc) + require.NoError(t, err) + srv, err := stateroot.New(bc.GetStateModule()) + require.NoError(t, err) + r, err := srv.GetStateRoot(2) + require.NoError(t, err) + data := testSignStateRoot(t, r, pubs, accs...) + require.NoError(t, srv.OnPayload(&payload.Extensible{Data: data})) + require.EqualValues(t, 2, srv.CurrentValidatedHeight()) + root = srv.CurrentLocalStateRoot() + }) + + bc2 := newTestChainWithCustomCfgAndStore(t, st, nil) + srv := bc2.GetStateModule() + require.EqualValues(t, 2, srv.CurrentValidatedHeight()) + require.Equal(t, root, srv.CurrentLocalStateRoot()) +} diff --git a/pkg/network/server.go b/pkg/network/server.go index 89b3bc302..0d594c42a 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -22,6 +22,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/services/notary" "github.com/nspcc-dev/neo-go/pkg/services/oracle" + "github.com/nspcc-dev/neo-go/pkg/services/stateroot" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" "go.uber.org/zap" @@ -87,7 +88,8 @@ type ( consensusStarted *atomic.Bool canHandleExtens *atomic.Bool - oracle *oracle.Oracle + oracle *oracle.Oracle + stateRoot stateroot.Service log *zap.Logger } @@ -171,6 +173,12 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai } }) + sr, err := stateroot.New(chain.GetStateModule()) + if err != nil { + return nil, fmt.Errorf("can't initialize StateRoot service: %w", err) + } + s.stateRoot = sr + if config.OracleCfg.Enabled { orcCfg := oracle.Config{ Log: log, @@ -295,6 +303,11 @@ func (s *Server) GetOracle() *oracle.Oracle { return s.oracle } +// GetStateRoot returns state root service instance. +func (s *Server) GetStateRoot() stateroot.Service { + return s.stateRoot +} + // UnconnectedPeers returns a list of peers that are in the discovery peer list // but are not connected to the server. func (s *Server) UnconnectedPeers() []string { @@ -803,7 +816,11 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { switch e.Category { case consensus.Category: s.consensus.OnPayload(e) - case "StateService": // no-op for now + case stateroot.Category: + err := s.stateRoot.OnPayload(e) + if err != nil { + return err + } default: return errors.New("invalid category") } diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index fbc1d4a99..b193746f4 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -869,7 +869,7 @@ func (s *Server) verifyProof(ps request.Params) (interface{}, *response.Error) { func (s *Server) getStateHeight(_ request.Params) (interface{}, *response.Error) { var height = s.chain.BlockHeight() - var stateHeight uint32 + var stateHeight = s.chain.GetStateModule().CurrentValidatedHeight() if s.chain.GetConfig().StateRootInHeader { stateHeight = height - 1 } diff --git a/pkg/services/stateroot/message.go b/pkg/services/stateroot/message.go new file mode 100644 index 000000000..c9f603a8a --- /dev/null +++ b/pkg/services/stateroot/message.go @@ -0,0 +1,50 @@ +package stateroot + +import ( + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/io" +) + +type ( + // MessageType represents message type. + MessageType byte + + // Message represents state-root related message. + Message struct { + Type MessageType + Payload io.Serializable + } +) + +// Various message types. +const ( + RootT MessageType = 1 +) + +// NewMessage creates new message of specified type. +func NewMessage(typ MessageType, p io.Serializable) *Message { + return &Message{ + Type: typ, + Payload: p, + } +} + +// EncodeBinary implements io.Serializable interface. +func (m *Message) EncodeBinary(w *io.BinWriter) { + w.WriteB(byte(m.Type)) + m.Payload.EncodeBinary(w) +} + +// DecodeBinary implements io.Serializable interface. +func (m *Message) DecodeBinary(r *io.BinReader) { + switch m.Type = MessageType(r.ReadB()); m.Type { + case RootT: + m.Payload = new(state.MPTRoot) + default: + r.Err = fmt.Errorf("invalid type: %x", m.Type) + return + } + m.Payload.DecodeBinary(r) +} diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go new file mode 100644 index 000000000..7dfc49bb7 --- /dev/null +++ b/pkg/services/stateroot/service.go @@ -0,0 +1,51 @@ +package stateroot + +import ( + "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/payload" +) + +type ( + // Service represents state root service. + Service interface { + blockchainer.StateRoot + OnPayload(p *payload.Extensible) error + } + + service struct { + blockchainer.StateRoot + } +) + +const ( + // Category is message category for extensible payloads. + Category = "StateService" +) + +// New returns new state root service instance using underlying module. +func New(mod blockchainer.StateRoot) (Service, error) { + return &service{ + StateRoot: mod, + }, nil +} + +// OnPayload implements Service interface. +func (s *service) OnPayload(ep *payload.Extensible) error { + m := new(Message) + r := io.NewBinReaderFromBuf(ep.Data) + m.DecodeBinary(r) + if r.Err != nil { + return r.Err + } + switch m.Type { + case RootT: + sr := m.Payload.(*state.MPTRoot) + if sr.Index == 0 { + return nil + } + return s.AddStateRoot(sr) + } + return nil +} From 3c65ed15075533cc76e3279f62869e386190c0bc Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Tue, 2 Feb 2021 12:34:27 +0300 Subject: [PATCH 5/6] stateroot: allow to sign new roots --- pkg/config/application_config.go | 1 + pkg/config/state_root.go | 7 ++ pkg/core/blockchainer/state_root.go | 3 + pkg/core/stateroot/callbacks.go | 20 ++++++ pkg/core/stateroot/module.go | 3 + pkg/core/stateroot/store.go | 3 + pkg/core/stateroot/validators.go | 23 ++++--- pkg/core/stateroot_test.go | 96 ++++++++++++++++++++++++++- pkg/network/server.go | 21 ++++-- pkg/network/server_config.go | 4 ++ pkg/services/stateroot/message.go | 3 + pkg/services/stateroot/network.go | 98 ++++++++++++++++++++++++++++ pkg/services/stateroot/service.go | 80 +++++++++++++++++++++-- pkg/services/stateroot/signature.go | 88 +++++++++++++++++++++++++ pkg/services/stateroot/validators.go | 55 ++++++++++++++++ pkg/services/stateroot/vote.go | 27 ++++++++ 16 files changed, 514 insertions(+), 18 deletions(-) create mode 100644 pkg/config/state_root.go create mode 100644 pkg/core/stateroot/callbacks.go create mode 100644 pkg/services/stateroot/network.go create mode 100644 pkg/services/stateroot/signature.go create mode 100644 pkg/services/stateroot/validators.go create mode 100644 pkg/services/stateroot/vote.go diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index f14ad9941..e2c7e10ec 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -28,4 +28,5 @@ type ApplicationConfiguration struct { UnlockWallet Wallet `yaml:"UnlockWallet"` Oracle OracleConfiguration `yaml:"Oracle"` P2PNotary P2PNotary `yaml:"P2PNotary"` + StateRoot StateRoot `yaml:"StateRoot"` } diff --git a/pkg/config/state_root.go b/pkg/config/state_root.go new file mode 100644 index 000000000..904c104fb --- /dev/null +++ b/pkg/config/state_root.go @@ -0,0 +1,7 @@ +package config + +// StateRoot contains state root service configuration. +type StateRoot struct { + Enabled bool `yaml:"Enabled"` + UnlockWallet Wallet `yaml:"UnlockWallet"` +} diff --git a/pkg/core/blockchainer/state_root.go b/pkg/core/blockchainer/state_root.go index e1086edee..556427f84 100644 --- a/pkg/core/blockchainer/state_root.go +++ b/pkg/core/blockchainer/state_root.go @@ -13,5 +13,8 @@ type StateRoot interface { CurrentValidatedHeight() uint32 GetStateProof(root util.Uint256, key []byte) ([][]byte, error) GetStateRoot(height uint32) (*state.MPTRoot, error) + GetStateValidators(height uint32) keys.PublicKeys + SetSignAndSendCallback(func(*state.MPTRoot) error) + SetUpdateValidatorsCallback(func(keys.PublicKeys)) UpdateStateValidators(height uint32, pubs keys.PublicKeys) } diff --git a/pkg/core/stateroot/callbacks.go b/pkg/core/stateroot/callbacks.go new file mode 100644 index 000000000..19ea0a17e --- /dev/null +++ b/pkg/core/stateroot/callbacks.go @@ -0,0 +1,20 @@ +package stateroot + +import ( + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" +) + +// SetSignAndSendCb sets callback for sending signed root. +func (s *Module) SetSignAndSendCallback(f func(*state.MPTRoot) error) { + s.mtx.Lock() + defer s.mtx.Unlock() + s.signAndSendCb = f +} + +// SetUpdateValidatorsCallback sets callback for sending signed root. +func (s *Module) SetUpdateValidatorsCallback(f func(keys.PublicKeys)) { + s.mtx.Lock() + defer s.mtx.Unlock() + s.updateValidatorsCb = f +} diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index 477e040cb..9b7c3bf57 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -30,6 +30,9 @@ type ( mtx sync.RWMutex keys []keyCache + + updateValidatorsCb func(publicKeys keys.PublicKeys) + signAndSendCb func(*state.MPTRoot) error } keyCache struct { diff --git a/pkg/core/stateroot/store.go b/pkg/core/stateroot/store.go index b2ab7d210..9efbd4686 100644 --- a/pkg/core/stateroot/store.go +++ b/pkg/core/stateroot/store.go @@ -31,6 +31,9 @@ func (s *Module) addLocalStateRoot(sr *state.MPTRoot) error { s.validatedHeight.Store(sr.Index) updateStateHeightMetric(sr.Index) } + if s.signAndSendCb != nil { + return s.signAndSendCb(sr) + } return nil } diff --git a/pkg/core/stateroot/validators.go b/pkg/core/stateroot/validators.go index 58348091a..e07a447e2 100644 --- a/pkg/core/stateroot/validators.go +++ b/pkg/core/stateroot/validators.go @@ -1,8 +1,6 @@ package stateroot import ( - "sort" - "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/smartcontract" @@ -14,6 +12,9 @@ func (s *Module) UpdateStateValidators(height uint32, pubs keys.PublicKeys) { h := hash.Hash160(script) s.mtx.Lock() + if s.updateValidatorsCb != nil { + s.updateValidatorsCb(pubs) + } kc := s.getKeyCacheForHeight(height) if kc.validatorsHash != h { s.keys = append(s.keys, keyCache{ @@ -27,11 +28,17 @@ func (s *Module) UpdateStateValidators(height uint32, pubs keys.PublicKeys) { } func (s *Module) getKeyCacheForHeight(h uint32) keyCache { - index := sort.Search(len(s.keys), func(i int) bool { - return s.keys[i].height >= h - }) - if index == len(s.keys) { - return keyCache{} + for i := len(s.keys) - 1; i >= 0; i-- { + if s.keys[i].height <= h && (i+1 == len(s.keys) || s.keys[i+1].height < h) { + return s.keys[i] + } } - return s.keys[index] + return keyCache{} +} + +// GetStateValidators returns current state validators. +func (s *Module) GetStateValidators(height uint32) keys.PublicKeys { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.getKeyCacheForHeight(height).validatorsKeys.Copy() } diff --git a/pkg/core/stateroot_test.go b/pkg/core/stateroot_test.go index 163ae2445..814e48498 100644 --- a/pkg/core/stateroot_test.go +++ b/pkg/core/stateroot_test.go @@ -2,10 +2,13 @@ package core import ( "errors" + "os" + "path" "sort" "testing" "github.com/nspcc-dev/neo-go/internal/testserdes" + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/storage" @@ -20,6 +23,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) func testSignStateRoot(t *testing.T, r *state.MPTRoot, pubs keys.PublicKeys, accs ...*wallet.Account) []byte { @@ -71,7 +75,12 @@ func TestStateRoot(t *testing.T) { updateIndex := bc.BlockHeight() transferTokenFromMultisigAccount(t, bc, h, bc.contracts.GAS.Hash, 1_0000_0000) - srv, err := stateroot.New(bc.GetStateModule()) + tmpDir := path.Join(os.TempDir(), "neogo.initsnz") + require.NoError(t, os.Mkdir(tmpDir, os.ModePerm)) + defer os.RemoveAll(tmpDir) + w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass") + cfg := createStateRootConfig(w.Path(), "pass") + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) require.NoError(t, err) require.EqualValues(t, 0, srv.CurrentValidatedHeight()) r, err := srv.GetStateRoot(bc.BlockHeight()) @@ -136,7 +145,12 @@ func TestStateRootInitNonZeroHeight(t *testing.T) { _, err := persistBlock(bc) require.NoError(t, err) - srv, err := stateroot.New(bc.GetStateModule()) + tmpDir := path.Join(os.TempDir(), "neogo.initsnz") + require.NoError(t, os.Mkdir(tmpDir, os.ModePerm)) + defer os.RemoveAll(tmpDir) + w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass") + cfg := createStateRootConfig(w.Path(), "pass") + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) require.NoError(t, err) r, err := srv.GetStateRoot(2) require.NoError(t, err) @@ -151,3 +165,81 @@ func TestStateRootInitNonZeroHeight(t *testing.T) { require.EqualValues(t, 2, srv.CurrentValidatedHeight()) require.Equal(t, root, srv.CurrentLocalStateRoot()) } + +func createAndWriteWallet(t *testing.T, acc *wallet.Account, path, password string) *wallet.Wallet { + w, err := wallet.NewWallet(path) + require.NoError(t, err) + require.NoError(t, acc.Encrypt(password)) + w.AddAccount(acc) + require.NoError(t, w.Save()) + w.Close() + return w +} + +func createStateRootConfig(walletPath, password string) config.StateRoot { + return config.StateRoot{ + Enabled: true, + UnlockWallet: config.Wallet{ + Path: walletPath, + Password: password, + }, + } +} + +func TestStateRootFull(t *testing.T) { + tmpDir := path.Join(os.TempDir(), "neogo.stateroot4") + require.NoError(t, os.Mkdir(tmpDir, os.ModePerm)) + defer os.RemoveAll(tmpDir) + + bc := newTestChain(t) + + h, pubs, accs := newMajorityMultisigWithGAS(t, 2) + w := createAndWriteWallet(t, accs[1], path.Join(tmpDir, "wallet2"), "two") + cfg := createStateRootConfig(w.Path(), "two") + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) + require.NoError(t, err) + + var lastValidated *payload.Extensible + srv.SetRelayCallback(func(ep *payload.Extensible) { + lastValidated = ep + }) + + bc.setNodesByRole(t, true, native.RoleStateValidator, pubs) + transferTokenFromMultisigAccount(t, bc, h, bc.contracts.GAS.Hash, 1_0000_0000) + checkVoteBroadcasted(t, bc, lastValidated, 2, 1) + _, err = persistBlock(bc) + checkVoteBroadcasted(t, bc, lastValidated, 3, 1) + + r, err := srv.GetStateRoot(2) + require.NoError(t, err) + require.NoError(t, srv.AddSignature(2, 0, accs[0].PrivateKey().SignHash(r.GetSignedHash()))) + require.NotNil(t, lastValidated) + + msg := new(stateroot.Message) + require.NoError(t, testserdes.DecodeBinary(lastValidated.Data, msg)) + require.Equal(t, stateroot.RootT, msg.Type) + + actual := msg.Payload.(*state.MPTRoot) + require.Equal(t, r.Index, actual.Index) + require.Equal(t, r.Version, actual.Version) + require.Equal(t, r.Root, actual.Root) +} + +func checkVoteBroadcasted(t *testing.T, bc *Blockchain, p *payload.Extensible, + height uint32, valIndex byte) { + require.NotNil(t, p) + m := new(stateroot.Message) + require.NoError(t, testserdes.DecodeBinary(p.Data, m)) + require.Equal(t, stateroot.VoteT, m.Type) + vote := m.Payload.(*stateroot.Vote) + + srv := bc.GetStateModule() + r, err := srv.GetStateRoot(bc.BlockHeight()) + require.NoError(t, err) + require.Equal(t, height, vote.Height) + require.Equal(t, int32(valIndex), vote.ValidatorIndex) + + pubs, _, err := bc.contracts.Designate.GetDesignatedByRole(bc.dao, native.RoleStateValidator, bc.BlockHeight()) + require.True(t, len(pubs) > int(valIndex)) + require.True(t, pubs[valIndex].Verify(vote.Signature, r.GetSignedHash().BytesBE())) +} diff --git a/pkg/network/server.go b/pkg/network/server.go index 0d594c42a..e0e2dea65 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -173,7 +173,11 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai } }) - sr, err := stateroot.New(chain.GetStateModule()) + if config.StateRootCfg.Enabled && chain.GetConfig().StateRootInHeader { + return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled") + } + + sr, err := stateroot.New(config.StateRootCfg, s.log, chain.GetStateModule()) if err != nil { return nil, fmt.Errorf("can't initialize StateRoot service: %w", err) } @@ -216,6 +220,10 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai s.consensus = srv + if config.StateRootCfg.Enabled { + s.stateRoot.SetRelayCallback(s.handleNewPayload) + } + if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", zap.Int("configured", s.MinPeers), @@ -1052,9 +1060,14 @@ func (s *Server) handleNewPayload(p *payload.Extensible) { } msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{p.Hash()})) - // It's high priority because it directly affects consensus process, - // even though it's just an inv. - s.broadcastHPMessage(msg) + switch p.Category { + case consensus.Category: + // It's high priority because it directly affects consensus process, + // even though it's just an inv. + s.broadcastHPMessage(msg) + default: + s.broadcastMessage(msg) + } } func (s *Server) requestTx(hashes ...util.Uint256) { diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index 907ada125..42fed7a23 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -72,6 +72,9 @@ type ( // P2PNotaryCfg is notary module configuration. P2PNotaryCfg config.P2PNotary + + // StateRootCfg is stateroot module configuration. + StateRootCfg config.StateRoot } ) @@ -104,5 +107,6 @@ func NewServerConfig(cfg config.Config) ServerConfig { TimePerBlock: time.Duration(protoConfig.SecondsPerBlock) * time.Second, OracleCfg: appConfig.Oracle, P2PNotaryCfg: appConfig.P2PNotary, + StateRootCfg: appConfig.StateRoot, } } diff --git a/pkg/services/stateroot/message.go b/pkg/services/stateroot/message.go index c9f603a8a..5b1e32e73 100644 --- a/pkg/services/stateroot/message.go +++ b/pkg/services/stateroot/message.go @@ -20,6 +20,7 @@ type ( // Various message types. const ( + VoteT MessageType = 0 RootT MessageType = 1 ) @@ -40,6 +41,8 @@ func (m *Message) EncodeBinary(w *io.BinWriter) { // DecodeBinary implements io.Serializable interface. func (m *Message) DecodeBinary(r *io.BinReader) { switch m.Type = MessageType(r.ReadB()); m.Type { + case VoteT: + m.Payload = new(Vote) case RootT: m.Payload = new(state.MPTRoot) default: diff --git a/pkg/services/stateroot/network.go b/pkg/services/stateroot/network.go new file mode 100644 index 000000000..b5489edc5 --- /dev/null +++ b/pkg/services/stateroot/network.go @@ -0,0 +1,98 @@ +package stateroot + +import ( + "errors" + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/payload" + "go.uber.org/zap" +) + +// RelayCallback represents callback for sending validated state roots. +type RelayCallback = func(*payload.Extensible) + +// AddSignature adds state root signature. +func (s *service) AddSignature(height uint32, validatorIndex int32, sig []byte) error { + if !s.MainCfg.Enabled { + return nil + } + + pubs := s.GetStateValidators(height) + if validatorIndex < 0 || int(validatorIndex) >= len(pubs) { + return errors.New("invalid validator index") + } + pub := pubs[validatorIndex] + + incRoot := s.getIncompleteRoot(height) + if incRoot == nil { + return nil + } + + incRoot.Lock() + if incRoot.root != nil { + ok := pub.Verify(sig, incRoot.root.GetSignedHash().BytesBE()) + if !ok { + incRoot.Unlock() + return fmt.Errorf("invalid state root signature for %d", validatorIndex) + } + } + incRoot.addSignature(pub, sig) + sr, ready := incRoot.finalize(pubs) + incRoot.Unlock() + + if ready { + err := s.AddStateRoot(sr) + if err != nil { + s.log.Error("can't add validated state root", zap.Error(err)) + } + s.sendValidatedRoot(sr) + } + return nil +} + +// GetConfig returns service configuration. +func (s *service) GetConfig() config.StateRoot { + return s.MainCfg +} + +func (s *service) getIncompleteRoot(height uint32) *incompleteRoot { + s.srMtx.Lock() + defer s.srMtx.Unlock() + if incRoot, ok := s.incompleteRoots[height]; ok { + return incRoot + } + incRoot := &incompleteRoot{sigs: make(map[string]*rootSig)} + s.incompleteRoots[height] = incRoot + return incRoot +} + +func (s *service) sendValidatedRoot(r *state.MPTRoot) { + w := io.NewBufBinWriter() + m := NewMessage(RootT, r) + m.EncodeBinary(w.BinWriter) + ep := &payload.Extensible{ + Network: s.Network, + ValidBlockStart: r.Index, + ValidBlockEnd: r.Index + transaction.MaxValidUntilBlockIncrement, + Sender: s.getAccount().PrivateKey().GetScriptHash(), + Data: w.Bytes(), + } + s.getRelayCallback()(ep) +} + +func (s *service) getRelayCallback() RelayCallback { + s.cbMtx.RLock() + defer s.cbMtx.RUnlock() + return s.onValidatedRoot +} + +// SetRelayCallback sets callback to pool and broadcast tx. +func (s *service) SetRelayCallback(cb RelayCallback) { + s.cbMtx.Lock() + defer s.cbMtx.Unlock() + s.onValidatedRoot = cb +} diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 7dfc49bb7..76726f5ff 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -1,10 +1,18 @@ package stateroot import ( + "errors" + "sync" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "go.uber.org/zap" ) type ( @@ -12,10 +20,28 @@ type ( Service interface { blockchainer.StateRoot OnPayload(p *payload.Extensible) error + AddSignature(height uint32, validatorIndex int32, sig []byte) error + GetConfig() config.StateRoot + SetRelayCallback(RelayCallback) } service struct { blockchainer.StateRoot + + MainCfg config.StateRoot + Network netmode.Magic + + log *zap.Logger + accMtx sync.RWMutex + myIndex byte + wallet *wallet.Wallet + acc *wallet.Account + + srMtx sync.Mutex + incompleteRoots map[uint32]*incompleteRoot + + cbMtx sync.RWMutex + onValidatedRoot RelayCallback } ) @@ -25,10 +51,36 @@ const ( ) // New returns new state root service instance using underlying module. -func New(mod blockchainer.StateRoot) (Service, error) { - return &service{ - StateRoot: mod, - }, nil +func New(cfg config.StateRoot, log *zap.Logger, mod blockchainer.StateRoot) (Service, error) { + s := &service{ + StateRoot: mod, + log: log, + incompleteRoots: make(map[uint32]*incompleteRoot), + } + + s.MainCfg = cfg + if cfg.Enabled { + var err error + w := cfg.UnlockWallet + if s.wallet, err = wallet.NewWalletFromFile(w.Path); err != nil { + return nil, err + } + + haveAccount := false + for _, acc := range s.wallet.Accounts { + if err := acc.Decrypt(w.Password); err == nil { + haveAccount = true + break + } + } + if !haveAccount { + return nil, errors.New("no wallet account could be unlocked") + } + + s.SetUpdateValidatorsCallback(s.updateValidators) + s.SetSignAndSendCallback(s.signAndSend) + } + return s, nil } // OnPayload implements Service interface. @@ -46,6 +98,26 @@ func (s *service) OnPayload(ep *payload.Extensible) error { return nil } return s.AddStateRoot(sr) + case VoteT: + v := m.Payload.(*Vote) + return s.AddSignature(v.Height, v.ValidatorIndex, v.Signature) } return nil } + +func (s *service) updateValidators(pubs keys.PublicKeys) { + s.accMtx.Lock() + defer s.accMtx.Unlock() + + s.acc = nil + for i := range pubs { + if acc := s.wallet.GetAccount(pubs[i].GetScriptHash()); acc != nil { + err := acc.Decrypt(s.MainCfg.UnlockWallet.Password) + if err == nil { + s.acc = acc + s.myIndex = byte(i) + break + } + } + } +} diff --git a/pkg/services/stateroot/signature.go b/pkg/services/stateroot/signature.go new file mode 100644 index 000000000..b579f91fe --- /dev/null +++ b/pkg/services/stateroot/signature.go @@ -0,0 +1,88 @@ +package stateroot + +import ( + "sync" + + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/smartcontract" + "github.com/nspcc-dev/neo-go/pkg/vm/emit" +) + +type ( + incompleteRoot struct { + sync.RWMutex + // isSent is true state root was already broadcasted. + isSent bool + // request is oracle request. + root *state.MPTRoot + // sigs contains signature from every oracle node. + sigs map[string]*rootSig + } + + rootSig struct { + // pub is cached public key. + pub *keys.PublicKey + // ok is true if signature was verified. + ok bool + // sig is state root signature. + sig []byte + } +) + +func newIncompleteRoot() *incompleteRoot { + return &incompleteRoot{ + sigs: make(map[string]*rootSig), + } +} + +func (r *incompleteRoot) reverify() { + txHash := r.root.GetSignedHash() + for _, sig := range r.sigs { + if !sig.ok { + sig.ok = sig.pub.Verify(sig.sig, txHash.BytesBE()) + } + } +} + +func (r *incompleteRoot) addSignature(pub *keys.PublicKey, sig []byte) { + r.sigs[string(pub.Bytes())] = &rootSig{ + pub: pub, + ok: r.root != nil, + sig: sig, + } +} + +// finalize checks is either main or backup tx has sufficient number of signatures and returns +// tx and bool value indicating if it is ready to be broadcasted. +func (r *incompleteRoot) finalize(stateValidators keys.PublicKeys) (*state.MPTRoot, bool) { + if r.root == nil { + return nil, false + } + + m := smartcontract.GetDefaultHonestNodeCount(len(stateValidators)) + sigs := make([][]byte, 0, m) + for _, pub := range stateValidators { + sig, ok := r.sigs[string(pub.Bytes())] + if ok && sig.ok { + sigs = append(sigs, sig.sig) + if len(sigs) == m { + break + } + } + } + if len(sigs) != m { + return nil, false + } + + w := io.NewBufBinWriter() + for i := range sigs { + emit.Bytes(w.BinWriter, sigs[i]) + } + r.root.Witness = &transaction.Witness{ + InvocationScript: w.Bytes(), + } + return r.root, true +} diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go new file mode 100644 index 000000000..4413c4676 --- /dev/null +++ b/pkg/services/stateroot/validators.go @@ -0,0 +1,55 @@ +package stateroot + +import ( + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/wallet" +) + +func (s *service) signAndSend(r *state.MPTRoot) error { + if !s.MainCfg.Enabled { + return nil + } + + acc := s.getAccount() + if acc == nil { + return nil + } + + sig := acc.PrivateKey().SignHash(r.GetSignedHash()) + incRoot := s.getIncompleteRoot(r.Index) + incRoot.root = r + incRoot.addSignature(acc.PrivateKey().PublicKey(), sig) + incRoot.reverify() + + s.accMtx.RLock() + myIndex := s.myIndex + s.accMtx.RUnlock() + msg := NewMessage(VoteT, &Vote{ + ValidatorIndex: int32(myIndex), + Height: r.Index, + Signature: sig, + }) + + w := io.NewBufBinWriter() + msg.EncodeBinary(w.BinWriter) + if w.Err != nil { + return w.Err + } + s.getRelayCallback()(&payload.Extensible{ + Network: s.Network, + ValidBlockStart: r.Index, + ValidBlockEnd: r.Index + transaction.MaxValidUntilBlockIncrement, + Sender: s.getAccount().PrivateKey().GetScriptHash(), + Data: w.Bytes(), + }) + return nil +} + +func (s *service) getAccount() *wallet.Account { + s.accMtx.RLock() + defer s.accMtx.RUnlock() + return s.acc +} diff --git a/pkg/services/stateroot/vote.go b/pkg/services/stateroot/vote.go new file mode 100644 index 000000000..ad0bd9316 --- /dev/null +++ b/pkg/services/stateroot/vote.go @@ -0,0 +1,27 @@ +package stateroot + +import ( + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/io" +) + +// Vote represents vote message. +type Vote struct { + ValidatorIndex int32 + Height uint32 + Signature []byte +} + +// EncodeBinary implements io.Serializable interface. +func (p *Vote) EncodeBinary(w *io.BinWriter) { + w.WriteU32LE(uint32(p.ValidatorIndex)) + w.WriteU32LE(p.Height) + w.WriteVarBytes(p.Signature) +} + +// DecodeBinary implements io.Serializable interface. +func (p *Vote) DecodeBinary(r *io.BinReader) { + p.ValidatorIndex = int32(r.ReadU32LE()) + p.Height = r.ReadU32LE() + p.Signature = r.ReadVarBytes(keys.SignatureLen) +} From 2f3abf95a2ff02bafa29632d15a556fcb9c6f8cc Mon Sep 17 00:00:00 2001 From: Evgeniy Stratonikov Date: Wed, 3 Mar 2021 12:37:06 +0300 Subject: [PATCH 6/6] stateroot: broadcast state on new blocks --- pkg/core/blockchainer/state_root.go | 3 +-- pkg/core/stateroot/callbacks.go | 10 +--------- pkg/core/stateroot/module.go | 3 +-- pkg/core/stateroot/store.go | 3 --- pkg/core/stateroot/validators.go | 2 +- pkg/core/stateroot_test.go | 26 +++++++++++++++++--------- pkg/network/server.go | 8 +++++++- pkg/services/stateroot/service.go | 28 +++++++++++++++++++--------- pkg/services/stateroot/validators.go | 28 ++++++++++++++++++++++++++++ 9 files changed, 75 insertions(+), 36 deletions(-) diff --git a/pkg/core/blockchainer/state_root.go b/pkg/core/blockchainer/state_root.go index 556427f84..979e15963 100644 --- a/pkg/core/blockchainer/state_root.go +++ b/pkg/core/blockchainer/state_root.go @@ -14,7 +14,6 @@ type StateRoot interface { GetStateProof(root util.Uint256, key []byte) ([][]byte, error) GetStateRoot(height uint32) (*state.MPTRoot, error) GetStateValidators(height uint32) keys.PublicKeys - SetSignAndSendCallback(func(*state.MPTRoot) error) - SetUpdateValidatorsCallback(func(keys.PublicKeys)) + SetUpdateValidatorsCallback(func(uint32, keys.PublicKeys)) UpdateStateValidators(height uint32, pubs keys.PublicKeys) } diff --git a/pkg/core/stateroot/callbacks.go b/pkg/core/stateroot/callbacks.go index 19ea0a17e..95fce212a 100644 --- a/pkg/core/stateroot/callbacks.go +++ b/pkg/core/stateroot/callbacks.go @@ -1,19 +1,11 @@ package stateroot import ( - "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" ) -// SetSignAndSendCb sets callback for sending signed root. -func (s *Module) SetSignAndSendCallback(f func(*state.MPTRoot) error) { - s.mtx.Lock() - defer s.mtx.Unlock() - s.signAndSendCb = f -} - // SetUpdateValidatorsCallback sets callback for sending signed root. -func (s *Module) SetUpdateValidatorsCallback(f func(keys.PublicKeys)) { +func (s *Module) SetUpdateValidatorsCallback(f func(uint32, keys.PublicKeys)) { s.mtx.Lock() defer s.mtx.Unlock() s.updateValidatorsCb = f diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index 9b7c3bf57..2d63b89b5 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -31,8 +31,7 @@ type ( mtx sync.RWMutex keys []keyCache - updateValidatorsCb func(publicKeys keys.PublicKeys) - signAndSendCb func(*state.MPTRoot) error + updateValidatorsCb func(height uint32, publicKeys keys.PublicKeys) } keyCache struct { diff --git a/pkg/core/stateroot/store.go b/pkg/core/stateroot/store.go index 9efbd4686..b2ab7d210 100644 --- a/pkg/core/stateroot/store.go +++ b/pkg/core/stateroot/store.go @@ -31,9 +31,6 @@ func (s *Module) addLocalStateRoot(sr *state.MPTRoot) error { s.validatedHeight.Store(sr.Index) updateStateHeightMetric(sr.Index) } - if s.signAndSendCb != nil { - return s.signAndSendCb(sr) - } return nil } diff --git a/pkg/core/stateroot/validators.go b/pkg/core/stateroot/validators.go index e07a447e2..545c52ded 100644 --- a/pkg/core/stateroot/validators.go +++ b/pkg/core/stateroot/validators.go @@ -13,7 +13,7 @@ func (s *Module) UpdateStateValidators(height uint32, pubs keys.PublicKeys) { s.mtx.Lock() if s.updateValidatorsCb != nil { - s.updateValidatorsCb(pubs) + s.updateValidatorsCb(height, pubs) } kc := s.getKeyCacheForHeight(height) if kc.validatorsHash != h { diff --git a/pkg/core/stateroot_test.go b/pkg/core/stateroot_test.go index 814e48498..9b9d2e0b6 100644 --- a/pkg/core/stateroot_test.go +++ b/pkg/core/stateroot_test.go @@ -6,6 +6,7 @@ import ( "path" "sort" "testing" + "time" "github.com/nspcc-dev/neo-go/internal/testserdes" "github.com/nspcc-dev/neo-go/pkg/config" @@ -23,6 +24,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/zap/zaptest" ) @@ -80,7 +82,7 @@ func TestStateRoot(t *testing.T) { defer os.RemoveAll(tmpDir) w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass") cfg := createStateRootConfig(w.Path(), "pass") - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc) require.NoError(t, err) require.EqualValues(t, 0, srv.CurrentValidatedHeight()) r, err := srv.GetStateRoot(bc.BlockHeight()) @@ -150,7 +152,7 @@ func TestStateRootInitNonZeroHeight(t *testing.T) { defer os.RemoveAll(tmpDir) w := createAndWriteWallet(t, accs[0], path.Join(tmpDir, "w"), "pass") cfg := createStateRootConfig(w.Path(), "pass") - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc) require.NoError(t, err) r, err := srv.GetStateRoot(2) require.NoError(t, err) @@ -196,27 +198,33 @@ func TestStateRootFull(t *testing.T) { h, pubs, accs := newMajorityMultisigWithGAS(t, 2) w := createAndWriteWallet(t, accs[1], path.Join(tmpDir, "wallet2"), "two") cfg := createStateRootConfig(w.Path(), "two") - srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc.GetStateModule()) + srv, err := stateroot.New(cfg, zaptest.NewLogger(t), bc) require.NoError(t, err) + srv.Run() + t.Cleanup(srv.Shutdown) - var lastValidated *payload.Extensible + var lastValidated atomic.Value + var lastHeight atomic.Uint32 srv.SetRelayCallback(func(ep *payload.Extensible) { - lastValidated = ep + lastHeight.Store(ep.ValidBlockStart) + lastValidated.Store(ep) }) bc.setNodesByRole(t, true, native.RoleStateValidator, pubs) transferTokenFromMultisigAccount(t, bc, h, bc.contracts.GAS.Hash, 1_0000_0000) - checkVoteBroadcasted(t, bc, lastValidated, 2, 1) + require.Eventually(t, func() bool { return lastHeight.Load() == 2 }, time.Second, time.Millisecond) + checkVoteBroadcasted(t, bc, lastValidated.Load().(*payload.Extensible), 2, 1) _, err = persistBlock(bc) - checkVoteBroadcasted(t, bc, lastValidated, 3, 1) + require.Eventually(t, func() bool { return lastHeight.Load() == 3 }, time.Second, time.Millisecond) + checkVoteBroadcasted(t, bc, lastValidated.Load().(*payload.Extensible), 3, 1) r, err := srv.GetStateRoot(2) require.NoError(t, err) require.NoError(t, srv.AddSignature(2, 0, accs[0].PrivateKey().SignHash(r.GetSignedHash()))) - require.NotNil(t, lastValidated) + require.NotNil(t, lastValidated.Load().(*payload.Extensible)) msg := new(stateroot.Message) - require.NoError(t, testserdes.DecodeBinary(lastValidated.Data, msg)) + require.NoError(t, testserdes.DecodeBinary(lastValidated.Load().(*payload.Extensible).Data, msg)) require.Equal(t, stateroot.RootT, msg.Type) actual := msg.Payload.(*state.MPTRoot) diff --git a/pkg/network/server.go b/pkg/network/server.go index e0e2dea65..39bfb2cd8 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -177,7 +177,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled") } - sr, err := stateroot.New(config.StateRootCfg, s.log, chain.GetStateModule()) + sr, err := stateroot.New(config.StateRootCfg, s.log, chain) if err != nil { return nil, fmt.Errorf("can't initialize StateRoot service: %w", err) } @@ -277,6 +277,9 @@ func (s *Server) Start(errChan chan error) { s.notaryRequestPool.RunSubscriptions() go s.notaryModule.Run() } + if s.StateRootCfg.Enabled { + s.stateRoot.Run() + } go s.relayBlocksLoop() go s.bQueue.run() go s.transport.Accept() @@ -296,6 +299,9 @@ func (s *Server) Shutdown() { p.Disconnect(errServerShutdown) } s.bQueue.discard() + if s.StateRootCfg.Enabled { + s.stateRoot.Shutdown() + } if s.oracle != nil { s.oracle.Shutdown() } diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 76726f5ff..0c5b17f40 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -23,25 +24,31 @@ type ( AddSignature(height uint32, validatorIndex int32, sig []byte) error GetConfig() config.StateRoot SetRelayCallback(RelayCallback) + Run() + Shutdown() } service struct { blockchainer.StateRoot + chain blockchainer.Blockchainer MainCfg config.StateRoot Network netmode.Magic - log *zap.Logger - accMtx sync.RWMutex - myIndex byte - wallet *wallet.Wallet - acc *wallet.Account + log *zap.Logger + accMtx sync.RWMutex + accHeight uint32 + myIndex byte + wallet *wallet.Wallet + acc *wallet.Account srMtx sync.Mutex incompleteRoots map[uint32]*incompleteRoot cbMtx sync.RWMutex onValidatedRoot RelayCallback + blockCh chan *block.Block + done chan struct{} } ) @@ -51,11 +58,14 @@ const ( ) // New returns new state root service instance using underlying module. -func New(cfg config.StateRoot, log *zap.Logger, mod blockchainer.StateRoot) (Service, error) { +func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer) (Service, error) { s := &service{ - StateRoot: mod, + StateRoot: bc.GetStateModule(), + chain: bc, log: log, incompleteRoots: make(map[uint32]*incompleteRoot), + blockCh: make(chan *block.Block), + done: make(chan struct{}), } s.MainCfg = cfg @@ -78,7 +88,6 @@ func New(cfg config.StateRoot, log *zap.Logger, mod blockchainer.StateRoot) (Ser } s.SetUpdateValidatorsCallback(s.updateValidators) - s.SetSignAndSendCallback(s.signAndSend) } return s, nil } @@ -105,7 +114,7 @@ func (s *service) OnPayload(ep *payload.Extensible) error { return nil } -func (s *service) updateValidators(pubs keys.PublicKeys) { +func (s *service) updateValidators(height uint32, pubs keys.PublicKeys) { s.accMtx.Lock() defer s.accMtx.Unlock() @@ -115,6 +124,7 @@ func (s *service) updateValidators(pubs keys.PublicKeys) { err := acc.Decrypt(s.MainCfg.UnlockWallet.Password) if err == nil { s.acc = acc + s.accHeight = height s.myIndex = byte(i) break } diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 4413c4676..b3a98378e 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -6,8 +6,36 @@ import ( "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/wallet" + "go.uber.org/zap" ) +// Run runs service instance in a separate goroutine. +func (s *service) Run() { + s.chain.SubscribeForBlocks(s.blockCh) + go s.run() +} + +func (s *service) run() { + for { + select { + case b := <-s.blockCh: + r, err := s.GetStateRoot(b.Index) + if err != nil { + s.log.Error("can't get state root for new block", zap.Error(err)) + } else if err := s.signAndSend(r); err != nil { + s.log.Error("can't sign or send state root", zap.Error(err)) + } + case <-s.done: + return + } + } +} + +// Shutdown stops the service. +func (s *service) Shutdown() { + close(s.done) +} + func (s *service) signAndSend(r *state.MPTRoot) error { if !s.MainCfg.Enabled { return nil