package core import ( "errors" "fmt" "math/big" "sync" "sync/atomic" "time" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/interop" "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/bigint" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm" "github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "go.uber.org/zap" ) // Tuning parameters. const ( headerBatchCount = 2000 version = "0.1.0" defaultMemPoolSize = 50000 verificationGasLimit = 100000000 // 1 GAS ) var ( // ErrAlreadyExists is returned when trying to add some already existing // transaction into the pool (not specifying whether it exists in the // chain or mempool). ErrAlreadyExists = errors.New("already exists") // ErrOOM is returned when adding transaction to the memory pool because // it reached its full capacity. ErrOOM = errors.New("no space left in the memory pool") // ErrPolicy is returned on attempt to add transaction that doesn't // comply with node's configured policy into the mempool. ErrPolicy = errors.New("not allowed by policy") // ErrInvalidBlockIndex is returned when trying to add block with index // other than expected height of the blockchain. ErrInvalidBlockIndex error = errors.New("invalid block index") ) var ( genAmount = []int{6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} decrementInterval = 2000000 persistInterval = 1 * time.Second ) // Blockchain represents the blockchain. It maintans internal state representing // the state of the ledger that can be accessed in various ways and changed by // adding new blocks or headers. type Blockchain struct { config config.ProtocolConfiguration // The only way chain state changes is by adding blocks, so we can't // allow concurrent block additions. It differs from the next lock in // that it's only for AddBlock method itself, the chain state is // protected by the lock below, but holding it during all of AddBlock // is too expensive (because the state only changes when persisting // change cache). addLock sync.Mutex // This lock ensures blockchain immutability for operations that need // that while performing their tasks. It's mostly used as a read lock // with the only writer being the block addition logic. lock sync.RWMutex // Data access object for CRUD operations around storage. dao *dao.Simple // Current index/height of the highest block. // Read access should always be called by BlockHeight(). // Write access should only happen in storeBlock(). blockHeight uint32 // Current top Block wrapped in an atomic.Value for safe access. topBlock atomic.Value // Current persisted block count. persistedHeight uint32 // Number of headers stored in the chain file. storedHeaderCount uint32 generationAmount []int decrementInterval int // All operations on headerList must be called from an // headersOp to be routine safe. headerList *HeaderHashList // Only for operating on the headerList. headersOp chan headersOpFunc headersOpDone chan struct{} // Stop synchronization mechanisms. stopCh chan struct{} runToExitCh chan struct{} memPool mempool.Pool // This lock protects concurrent access to keyCache. keyCacheLock sync.RWMutex // cache for block verification keys. keyCache map[util.Uint160]map[string]*keys.PublicKey sbCommittee keys.PublicKeys log *zap.Logger lastBatch *storage.MemBatch contracts native.Contracts // Notification subsystem. events chan bcEvent subCh chan interface{} unsubCh chan interface{} } // bcEvent is an internal event generated by the Blockchain and then // broadcasted to other parties. It joins the new block and associated // invocation logs, all the other events visible from outside can be produced // from this combination. type bcEvent struct { block *block.Block appExecResults []*state.AppExecResult } type headersOpFunc func(headerList *HeaderHashList) // NewBlockchain returns a new blockchain object the will use the // given Store as its underlying storage. For it to work correctly you need // to spawn a goroutine for its Run method after this initialization. func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.Logger) (*Blockchain, error) { if log == nil { return nil, errors.New("empty logger") } if cfg.MemPoolSize <= 0 { cfg.MemPoolSize = defaultMemPoolSize log.Info("mempool size is not set or wrong, setting default value", zap.Int("MemPoolSize", cfg.MemPoolSize)) } committee, err := committeeFromConfig(cfg) if err != nil { return nil, err } bc := &Blockchain{ config: cfg, dao: dao.NewSimple(s, cfg.Magic), headersOp: make(chan headersOpFunc), headersOpDone: make(chan struct{}), stopCh: make(chan struct{}), runToExitCh: make(chan struct{}), memPool: mempool.NewMemPool(cfg.MemPoolSize), keyCache: make(map[util.Uint160]map[string]*keys.PublicKey), sbCommittee: committee, log: log, events: make(chan bcEvent), subCh: make(chan interface{}), unsubCh: make(chan interface{}), generationAmount: genAmount, decrementInterval: decrementInterval, contracts: *native.NewContracts(), } if err := bc.init(); err != nil { return nil, err } return bc, nil } func (bc *Blockchain) init() error { // If we could not find the version in the Store, we know that there is nothing stored. ver, err := bc.dao.GetVersion() if err != nil { bc.log.Info("no storage version found! creating genesis block") if err = bc.dao.PutVersion(version); err != nil { return err } genesisBlock, err := createGenesisBlock(bc.config) if err != nil { return err } bc.headerList = NewHeaderHashList(genesisBlock.Hash()) err = bc.dao.PutCurrentHeader(hashAndIndexToBytes(genesisBlock.Hash(), genesisBlock.Index)) if err != nil { return err } return bc.storeBlock(genesisBlock) } if ver != version { return fmt.Errorf("storage version mismatch betweeen %s and %s", version, ver) } // At this point there was no version found in the storage which // implies a creating fresh storage with the version specified // and the genesis block as first block. bc.log.Info("restoring blockchain", zap.String("version", version)) bHeight, err := bc.dao.GetCurrentBlockHeight() if err != nil { return err } bc.blockHeight = bHeight bc.persistedHeight = bHeight if err = bc.dao.InitMPT(bHeight); err != nil { return fmt.Errorf("can't init MPT at height %d: %w", bHeight, err) } hashes, err := bc.dao.GetHeaderHashes() if err != nil { return err } bc.headerList = NewHeaderHashList(hashes...) bc.storedHeaderCount = uint32(len(hashes)) currHeaderHeight, currHeaderHash, err := bc.dao.GetCurrentHeaderHeight() if err != nil { return err } if bc.storedHeaderCount == 0 && currHeaderHeight == 0 { bc.headerList.Add(currHeaderHash) } // There is a high chance that the Node is stopped before the next // batch of 2000 headers was stored. Via the currentHeaders stored we can sync // that with stored blocks. if currHeaderHeight >= bc.storedHeaderCount { hash := currHeaderHash var targetHash util.Uint256 if bc.headerList.Len() > 0 { targetHash = bc.headerList.Get(bc.headerList.Len() - 1) } else { genesisBlock, err := createGenesisBlock(bc.config) if err != nil { return err } targetHash = genesisBlock.Hash() bc.headerList.Add(targetHash) } headers := make([]*block.Header, 0) for hash != targetHash { header, err := bc.GetHeader(hash) if err != nil { return fmt.Errorf("could not get header %s: %w", hash, err) } headers = append(headers, header) hash = header.PrevHash } headerSliceReverse(headers) for _, h := range headers { if !h.Verify() { return fmt.Errorf("bad header %d/%s in the storage", h.Index, h.Hash()) } bc.headerList.Add(h.Hash()) } } return nil } // Run runs chain loop, it needs to be run as goroutine and executing it is // critical for correct Blockchain operation. func (bc *Blockchain) Run() { persistTimer := time.NewTimer(persistInterval) defer func() { persistTimer.Stop() if err := bc.persist(); err != nil { bc.log.Warn("failed to persist", zap.Error(err)) } if err := bc.dao.Store.Close(); err != nil { bc.log.Warn("failed to close db", zap.Error(err)) } close(bc.runToExitCh) }() go bc.notificationDispatcher() for { select { case <-bc.stopCh: return case op := <-bc.headersOp: op(bc.headerList) bc.headersOpDone <- struct{}{} case <-persistTimer.C: go func() { err := bc.persist() if err != nil { bc.log.Warn("failed to persist blockchain", zap.Error(err)) } persistTimer.Reset(persistInterval) }() } } } // notificationDispatcher manages subscription to events and broadcasts new events. func (bc *Blockchain) notificationDispatcher() { var ( // These are just sets of subscribers, though modelled as maps // for ease of management (not a lot of subscriptions is really // expected, but maps are convenient for adding/deleting elements). blockFeed = make(map[chan<- *block.Block]bool) txFeed = make(map[chan<- *transaction.Transaction]bool) notificationFeed = make(map[chan<- *state.NotificationEvent]bool) executionFeed = make(map[chan<- *state.AppExecResult]bool) ) for { select { case <-bc.stopCh: return case sub := <-bc.subCh: switch ch := sub.(type) { case chan<- *block.Block: blockFeed[ch] = true case chan<- *transaction.Transaction: txFeed[ch] = true case chan<- *state.NotificationEvent: notificationFeed[ch] = true case chan<- *state.AppExecResult: executionFeed[ch] = true default: panic(fmt.Sprintf("bad subscription: %T", sub)) } case unsub := <-bc.unsubCh: switch ch := unsub.(type) { case chan<- *block.Block: delete(blockFeed, ch) case chan<- *transaction.Transaction: delete(txFeed, ch) case chan<- *state.NotificationEvent: delete(notificationFeed, ch) case chan<- *state.AppExecResult: delete(executionFeed, ch) default: panic(fmt.Sprintf("bad unsubscription: %T", unsub)) } case event := <-bc.events: // We don't want to waste time looping through transactions when there are no // subscribers. if len(txFeed) != 0 || len(notificationFeed) != 0 || len(executionFeed) != 0 { aer := event.appExecResults[0] if !aer.TxHash.Equals(event.block.Hash()) { panic("inconsistent application execution results") } for ch := range executionFeed { ch <- aer } for i := range aer.Events { for ch := range notificationFeed { ch <- &aer.Events[i] } } aerIdx := 1 for _, tx := range event.block.Transactions { aer := event.appExecResults[aerIdx] if !aer.TxHash.Equals(tx.Hash()) { panic("inconsistent application execution results") } aerIdx++ for ch := range executionFeed { ch <- aer } if aer.VMState == vm.HaltState { for i := range aer.Events { for ch := range notificationFeed { ch <- &aer.Events[i] } } } for ch := range txFeed { ch <- tx } } } for ch := range blockFeed { ch <- event.block } } } } // Close stops Blockchain's internal loop, syncs changes to persistent storage // and closes it. The Blockchain is no longer functional after the call to Close. func (bc *Blockchain) Close() { // If there is a block addition in progress, wait for it to finish and // don't allow new ones. bc.addLock.Lock() close(bc.stopCh) <-bc.runToExitCh bc.addLock.Unlock() } // AddBlock accepts successive block for the Blockchain, verifies it and // stores internally. Eventually it will be persisted to the backing storage. func (bc *Blockchain) AddBlock(block *block.Block) error { bc.addLock.Lock() defer bc.addLock.Unlock() expectedHeight := bc.BlockHeight() + 1 if expectedHeight != block.Index { return fmt.Errorf("expected %d, got %d: %w", expectedHeight, block.Index, ErrInvalidBlockIndex) } headerLen := bc.headerListLen() if int(block.Index) == headerLen { err := bc.addHeaders(bc.config.VerifyBlocks, block.Header()) if err != nil { return err } } if bc.config.VerifyBlocks { err := block.Verify() if err != nil { return fmt.Errorf("block %s is invalid: %w", block.Hash().StringLE(), err) } if bc.config.VerifyTransactions { for _, tx := range block.Transactions { err := bc.VerifyTx(tx, block) if err != nil { return fmt.Errorf("transaction %s failed to verify: %w", tx.Hash().StringLE(), err) } } } } return bc.storeBlock(block) } // AddHeaders processes the given headers and add them to the // HeaderHashList. It expects headers to be sorted by index. func (bc *Blockchain) AddHeaders(headers ...*block.Header) error { return bc.addHeaders(bc.config.VerifyBlocks, headers...) } // addHeaders is an internal implementation of AddHeaders (`verify` parameter // tells it to verify or not verify given headers). func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) (err error) { var ( start = time.Now() batch = bc.dao.Store.Batch() ) if len(headers) > 0 { var i int curHeight := bc.HeaderHeight() for i = range headers { if headers[i].Index > curHeight { break } } headers = headers[i:] } if len(headers) == 0 { return nil } else if verify { // Verify that the chain of the headers is consistent. var lastHeader *block.Header if lastHeader, err = bc.GetHeader(headers[0].PrevHash); err != nil { return fmt.Errorf("previous header was not found: %w", err) } for _, h := range headers { if err = bc.verifyHeader(h, lastHeader); err != nil { return } lastHeader = h } } bc.headersOp <- func(headerList *HeaderHashList) { oldlen := headerList.Len() for _, h := range headers { if int(h.Index-1) >= headerList.Len() { err = fmt.Errorf( "height of received header %d is higher then the current header %d", h.Index, headerList.Len(), ) return } if int(h.Index) < headerList.Len() { continue } if !h.Verify() { err = fmt.Errorf("header %v is invalid", h) return } if err = bc.processHeader(h, batch, headerList); err != nil { return } } if oldlen != headerList.Len() { updateHeaderHeightMetric(headerList.Len() - 1) if err = bc.dao.Store.PutBatch(batch); err != nil { return } bc.log.Debug("done processing headers", zap.Int("headerIndex", headerList.Len()-1), zap.Uint32("blockHeight", bc.BlockHeight()), zap.Duration("took", time.Since(start))) } } <-bc.headersOpDone return err } // processHeader processes the given header. Note that this is only thread safe // if executed in headers operation. func (bc *Blockchain) processHeader(h *block.Header, batch storage.Batch, headerList *HeaderHashList) error { headerList.Add(h.Hash()) buf := io.NewBufBinWriter() for int(h.Index)-headerBatchCount >= int(bc.storedHeaderCount) { if err := headerList.Write(buf.BinWriter, int(bc.storedHeaderCount), headerBatchCount); err != nil { return err } key := storage.AppendPrefixInt(storage.IXHeaderHashList, int(bc.storedHeaderCount)) batch.Put(key, buf.Bytes()) bc.storedHeaderCount += headerBatchCount buf.Reset() } buf.Reset() h.EncodeBinary(buf.BinWriter) if buf.Err != nil { return buf.Err } key := storage.AppendPrefix(storage.DataBlock, h.Hash().BytesLE()) batch.Put(key, buf.Bytes()) batch.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndexToBytes(h.Hash(), h.Index)) return nil } // GetStateRoot returns state root for a given height. func (bc *Blockchain) GetStateRoot(height uint32) (*state.MPTRootState, error) { return bc.dao.GetStateRoot(height) } // storeBlock performs chain update using the block given, it executes all // transactions with all appropriate side-effects and updates Blockchain state. // This is the only way to change Blockchain state. func (bc *Blockchain) storeBlock(block *block.Block) error { cache := dao.NewCached(bc.dao) appExecResults := make([]*state.AppExecResult, 0, 1+len(block.Transactions)) if err := cache.StoreAsBlock(block); err != nil { return err } if err := cache.StoreAsCurrentBlock(block); err != nil { return err } if block.Index > 0 { systemInterop := bc.newInteropContext(trigger.System, cache, block, nil) v := systemInterop.SpawnVM() v.LoadScriptWithFlags(bc.contracts.GetPersistScript(), smartcontract.AllowModifyStates|smartcontract.AllowCall) v.SetPriceGetter(getPrice) if err := v.Run(); err != nil { return fmt.Errorf("onPersist run failed: %w", err) } else if _, err := systemInterop.DAO.Persist(); err != nil { return fmt.Errorf("can't save onPersist changes: %w", err) } for i := range systemInterop.Notifications { bc.handleNotification(&systemInterop.Notifications[i], cache, block, block.Hash()) } aer := &state.AppExecResult{ TxHash: block.Hash(), // application logs can be retrieved by block hash Trigger: trigger.System, VMState: v.State(), GasConsumed: v.GasConsumed(), Stack: v.Estack().ToArray(), Events: systemInterop.Notifications, } appExecResults = append(appExecResults, aer) err := cache.PutAppExecResult(aer) if err != nil { return fmt.Errorf("failed to store onPersist exec result: %w", err) } } for _, tx := range block.Transactions { if err := cache.StoreAsTransaction(tx, block.Index); err != nil { return err } systemInterop := bc.newInteropContext(trigger.Application, cache, block, tx) v := systemInterop.SpawnVM() v.LoadScriptWithFlags(tx.Script, smartcontract.All) v.SetPriceGetter(getPrice) v.GasLimit = tx.SystemFee err := v.Run() if !v.HasFailed() { _, err := systemInterop.DAO.Persist() if err != nil { return fmt.Errorf("failed to persist invocation results: %w", err) } for i := range systemInterop.Notifications { bc.handleNotification(&systemInterop.Notifications[i], cache, block, tx.Hash()) } } else { bc.log.Warn("contract invocation failed", zap.String("tx", tx.Hash().StringLE()), zap.Uint32("block", block.Index), zap.Error(err)) } aer := &state.AppExecResult{ TxHash: tx.Hash(), Trigger: trigger.Application, VMState: v.State(), GasConsumed: v.GasConsumed(), Stack: v.Estack().ToArray(), Events: systemInterop.Notifications, } appExecResults = append(appExecResults, aer) err = cache.PutAppExecResult(aer) if err != nil { return fmt.Errorf("failed to store tx exec result: %w", err) } } root := bc.dao.MPT.StateRoot() var prevHash util.Uint256 if block.Index > 0 { prev, err := bc.dao.GetStateRoot(block.Index - 1) if err != nil { return fmt.Errorf("can't get previous state root: %w", err) } prevHash = hash.DoubleSha256(prev.GetSignedPart()) } err := bc.AddStateRoot(&state.MPTRoot{ MPTRootBase: state.MPTRootBase{ Index: block.Index, PrevHash: prevHash, Root: root, }, }) if err != nil { return err } if bc.config.SaveStorageBatch { bc.lastBatch = cache.DAO.GetBatch() } bc.lock.Lock() _, err = cache.Persist() if err != nil { bc.lock.Unlock() return err } bc.contracts.Policy.OnPersistEnd(bc.dao) bc.dao.MPT.Flush() // Every persist cycle we also compact our in-memory MPT. persistedHeight := atomic.LoadUint32(&bc.persistedHeight) if persistedHeight == block.Index-1 { // 10 is good and roughly estimated to fit remaining trie into 1M of memory. bc.dao.MPT.Collapse(10) } bc.topBlock.Store(block) atomic.StoreUint32(&bc.blockHeight, block.Index) bc.memPool.RemoveStale(bc.isTxStillRelevant, bc) bc.lock.Unlock() updateBlockHeightMetric(block.Index) // Genesis block is stored when Blockchain is not yet running, so there // is no one to read this event. And it doesn't make much sense as event // anyway. if block.Index != 0 { bc.events <- bcEvent{block, appExecResults} } return nil } func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d *dao.Cached, b *block.Block, h util.Uint256) { if note.Name != "transfer" && note.Name != "Transfer" { return } arr, ok := note.Item.Value().([]stackitem.Item) if !ok || len(arr) != 3 { return } var from []byte fromValue := arr[0].Value() // we don't have `from` set when we are minting tokens if fromValue != nil { from, ok = fromValue.([]byte) if !ok { return } } var to []byte toValue := arr[1].Value() // we don't have `to` set when we are burning tokens if toValue != nil { to, ok = toValue.([]byte) if !ok { return } } amount, ok := arr[2].Value().(*big.Int) if !ok { bs, ok := arr[2].Value().([]byte) if !ok { return } amount = bigint.FromBytes(bs) } bc.processNEP5Transfer(d, h, b, note.ScriptHash, from, to, amount) } func parseUint160(addr []byte) util.Uint160 { if u, err := util.Uint160DecodeBytesBE(addr); err == nil { return u } return util.Uint160{} } func (bc *Blockchain) processNEP5Transfer(cache *dao.Cached, h util.Uint256, b *block.Block, sc util.Uint160, from, to []byte, amount *big.Int) { toAddr := parseUint160(to) fromAddr := parseUint160(from) var id int32 nativeContract := bc.contracts.ByHash(sc) if nativeContract != nil { id = nativeContract.Metadata().ContractID } else { assetContract := bc.GetContractState(sc) if assetContract == nil { return } id = assetContract.ID } transfer := &state.NEP5Transfer{ Asset: id, From: fromAddr, To: toAddr, Block: b.Index, Timestamp: b.Timestamp, Tx: h, } if !fromAddr.Equals(util.Uint160{}) { balances, err := cache.GetNEP5Balances(fromAddr) if err != nil { return } bs := balances.Trackers[id] bs.Balance = *new(big.Int).Sub(&bs.Balance, amount) bs.LastUpdatedBlock = b.Index balances.Trackers[id] = bs transfer.Amount = *new(big.Int).Sub(&transfer.Amount, amount) isBig, err := cache.AppendNEP5Transfer(fromAddr, balances.NextTransferBatch, transfer) if err != nil { return } if isBig { balances.NextTransferBatch++ } if err := cache.PutNEP5Balances(fromAddr, balances); err != nil { return } } if !toAddr.Equals(util.Uint160{}) { balances, err := cache.GetNEP5Balances(toAddr) if err != nil { return } bs := balances.Trackers[id] bs.Balance = *new(big.Int).Add(&bs.Balance, amount) bs.LastUpdatedBlock = b.Index balances.Trackers[id] = bs transfer.Amount = *amount isBig, err := cache.AppendNEP5Transfer(toAddr, balances.NextTransferBatch, transfer) if err != nil { return } if isBig { balances.NextTransferBatch++ } if err := cache.PutNEP5Balances(toAddr, balances); err != nil { return } } } // ForEachNEP5Transfer executes f for each nep5 transfer in log. func (bc *Blockchain) ForEachNEP5Transfer(acc util.Uint160, f func(*state.NEP5Transfer) error) error { balances, err := bc.dao.GetNEP5Balances(acc) if err != nil { return nil } for i := uint32(0); i <= balances.NextTransferBatch; i++ { lg, err := bc.dao.GetNEP5TransferLog(acc, i) if err != nil { return nil } err = lg.ForEach(f) if err != nil { return err } } return nil } // GetNEP5Balances returns NEP5 balances for the acc. func (bc *Blockchain) GetNEP5Balances(acc util.Uint160) *state.NEP5Balances { bs, err := bc.dao.GetNEP5Balances(acc) if err != nil { return nil } return bs } // GetUtilityTokenBalance returns utility token (GAS) balance for the acc. func (bc *Blockchain) GetUtilityTokenBalance(acc util.Uint160) *big.Int { bs, err := bc.dao.GetNEP5Balances(acc) if err != nil { return big.NewInt(0) } balance := bs.Trackers[bc.contracts.GAS.ContractID].Balance return &balance } // GetGoverningTokenBalance returns governing token (NEO) balance and the height // of the last balance change for the account. func (bc *Blockchain) GetGoverningTokenBalance(acc util.Uint160) (*big.Int, uint32) { bs, err := bc.dao.GetNEP5Balances(acc) if err != nil { return big.NewInt(0), 0 } neo := bs.Trackers[bc.contracts.NEO.ContractID] return &neo.Balance, neo.LastUpdatedBlock } // LastBatch returns last persisted storage batch. func (bc *Blockchain) LastBatch() *storage.MemBatch { return bc.lastBatch } // persist flushes current in-memory Store contents to the persistent storage. func (bc *Blockchain) persist() error { var ( start = time.Now() persisted int err error ) persisted, err = bc.dao.Persist() if err != nil { return err } if persisted > 0 { bHeight, err := bc.dao.GetCurrentBlockHeight() if err != nil { return err } oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight) diff := bHeight - oldHeight storedHeaderHeight, _, err := bc.dao.GetCurrentHeaderHeight() if err != nil { return err } bc.log.Info("blockchain persist completed", zap.Uint32("persistedBlocks", diff), zap.Int("persistedKeys", persisted), zap.Uint32("headerHeight", storedHeaderHeight), zap.Uint32("blockHeight", bHeight), zap.Duration("took", time.Since(start))) // update monitoring metrics. updatePersistedHeightMetric(bHeight) } return nil } func (bc *Blockchain) headerListLen() (n int) { bc.headersOp <- func(headerList *HeaderHashList) { n = headerList.Len() } <-bc.headersOpDone return } // GetTransaction returns a TX and its height by the given hash. func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) { if tx, ok := bc.memPool.TryGetValue(hash); ok { return tx, 0, nil // the height is not actually defined for memPool transaction. Not sure if zero is a good number in this case. } return bc.dao.GetTransaction(hash) } // GetAppExecResult returns application execution result by the given // tx hash. func (bc *Blockchain) GetAppExecResult(hash util.Uint256) (*state.AppExecResult, error) { return bc.dao.GetAppExecResult(hash) } // GetStorageItem returns an item from storage. func (bc *Blockchain) GetStorageItem(id int32, key []byte) *state.StorageItem { return bc.dao.GetStorageItem(id, key) } // GetStorageItems returns all storage items for a given contract id. func (bc *Blockchain) GetStorageItems(id int32) (map[string]*state.StorageItem, error) { return bc.dao.GetStorageItems(id) } // GetBlock returns a Block by the given hash. func (bc *Blockchain) GetBlock(hash util.Uint256) (*block.Block, error) { topBlock := bc.topBlock.Load() if topBlock != nil { if tb, ok := topBlock.(*block.Block); ok && tb.Hash().Equals(hash) { return tb, nil } } block, err := bc.dao.GetBlock(hash) if err != nil { return nil, err } for _, tx := range block.Transactions { stx, _, err := bc.dao.GetTransaction(tx.Hash()) if err != nil { return nil, err } *tx = *stx } return block, nil } // GetHeader returns data block header identified with the given hash value. func (bc *Blockchain) GetHeader(hash util.Uint256) (*block.Header, error) { topBlock := bc.topBlock.Load() if topBlock != nil { if tb, ok := topBlock.(*block.Block); ok && tb.Hash().Equals(hash) { return tb.Header(), nil } } block, err := bc.dao.GetBlock(hash) if err != nil { return nil, err } return block.Header(), nil } // HasTransaction returns true if the blockchain contains he given // transaction hash. func (bc *Blockchain) HasTransaction(hash util.Uint256) bool { return bc.memPool.ContainsKey(hash) || bc.dao.HasTransaction(hash) } // HasBlock returns true if the blockchain contains the given // block hash. func (bc *Blockchain) HasBlock(hash util.Uint256) bool { if header, err := bc.GetHeader(hash); err == nil { return header.Index <= bc.BlockHeight() } return false } // CurrentBlockHash returns the highest processed block hash. func (bc *Blockchain) CurrentBlockHash() (hash util.Uint256) { bc.headersOp <- func(headerList *HeaderHashList) { hash = headerList.Get(int(bc.BlockHeight())) } <-bc.headersOpDone return } // CurrentHeaderHash returns the hash of the latest known header. func (bc *Blockchain) CurrentHeaderHash() (hash util.Uint256) { bc.headersOp <- func(headerList *HeaderHashList) { hash = headerList.Last() } <-bc.headersOpDone return } // GetHeaderHash returns the hash from the headerList by its // height/index. func (bc *Blockchain) GetHeaderHash(i int) (hash util.Uint256) { bc.headersOp <- func(headerList *HeaderHashList) { hash = headerList.Get(i) } <-bc.headersOpDone return } // BlockHeight returns the height/index of the highest block. func (bc *Blockchain) BlockHeight() uint32 { return atomic.LoadUint32(&bc.blockHeight) } // HeaderHeight returns the index/height of the highest header. func (bc *Blockchain) HeaderHeight() uint32 { return uint32(bc.headerListLen() - 1) } // GetContractState returns contract by its script hash. func (bc *Blockchain) GetContractState(hash util.Uint160) *state.Contract { contract, err := bc.dao.GetContractState(hash) if contract == nil && err != storage.ErrKeyNotFound { bc.log.Warn("failed to get contract state", zap.Error(err)) } return contract } // GetContractScriptHash returns contract script hash by its ID. func (bc *Blockchain) GetContractScriptHash(id int32) (util.Uint160, error) { return bc.dao.GetContractScriptHash(id) } // GetConfig returns the config stored in the blockchain. func (bc *Blockchain) GetConfig() config.ProtocolConfiguration { return bc.config } // SubscribeForBlocks adds given channel to new block event broadcasting, so when // there is a new block added to the chain you'll receive it via this channel. // Make sure it's read from regularly as not reading these events might affect // other Blockchain functions. func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) { bc.subCh <- ch } // SubscribeForTransactions adds given channel to new transaction event // broadcasting, so when there is a new transaction added to the chain (in a // block) you'll receive it via this channel. Make sure it's read from regularly // as not reading these events might affect other Blockchain functions. func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) { bc.subCh <- ch } // SubscribeForNotifications adds given channel to new notifications event // broadcasting, so when an in-block transaction execution generates a // notification you'll receive it via this channel. Only notifications from // successful transactions are broadcasted, if you're interested in failed // transactions use SubscribeForExecutions instead. Make sure this channel is // read from regularly as not reading these events might affect other Blockchain // functions. func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.NotificationEvent) { bc.subCh <- ch } // SubscribeForExecutions adds given channel to new transaction execution event // broadcasting, so when an in-block transaction execution happens you'll receive // the result of it via this channel. Make sure it's read from regularly as not // reading these events might affect other Blockchain functions. func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) { bc.subCh <- ch } // UnsubscribeFromBlocks unsubscribes given channel from new block notifications, // you can close it afterwards. Passing non-subscribed channel is a no-op. func (bc *Blockchain) UnsubscribeFromBlocks(ch chan<- *block.Block) { bc.unsubCh <- ch } // UnsubscribeFromTransactions unsubscribes given channel from new transaction // notifications, you can close it afterwards. Passing non-subscribed channel is // a no-op. func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transaction) { bc.unsubCh <- ch } // UnsubscribeFromNotifications unsubscribes given channel from new // execution-generated notifications, you can close it afterwards. Passing // non-subscribed channel is a no-op. func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.NotificationEvent) { bc.unsubCh <- ch } // UnsubscribeFromExecutions unsubscribes given channel from new execution // notifications, you can close it afterwards. Passing non-subscribed channel is // a no-op. func (bc *Blockchain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) { bc.unsubCh <- ch } // CalculateClaimable calculates the amount of GAS generated by owning specified // amount of NEO between specified blocks. The amount of NEO being passed is in // its natural non-divisible form (1 NEO as 1, 2 NEO as 2, no multiplication by // 10⁸ is needed as for Fixed8). func (bc *Blockchain) CalculateClaimable(value *big.Int, startHeight, endHeight uint32) *big.Int { var amount int64 di := uint32(bc.decrementInterval) ustart := startHeight / di if genSize := uint32(len(bc.generationAmount)); ustart < genSize { uend := endHeight / di iend := endHeight % di if uend >= genSize { uend = genSize - 1 iend = di } else if iend == 0 { uend-- iend = di } istart := startHeight % di for ustart < uend { amount += int64(di-istart) * int64(bc.generationAmount[ustart]) ustart++ istart = 0 } amount += int64(iend-istart) * int64(bc.generationAmount[ustart]) } return new(big.Int).Mul(big.NewInt(amount), value) } // FeePerByte returns transaction network fee per byte. func (bc *Blockchain) FeePerByte() int64 { return bc.contracts.Policy.GetFeePerByteInternal(bc.dao) } // GetMaxBlockSize returns maximum allowed block size from native Policy contract. func (bc *Blockchain) GetMaxBlockSize() uint32 { return bc.contracts.Policy.GetMaxBlockSizeInternal(bc.dao) } // GetMaxBlockSystemFee returns maximum block system fee from native Policy contract. func (bc *Blockchain) GetMaxBlockSystemFee() int64 { return bc.contracts.Policy.GetMaxBlockSystemFeeInternal(bc.dao) } // GetMemPool returns the memory pool of the blockchain. func (bc *Blockchain) GetMemPool() *mempool.Pool { return &bc.memPool } // ApplyPolicyToTxSet applies configured policies to given transaction set. It // expects slice to be ordered by fee and returns a subslice of it. func (bc *Blockchain) ApplyPolicyToTxSet(txes []*transaction.Transaction) []*transaction.Transaction { maxTx := bc.contracts.Policy.GetMaxTransactionsPerBlockInternal(bc.dao) if maxTx != 0 && len(txes) > int(maxTx) { txes = txes[:maxTx] } maxBlockSize := bc.contracts.Policy.GetMaxBlockSizeInternal(bc.dao) maxBlockSysFee := bc.contracts.Policy.GetMaxBlockSystemFeeInternal(bc.dao) var ( blockSize uint32 sysFee int64 ) blockSize = uint32(io.GetVarSize(new(block.Block)) + io.GetVarSize(len(txes)+1)) for i, tx := range txes { blockSize += uint32(io.GetVarSize(tx)) sysFee += tx.SystemFee if blockSize > maxBlockSize || sysFee > maxBlockSysFee { txes = txes[:i] break } } return txes } func (bc *Blockchain) verifyHeader(currHeader, prevHeader *block.Header) error { if prevHeader.Hash() != currHeader.PrevHash { return errors.New("previous header hash doesn't match") } if prevHeader.Index+1 != currHeader.Index { return errors.New("previous header index doesn't match") } if prevHeader.Timestamp >= currHeader.Timestamp { return errors.New("block is not newer than the previous one") } return bc.verifyHeaderWitnesses(currHeader, prevHeader) } // verifyTx verifies whether a transaction is bonafide or not. func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) error { height := bc.BlockHeight() if t.ValidUntilBlock <= height || t.ValidUntilBlock > height+transaction.MaxValidUntilBlockIncrement { return fmt.Errorf("transaction has expired. ValidUntilBlock = %d, current height = %d", t.ValidUntilBlock, height) } // Policying. if err := bc.contracts.Policy.CheckPolicy(bc.dao, t); err != nil { // Only one %w can be used. return fmt.Errorf("%w: %v", ErrPolicy, err) } balance := bc.GetUtilityTokenBalance(t.Sender()) need := t.SystemFee + t.NetworkFee if balance.Cmp(big.NewInt(need)) < 0 { return fmt.Errorf("insufficient funds: balance is %v, need: %v", balance, need) } size := io.GetVarSize(t) if size > transaction.MaxTransactionSize { return fmt.Errorf("too big transaction (%d > MaxTransactionSize %d)", size, transaction.MaxTransactionSize) } needNetworkFee := int64(size) * bc.FeePerByte() netFee := t.NetworkFee - needNetworkFee if netFee < 0 { return fmt.Errorf("insufficient funds: net fee is %v, need %v", t.NetworkFee, needNetworkFee) } if block == nil { if ok := bc.memPool.Verify(t, bc); !ok { return errors.New("invalid transaction due to conflicts with the memory pool") } } return bc.verifyTxWitnesses(t, block) } // isTxStillRelevant is a callback for mempool transaction filtering after the // new block addition. It returns false for transactions already present in the // chain (added by the new block), transactions using some inputs that are // already used (double spends) and does witness reverification for non-standard // contracts. It operates under the assumption that full transaction verification // was already done so we don't need to check basic things like size, input/output // correctness, etc. func (bc *Blockchain) isTxStillRelevant(t *transaction.Transaction) bool { var recheckWitness bool if bc.dao.HasTransaction(t.Hash()) { return false } for i := range t.Scripts { if !vm.IsStandardContract(t.Scripts[i].VerificationScript) { recheckWitness = true break } } if recheckWitness { return bc.verifyTxWitnesses(t, nil) == nil } return true } // AddStateRoot add new (possibly unverified) state root to the blockchain. func (bc *Blockchain) AddStateRoot(r *state.MPTRoot) error { our, err := bc.GetStateRoot(r.Index) if err == nil { if our.Flag == state.Verified { return bc.updateStateHeight(r.Index) } else if r.Witness == nil && our.Witness != nil { r.Witness = our.Witness } } if err := bc.verifyStateRoot(r); err != nil { return fmt.Errorf("invalid state root: %w", err) } if r.Index > bc.BlockHeight() { // just put it into the store for future checks return bc.dao.PutStateRoot(&state.MPTRootState{ MPTRoot: *r, Flag: state.Unverified, }) } flag := state.Unverified if r.Witness != nil { if err := bc.verifyStateRootWitness(r); err != nil { return fmt.Errorf("can't verify signature: %w", err) } flag = state.Verified } err = bc.dao.PutStateRoot(&state.MPTRootState{ MPTRoot: *r, Flag: flag, }) if err != nil { return err } return bc.updateStateHeight(r.Index) } func (bc *Blockchain) updateStateHeight(newHeight uint32) error { h, err := bc.dao.GetCurrentStateRootHeight() if err != nil { return fmt.Errorf("can't get current state root height: %w", err) } else if newHeight == h+1 { updateStateHeightMetric(newHeight) return bc.dao.PutCurrentStateRootHeight(h + 1) } return nil } // verifyStateRoot checks if state root is valid. func (bc *Blockchain) verifyStateRoot(r *state.MPTRoot) error { if r.Index == 0 { return nil } prev, err := bc.GetStateRoot(r.Index - 1) if err != nil { return errors.New("can't get previous state root") } else if !r.PrevHash.Equals(hash.DoubleSha256(prev.GetSignedPart())) { return errors.New("previous hash mismatch") } else if prev.Version != r.Version { return errors.New("version mismatch") } return nil } // verifyStateRootWitness verifies that state root signature is correct. func (bc *Blockchain) verifyStateRootWitness(r *state.MPTRoot) error { b, err := bc.GetBlock(bc.GetHeaderHash(int(r.Index))) if err != nil { return err } interopCtx := bc.newInteropContext(trigger.Verification, bc.dao, nil, nil) interopCtx.Container = r return bc.verifyHashAgainstScript(b.NextConsensus, r.Witness, interopCtx, true, bc.contracts.Policy.GetMaxVerificationGas(interopCtx.DAO)) } // VerifyTx verifies whether a transaction is bonafide or not. Block parameter // is used for easy interop access and can be omitted for transactions that are // not yet added into any block. // Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270). func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error { bc.lock.RLock() defer bc.lock.RUnlock() return bc.verifyTx(t, block) } // PoolTx verifies and tries to add given transaction into the mempool. func (bc *Blockchain) PoolTx(t *transaction.Transaction) error { bc.lock.RLock() defer bc.lock.RUnlock() if bc.HasTransaction(t.Hash()) { return fmt.Errorf("blockchain: %w", ErrAlreadyExists) } if err := bc.verifyTx(t, nil); err != nil { return err } if err := bc.memPool.Add(t, bc); err != nil { switch { case errors.Is(err, mempool.ErrOOM): return ErrOOM case errors.Is(err, mempool.ErrDup): return fmt.Errorf("mempool: %w", ErrAlreadyExists) default: return err } } return nil } //GetStandByValidators returns validators from the configuration. func (bc *Blockchain) GetStandByValidators() keys.PublicKeys { return bc.sbCommittee[:bc.config.ValidatorsCount].Copy() } // GetStandByCommittee returns standby commitee from the configuration. func (bc *Blockchain) GetStandByCommittee() keys.PublicKeys { return bc.sbCommittee.Copy() } // GetValidators returns current validators. func (bc *Blockchain) GetValidators() ([]*keys.PublicKey, error) { return bc.contracts.NEO.GetValidatorsInternal(bc, bc.dao) } // GetNextBlockValidators returns next block validators. func (bc *Blockchain) GetNextBlockValidators() ([]*keys.PublicKey, error) { return bc.contracts.NEO.GetNextBlockValidatorsInternal(bc, bc.dao) } // GetEnrollments returns all registered validators. func (bc *Blockchain) GetEnrollments() ([]state.Validator, error) { return bc.contracts.NEO.GetCandidates(bc.dao) } // GetTestVM returns a VM and a Store setup for a test run of some sort of code. func (bc *Blockchain) GetTestVM(tx *transaction.Transaction) *vm.VM { systemInterop := bc.newInteropContext(trigger.Application, bc.dao, nil, tx) vm := systemInterop.SpawnVM() vm.SetPriceGetter(getPrice) return vm } // ScriptFromWitness returns verification script for provided witness. // If hash is not equal to the witness script hash, error is returned. func ScriptFromWitness(hash util.Uint160, witness *transaction.Witness) ([]byte, error) { verification := witness.VerificationScript if len(verification) == 0 { bb := io.NewBufBinWriter() emit.AppCall(bb.BinWriter, hash) verification = bb.Bytes() } else if h := witness.ScriptHash(); hash != h { return nil, errors.New("witness hash mismatch") } return verification, nil } // verifyHashAgainstScript verifies given hash against the given witness. func (bc *Blockchain) verifyHashAgainstScript(hash util.Uint160, witness *transaction.Witness, interopCtx *interop.Context, useKeys bool, gas int64) error { verification, err := ScriptFromWitness(hash, witness) if err != nil { return err } gasPolicy := bc.contracts.Policy.GetMaxVerificationGas(interopCtx.DAO) if gas > gasPolicy { gas = gasPolicy } vm := interopCtx.SpawnVM() vm.SetPriceGetter(getPrice) vm.GasLimit = gas vm.LoadScriptWithFlags(verification, smartcontract.NoneFlag) vm.LoadScript(witness.InvocationScript) if useKeys { bc.keyCacheLock.RLock() if bc.keyCache[hash] != nil { vm.SetPublicKeys(bc.keyCache[hash]) } bc.keyCacheLock.RUnlock() } err = vm.Run() if vm.HasFailed() { return fmt.Errorf("vm execution has failed: %w", err) } resEl := vm.Estack().Pop() if resEl != nil { if !resEl.Bool() { return fmt.Errorf("signature check failed") } if useKeys { bc.keyCacheLock.RLock() _, ok := bc.keyCache[hash] bc.keyCacheLock.RUnlock() if !ok { bc.keyCacheLock.Lock() bc.keyCache[hash] = vm.GetPublicKeys() bc.keyCacheLock.Unlock() } } } else { return fmt.Errorf("no result returned from the script") } return nil } // verifyTxWitnesses verifies the scripts (witnesses) that come with a given // transaction. It can reorder them by ScriptHash, because that's required to // match a slice of script hashes from the Blockchain. Block parameter // is used for easy interop access and can be omitted for transactions that are // not yet added into any block. // Golang implementation of VerifyWitnesses method in C# (https://github.com/neo-project/neo/blob/master/neo/SmartContract/Helper.cs#L87). func (bc *Blockchain) verifyTxWitnesses(t *transaction.Transaction, block *block.Block) error { if len(t.Signers) != len(t.Scripts) { return fmt.Errorf("number of signers doesn't match witnesses (%d vs %d)", len(t.Signers), len(t.Scripts)) } interopCtx := bc.newInteropContext(trigger.Verification, bc.dao, block, t) for i := range t.Signers { err := bc.verifyHashAgainstScript(t.Signers[i].Account, &t.Scripts[i], interopCtx, false, t.NetworkFee) if err != nil { return fmt.Errorf("witness #%d: %w", i, err) } } return nil } // verifyHeaderWitnesses is a block-specific implementation of VerifyWitnesses logic. func (bc *Blockchain) verifyHeaderWitnesses(currHeader, prevHeader *block.Header) error { var hash util.Uint160 if prevHeader == nil && currHeader.PrevHash.Equals(util.Uint256{}) { hash = currHeader.Script.ScriptHash() } else { hash = prevHeader.NextConsensus } interopCtx := bc.newInteropContext(trigger.Verification, bc.dao, nil, nil) interopCtx.Container = currHeader return bc.verifyHashAgainstScript(hash, &currHeader.Script, interopCtx, true, verificationGasLimit) } // GoverningTokenHash returns the governing token (NEO) native contract hash. func (bc *Blockchain) GoverningTokenHash() util.Uint160 { return bc.contracts.NEO.Hash } // UtilityTokenHash returns the utility token (GAS) native contract hash. func (bc *Blockchain) UtilityTokenHash() util.Uint160 { return bc.contracts.GAS.Hash } func hashAndIndexToBytes(h util.Uint256, index uint32) []byte { buf := io.NewBufBinWriter() buf.WriteBytes(h.BytesLE()) buf.WriteU32LE(index) return buf.Bytes() } func (bc *Blockchain) secondsPerBlock() int { return bc.config.SecondsPerBlock } func (bc *Blockchain) newInteropContext(trigger trigger.Type, d dao.DAO, block *block.Block, tx *transaction.Transaction) *interop.Context { ic := interop.NewContext(trigger, bc, d, bc.contracts.Contracts, block, tx, bc.log) ic.Functions = [][]interop.Function{systemInterops, neoInterops} switch { case tx != nil: ic.Container = tx case block != nil: ic.Container = block } return ic }