core: don't always store all hashes in memory
We're paging these hashes, so we only need the previous full page, the current one and a small cache of pages for various requests. Storing 1M hashes takes 32M of memory, and it grows quickly with the chain. It also seriously affects node startup time: most of it is spent reading these hashes, and the longer the chain, the more time that takes. Notice that this doesn't change the underlying DB scheme in any way.
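A quick sketch of the arithmetic involved (illustrative Go, not neo-go code; headerBatchCount mirrors the page size this commit uses): a lookup only ever needs the 2000-hash page containing the index, while the old approach paid 32 bytes of RAM per header for the whole chain.

package main

import "fmt"

// headerBatchCount mirrors the page size used in this commit.
const headerBatchCount = 2000

// pageOf maps a header index to its page start and in-page offset,
// the same arithmetic GetHeaderHash uses in the diff below.
func pageOf(i uint32) (uint32, uint32) {
    page := (i / headerBatchCount) * headerBatchCount
    return page, i - page
}

func main() {
    page, off := pageOf(1234567)
    fmt.Println(page, off)    // 1234000 567
    fmt.Println(1000000 * 32) // 32000000: bytes needed to keep 1M hashes in RAM
}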
Parent: 0ad6e295ea
Commit: 1c38b45074
5 changed files with 255 additions and 160 deletions
@@ -45,7 +45,6 @@ import (
 
 // Tuning parameters.
 const (
-    headerBatchCount = 2000
     version = "0.2.6"
 
     defaultInitialGAS = 52000000_00000000
@@ -115,6 +114,8 @@ var (
 // the state of the ledger that can be accessed in various ways and changed by
 // adding new blocks or headers.
 type Blockchain struct {
+    HeaderHashes
+
     config config.ProtocolConfiguration
 
     // The only way chain state changes is by adding blocks, so we can't
@@ -151,13 +152,6 @@ type Blockchain struct {
     // Current persisted block count.
     persistedHeight uint32
 
-    // Number of headers stored in the chain file.
-    storedHeaderCount uint32
-
-    // Header hashes list with associated lock.
-    headerHashesLock sync.RWMutex
-    headerHashes []util.Uint256
-
     // Stop synchronization mechanisms.
     stopCh chan struct{}
     runToExitCh chan struct{}
@@ -380,8 +374,7 @@ func (bc *Blockchain) init() error {
     if err != nil {
         return err
     }
-    bc.headerHashes = []util.Uint256{genesisBlock.Hash()}
-    bc.dao.PutCurrentHeader(genesisBlock.Hash(), genesisBlock.Index)
+    bc.HeaderHashes.initGenesis(bc.dao, genesisBlock.Hash())
     if err := bc.stateRoot.Init(0); err != nil {
         return fmt.Errorf("can't init MPT: %w", err)
     }
@@ -414,41 +407,11 @@ func (bc *Blockchain) init() error {
     // and the genesis block as first block.
     bc.log.Info("restoring blockchain", zap.String("version", version))
 
-    bc.headerHashes, err = bc.dao.GetHeaderHashes()
+    err = bc.HeaderHashes.init(bc.dao)
     if err != nil {
         return err
     }
-
-    bc.storedHeaderCount = uint32(len(bc.headerHashes))
-
-    currHeaderHeight, currHeaderHash, err := bc.dao.GetCurrentHeaderHeight()
-    if err != nil {
-        return fmt.Errorf("failed to retrieve current header info: %w", err)
-    }
-
-    // There is a high chance that the Node is stopped before the next
-    // batch of 2000 headers was stored. Via the currentHeaders stored we can sync
-    // that with stored blocks.
-    if currHeaderHeight >= bc.storedHeaderCount {
-        hash := currHeaderHash
-        var targetHash util.Uint256
-        if len(bc.headerHashes) > 0 {
-            targetHash = bc.headerHashes[len(bc.headerHashes)-1]
-        }
-        headers := make([]util.Uint256, 0, headerBatchCount)
-
-        for hash != targetHash {
-            header, err := bc.GetHeader(hash)
-            if err != nil {
-                return fmt.Errorf("could not get header %s: %w", hash, err)
-            }
-            headers = append(headers, header.Hash())
-            hash = header.PrevHash
-        }
-        hashSliceReverse(headers)
-        bc.headerHashes = append(bc.headerHashes, headers...)
-    }
 
     // Check whether StateChangeState stage is in the storage and continue interrupted state jump / state reset if so.
     stateChStage, err := bc.dao.Store.Get([]byte{byte(storage.SYSStateChangeStage)})
     if err == nil {
@@ -539,8 +502,8 @@ func (bc *Blockchain) jumpToState(p uint32) error {
 // jump stage. All the data needed for the jump must be in the DB, otherwise an
 // error is returned. It is not protected by mutex.
 func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) error {
-    if p+1 >= uint32(len(bc.headerHashes)) {
-        return fmt.Errorf("invalid state sync point %d: headerHeignt is %d", p, len(bc.headerHashes))
+    if p >= bc.HeaderHeight() {
+        return fmt.Errorf("invalid state sync point %d: headerHeignt is %d", p, bc.HeaderHeight())
     }
 
     bc.log.Info("jumping to state sync point", zap.Uint32("state sync point", p))
@@ -575,7 +538,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro
     // After current state is updated, we need to remove outdated state-related data if so.
     // The only outdated data we might have is genesis-related data, so check it.
     if p-bc.config.MaxTraceableBlocks > 0 {
-        err := cache.DeleteBlock(bc.headerHashes[0])
+        err := cache.DeleteBlock(bc.GetHeaderHash(0))
         if err != nil {
             return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err)
         }
@@ -588,7 +551,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro
         }
     }
     // Update SYS-prefixed info.
-    block, err := bc.dao.GetBlock(bc.headerHashes[p])
+    block, err := bc.dao.GetBlock(bc.GetHeaderHash(p))
     if err != nil {
         return fmt.Errorf("failed to get current block: %w", err)
     }
@@ -604,7 +567,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro
     default:
         return fmt.Errorf("unknown state jump stage: %d", stage)
     }
-    block, err := bc.dao.GetBlock(bc.headerHashes[p+1])
+    block, err := bc.dao.GetBlock(bc.GetHeaderHash(p + 1))
     if err != nil {
         return fmt.Errorf("failed to get block to init MPT: %w", err)
     }
@@ -625,10 +588,12 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro
 // resetRAMState resets in-memory cached info.
 func (bc *Blockchain) resetRAMState(height uint32, resetHeaders bool) error {
     if resetHeaders {
-        bc.headerHashes = bc.headerHashes[:height+1]
-        bc.storedHeaderCount = height + 1
+        err := bc.HeaderHashes.init(bc.dao)
+        if err != nil {
+            return err
+        }
     }
-    block, err := bc.dao.GetBlock(bc.headerHashes[height])
+    block, err := bc.dao.GetBlock(bc.GetHeaderHash(height))
     if err != nil {
         return fmt.Errorf("failed to get current block: %w", err)
     }
@@ -685,7 +650,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
     }
 
     // Retrieve necessary state before the DB modification.
-    b, err := bc.GetBlock(bc.headerHashes[height])
+    b, err := bc.GetBlock(bc.GetHeaderHash(height))
     if err != nil {
         return fmt.Errorf("failed to retrieve block %d: %w", height, err)
     }
@@ -1406,7 +1371,6 @@ func (bc *Blockchain) AddHeaders(headers ...*block.Header) error {
 func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error {
     var (
         start = time.Now()
-        batch = bc.dao.GetPrivate()
         err error
     )
 
@@ -1436,44 +1400,14 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error {
             lastHeader = h
         }
     }
-
-    bc.headerHashesLock.Lock()
-    defer bc.headerHashesLock.Unlock()
-
-    oldlen := len(bc.headerHashes)
-    var lastHeader *block.Header
-    for _, h := range headers {
-        if int(h.Index) != len(bc.headerHashes) {
-            continue
-        }
-        err = batch.StoreHeader(h)
-        if err != nil {
-            return err
-        }
-        bc.headerHashes = append(bc.headerHashes, h.Hash())
-        lastHeader = h
-    }
-
-    if oldlen != len(bc.headerHashes) {
-        for int(lastHeader.Index)-headerBatchCount >= int(bc.storedHeaderCount) {
-            err = batch.StoreHeaderHashes(bc.headerHashes[bc.storedHeaderCount:bc.storedHeaderCount+headerBatchCount],
-                bc.storedHeaderCount)
-            if err != nil {
-                return err
-            }
-            bc.storedHeaderCount += headerBatchCount
-        }
-
-        batch.PutCurrentHeader(lastHeader.Hash(), lastHeader.Index)
-        updateHeaderHeightMetric(uint32(len(bc.headerHashes) - 1))
-        if _, err = batch.Persist(); err != nil {
-            return err
-        }
+    res := bc.HeaderHashes.addHeaders(headers...)
+    if res == nil {
         bc.log.Debug("done processing headers",
-            zap.Int("headerIndex", len(bc.headerHashes)-1),
+            zap.Uint32("headerIndex", bc.HeaderHeight()),
             zap.Uint32("blockHeight", bc.BlockHeight()),
             zap.Duration("took", time.Since(start)))
     }
-    return nil
+    return res
 }
 
 // GetStateRoot returns state root for the given height.
@@ -1528,7 +1462,7 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
         stop = start + 1
     }
     for index := start; index < stop; index++ {
-        err := kvcache.DeleteBlock(bc.headerHashes[index])
+        err := kvcache.DeleteBlock(bc.GetHeaderHash(index))
         if err != nil {
             bc.log.Warn("error while removing old block",
                 zap.Uint32("index", index),
@@ -2151,15 +2085,9 @@ func (bc *Blockchain) HasTransaction(hash util.Uint256) bool {
 // HasBlock returns true if the blockchain contains the given
 // block hash.
 func (bc *Blockchain) HasBlock(hash util.Uint256) bool {
-    var height = bc.BlockHeight()
-    bc.headerHashesLock.RLock()
-    for i := int(height); i >= int(height)-4 && i >= 0; i-- {
-        if hash.Equals(bc.headerHashes[i]) {
-            bc.headerHashesLock.RUnlock()
-            return true
-        }
-    }
-    bc.headerHashesLock.RUnlock()
+    if bc.HeaderHashes.haveRecentHash(hash, bc.BlockHeight()) {
+        return true
+    }
 
     if header, err := bc.GetHeader(hash); err == nil {
         return header.Index <= bc.BlockHeight()
@@ -2177,40 +2105,11 @@ func (bc *Blockchain) CurrentBlockHash() util.Uint256 {
     return bc.GetHeaderHash(bc.BlockHeight())
 }
 
-// CurrentHeaderHash returns the hash of the latest known header.
-func (bc *Blockchain) CurrentHeaderHash() util.Uint256 {
-    bc.headerHashesLock.RLock()
-    hash := bc.headerHashes[len(bc.headerHashes)-1]
-    bc.headerHashesLock.RUnlock()
-    return hash
-}
-
-// GetHeaderHash returns hash of the header/block with specified index, if
-// Blockchain doesn't have a hash for this height, zero Uint256 value is returned.
-func (bc *Blockchain) GetHeaderHash(i uint32) util.Uint256 {
-    bc.headerHashesLock.RLock()
-    defer bc.headerHashesLock.RUnlock()
-
-    hashesLen := uint32(len(bc.headerHashes))
-    if hashesLen <= i {
-        return util.Uint256{}
-    }
-    return bc.headerHashes[i]
-}
-
 // BlockHeight returns the height/index of the highest block.
 func (bc *Blockchain) BlockHeight() uint32 {
     return atomic.LoadUint32(&bc.blockHeight)
 }
 
-// HeaderHeight returns the index/height of the highest header.
-func (bc *Blockchain) HeaderHeight() uint32 {
-    bc.headerHashesLock.RLock()
-    n := len(bc.headerHashes)
-    bc.headerHashesLock.RUnlock()
-    return uint32(n - 1)
-}
-
 // GetContractState returns contract by its script hash.
 func (bc *Blockchain) GetContractState(hash util.Uint160) *state.Contract {
     contract, err := bc.contracts.Management.GetContract(bc.dao, hash)
@@ -225,7 +225,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
     t.Run("invalid state sync point", func(t *testing.T) {
         bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)})
         point := make([]byte, 4)
-        binary.LittleEndian.PutUint32(point, uint32(len(bcSpout.headerHashes)))
+        binary.LittleEndian.PutUint32(point, bcSpout.lastHeaderIndex()+1)
         bcSpout.dao.Store.Put([]byte{byte(storage.SYSStateSyncPoint)}, point)
         checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, "invalid state sync point")
     })
@@ -146,14 +146,15 @@ func TestBlockchain_StartFromExistingDB(t *testing.T) {
 
         // Corrupt headers hashes batch.
         cache := storage.NewMemCachedStore(ps) // Extra wrapper to avoid good DB corruption.
-        key := make([]byte, 5)
-        key[0] = byte(storage.IXHeaderHashList)
-        binary.BigEndian.PutUint32(key[1:], 1)
-        cache.Put(key, []byte{1, 2, 3})
+        // Make the chain think we're at 2000+ which will trigger page 0 read.
+        buf := io.NewBufBinWriter()
+        buf.WriteBytes(util.Uint256{}.BytesLE())
+        buf.WriteU32LE(2000)
+        cache.Put([]byte{byte(storage.SYSCurrentHeader)}, buf.Bytes())
 
         _, _, _, err := chain.NewMultiWithCustomConfigAndStoreNoCheck(t, customConfig, cache)
         require.Error(t, err)
-        require.True(t, strings.Contains(err.Error(), "failed to read batch of 2000"), err)
+        require.True(t, strings.Contains(err.Error(), "failed to retrieve header hash page"), err)
     })
     t.Run("corrupted current header height", func(t *testing.T) {
         ps = newPS(t)
@@ -1,7 +1,6 @@
 package dao
 
 import (
-    "bytes"
     "context"
     "encoding/binary"
     "errors"
@@ -582,25 +581,23 @@ func (dao *Simple) GetStateSyncCurrentBlockHeight() (uint32, error) {
     return binary.LittleEndian.Uint32(b), nil
 }
 
-// GetHeaderHashes returns a sorted list of header hashes retrieved from
+// GetHeaderHashes returns a page of header hashes retrieved from
 // the given underlying store.
-func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
-    var hashes = make([]util.Uint256, 0)
+func (dao *Simple) GetHeaderHashes(height uint32) ([]util.Uint256, error) {
+    var hashes []util.Uint256
 
-    var seekErr error
-    dao.Store.Seek(storage.SeekRange{
-        Prefix: dao.mkKeyPrefix(storage.IXHeaderHashList),
-    }, func(k, v []byte) bool {
-        newHashes, err := read2000Uint256Hashes(v)
-        if err != nil {
-            seekErr = fmt.Errorf("failed to read batch of 2000 header hashes: %w", err)
-            return false
-        }
-        hashes = append(hashes, newHashes...)
-        return true
-    })
-
-    return hashes, seekErr
+    key := dao.mkHeaderHashKey(height)
+    b, err := dao.Store.Get(key)
+    if err != nil {
+        return nil, err
+    }
+    br := io.NewBinReaderFromBuf(b)
+    br.ReadArray(&hashes)
+    if br.Err != nil {
+        return nil, br.Err
+    }
+    return hashes, nil
 }
 
 // DeleteHeaderHashes removes batches of header hashes starting from the one that
@@ -683,19 +680,6 @@ func (dao *Simple) PutStateSyncCurrentBlockHeight(h uint32) {
     dao.Store.Put(dao.mkKeyPrefix(storage.SYSStateSyncCurrentBlockHeight), buf.Bytes())
 }
 
-// read2000Uint256Hashes attempts to read 2000 Uint256 hashes from
-// the given byte array.
-func read2000Uint256Hashes(b []byte) ([]util.Uint256, error) {
-    r := bytes.NewReader(b)
-    br := io.NewBinReaderFromIO(r)
-    hashes := make([]util.Uint256, 0)
-    br.ReadArray(&hashes)
-    if br.Err != nil {
-        return nil, br.Err
-    }
-    return hashes, nil
-}
-
 func (dao *Simple) mkHeaderHashKey(h uint32) []byte {
     b := dao.getKeyBuf(1 + 4)
     b[0] = byte(storage.IXHeaderHashList)
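As an aside, the on-disk page layout stays as before: each batch of hashes sits under a 5-byte key built from the IXHeaderHashList prefix plus the page's starting index, which is what mkHeaderHashKey above produces. A minimal sketch of that key construction (hypothetical helper; the prefix value and the big-endian encoding are assumptions based on the old test code above, not verified constants):

package main

import (
    "encoding/binary"
    "fmt"
)

// ixHeaderHashList is a stand-in for storage.IXHeaderHashList, not its real value.
const ixHeaderHashList byte = 0x80

// headerHashKey sketches mkHeaderHashKey: prefix byte plus a 4-byte page
// start index (big-endian assumed from the old test's PutUint32 call).
func headerHashKey(pageStart uint32) []byte {
    b := make([]byte, 1+4)
    b[0] = ixHeaderHashList
    binary.BigEndian.PutUint32(b[1:], pageStart)
    return b
}

func main() {
    fmt.Printf("%x\n", headerHashKey(2000)) // 80000007d0
}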
pkg/core/headerhashes.go (new file, 211 lines)
@@ -0,0 +1,211 @@
+package core
+
+import (
+    "fmt"
+    "sync"
+
+    lru "github.com/hashicorp/golang-lru"
+    "github.com/nspcc-dev/neo-go/pkg/core/block"
+    "github.com/nspcc-dev/neo-go/pkg/core/dao"
+    "github.com/nspcc-dev/neo-go/pkg/util"
+)
+
+const (
+    headerBatchCount = 2000
+    pagesCache = 8
+)
+
+// HeaderHashes is a header hash manager part of the Blockchain. It can't be used
+// without Blockchain.
+type HeaderHashes struct {
+    // Backing storage.
+    dao *dao.Simple
+
+    // Lock for all internal state fields.
+    lock sync.RWMutex
+
+    // The latest header hashes (storedHeaderCount+).
+    latest []util.Uint256
+
+    // Previously completed page of header hashes (pre-storedHeaderCount).
+    previous []util.Uint256
+
+    // Number of headers stored in the chain file.
+    storedHeaderCount uint32
+
+    // Cache for accessed pages of header hashes.
+    cache *lru.Cache
+}
+
+func (h *HeaderHashes) initGenesis(dao *dao.Simple, hash util.Uint256) {
+    h.dao = dao
+    h.cache, _ = lru.New(pagesCache) // Never errors for positive size.
+    h.previous = make([]util.Uint256, headerBatchCount)
+    h.latest = make([]util.Uint256, 0, headerBatchCount)
+    h.latest = append(h.latest, hash)
+    dao.PutCurrentHeader(hash, 0)
+}
+
+func (h *HeaderHashes) init(dao *dao.Simple) error {
+    h.dao = dao
+    h.cache, _ = lru.New(pagesCache) // Never errors for positive size.
+
+    currHeaderHeight, currHeaderHash, err := h.dao.GetCurrentHeaderHeight()
+    if err != nil {
+        return fmt.Errorf("failed to retrieve current header info: %w", err)
+    }
+    h.storedHeaderCount = ((currHeaderHeight + 1) / headerBatchCount) * headerBatchCount
+
+    if h.storedHeaderCount >= headerBatchCount {
+        h.previous, err = h.dao.GetHeaderHashes(h.storedHeaderCount - headerBatchCount)
+        if err != nil {
+            return fmt.Errorf("failed to retrieve header hash page %d: %w", h.storedHeaderCount-headerBatchCount, err)
+        }
+    } else {
+        h.previous = make([]util.Uint256, headerBatchCount)
+    }
+    h.latest = make([]util.Uint256, 0, headerBatchCount)
+
+    // There is a high chance that the Node is stopped before the next
+    // batch of 2000 headers was stored. Via the currentHeaders stored we can sync
+    // that with stored blocks.
+    if currHeaderHeight >= h.storedHeaderCount {
+        hash := currHeaderHash
+        var targetHash util.Uint256
+        if h.storedHeaderCount >= headerBatchCount {
+            targetHash = h.previous[len(h.previous)-1]
+        }
+        headers := make([]util.Uint256, 0, headerBatchCount)
+
+        for hash != targetHash {
+            blk, err := h.dao.GetBlock(hash)
+            if err != nil {
+                return fmt.Errorf("could not get header %s: %w", hash, err)
+            }
+            headers = append(headers, blk.Hash())
+            hash = blk.PrevHash
+        }
+        hashSliceReverse(headers)
+        h.latest = append(h.latest, headers...)
+    }
+    return nil
+}
+
+func (h *HeaderHashes) lastHeaderIndex() uint32 {
+    return h.storedHeaderCount + uint32(len(h.latest)) - 1
+}
+
+// HeaderHeight returns the index/height of the highest header.
+func (h *HeaderHashes) HeaderHeight() uint32 {
+    h.lock.RLock()
+    n := h.lastHeaderIndex()
+    h.lock.RUnlock()
+    return n
+}
+
+func (h *HeaderHashes) addHeaders(headers ...*block.Header) error {
+    var (
+        batch = h.dao.GetPrivate()
+        lastHeader *block.Header
+        err error
+    )
+
+    h.lock.Lock()
+    defer h.lock.Unlock()
+
+    for _, head := range headers {
+        if head.Index != h.lastHeaderIndex()+1 {
+            continue
+        }
+        err = batch.StoreHeader(head)
+        if err != nil {
+            return err
+        }
+        lastHeader = head
+        h.latest = append(h.latest, head.Hash())
+        if len(h.latest) == headerBatchCount {
+            err = batch.StoreHeaderHashes(h.latest, h.storedHeaderCount)
+            if err != nil {
+                return err
+            }
+            copy(h.previous, h.latest)
+            h.latest = h.latest[:0]
+            h.storedHeaderCount += headerBatchCount
+        }
+    }
+    if lastHeader != nil {
+        batch.PutCurrentHeader(lastHeader.Hash(), lastHeader.Index)
+        updateHeaderHeightMetric(lastHeader.Index)
+        if _, err = batch.Persist(); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+// CurrentHeaderHash returns the hash of the latest known header.
+func (h *HeaderHashes) CurrentHeaderHash() util.Uint256 {
+    var hash util.Uint256
+
+    h.lock.RLock()
+    if len(h.latest) > 0 {
+        hash = h.latest[len(h.latest)-1]
+    } else {
+        hash = h.previous[len(h.previous)-1]
+    }
+    h.lock.RUnlock()
+    return hash
+}
+
+// GetHeaderHash returns hash of the header/block with specified index, if
+// HeaderHashes doesn't have a hash for this height, zero Uint256 value is returned.
+func (h *HeaderHashes) GetHeaderHash(i uint32) util.Uint256 {
+    h.lock.RLock()
+    res, ok := h.getLocalHeaderHash(i)
+    h.lock.RUnlock()
+    if ok {
+        return res
+    }
+    // If it's not in the latest/previous, then it's in the cache or DB, those
+    // need no additional locks.
+    page := (i / headerBatchCount) * headerBatchCount
+    cache, ok := h.cache.Get(page)
+    if ok {
+        hashes := cache.([]util.Uint256)
+        return hashes[i-page]
+    }
+    hashes, err := h.dao.GetHeaderHashes(page)
+    if err != nil {
+        return util.Uint256{}
+    }
+    _ = h.cache.Add(page, hashes)
+    return hashes[i-page]
+}
+
+// getLocalHeaderHash looks for the index in the latest and previous caches.
+// Locking is left to the user.
+func (h *HeaderHashes) getLocalHeaderHash(i uint32) (util.Uint256, bool) {
+    if i > h.lastHeaderIndex() {
+        return util.Uint256{}, false
+    }
+    if i >= h.storedHeaderCount {
+        return h.latest[i-h.storedHeaderCount], true
+    }
+    previousStored := h.storedHeaderCount - headerBatchCount
+    if i >= previousStored {
+        return h.previous[i-previousStored], true
+    }
+    return util.Uint256{}, false
+}
+
+func (h *HeaderHashes) haveRecentHash(hash util.Uint256, i uint32) bool {
+    h.lock.RLock()
+    defer h.lock.RUnlock()
+    for ; i > 0; i-- {
+        lh, ok := h.getLocalHeaderHash(i)
+        if ok && hash.Equals(lh) {
+            return true
+        }
+    }
+    return false
+}
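To see the whole paging flow in one place, here's a self-contained sketch of the lookup pattern GetHeaderHash implements above: an LRU of pages keyed by each page's first index, with a stand-in loader instead of dao.GetHeaderHashes (the loader and the int "hashes" are illustrative only, not neo-go APIs):

package main

import (
    "fmt"

    lru "github.com/hashicorp/golang-lru"
)

const pageSize = 2000 // headerBatchCount in the commit

// loadPage stands in for dao.GetHeaderHashes: it "reads" one full page.
func loadPage(page int) []int {
    hashes := make([]int, pageSize)
    for j := range hashes {
        hashes[j] = page + j // fake hash: just the header index itself
    }
    return hashes
}

func main() {
    cache, _ := lru.New(8) // pagesCache in the commit

    get := func(i int) int {
        page := (i / pageSize) * pageSize
        if v, ok := cache.Get(page); ok {
            return v.([]int)[i-page] // cache hit: no DB read needed
        }
        hashes := loadPage(page)
        cache.Add(page, hashes)
        return hashes[i-page]
    }

    fmt.Println(get(4242)) // loads page 4000, prints 4242
    fmt.Println(get(4243)) // served from the LRU, prints 4243
}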