core: rework headerList

It's a contention point as all accesses to it are serialized and they compete
with persisting logic at the same time.
This commit is contained in:
Roman Khimov 2020-09-16 14:28:18 +03:00
parent 6f3aff76a4
commit 7cbb660082
2 changed files with 90 additions and 167 deletions

View file

@ -100,13 +100,9 @@ type Blockchain struct {
generationAmount []int
decrementInterval int
// All operations on headerList must be called from an
// headersOp to be routine safe.
headerList *HeaderHashList
// Only for operating on the headerList.
headersOp chan headersOpFunc
headersOpDone chan struct{}
// Header hashes list with associated lock.
headerHashesLock sync.RWMutex
headerHashes []util.Uint256
// Stop synchronization mechanisms.
stopCh chan struct{}
@ -137,8 +133,6 @@ type bcEvent struct {
appExecResults []*state.AppExecResult
}
type headersOpFunc func(headerList *HeaderHashList)
// NewBlockchain returns a new blockchain object the will use the
// given Store as its underlying storage. For it to work correctly you need
// to spawn a goroutine for its Run method after this initialization.
@ -156,18 +150,16 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
return nil, err
}
bc := &Blockchain{
config: cfg,
dao: dao.NewSimple(s, cfg.Magic),
headersOp: make(chan headersOpFunc),
headersOpDone: make(chan struct{}),
stopCh: make(chan struct{}),
runToExitCh: make(chan struct{}),
memPool: mempool.New(cfg.MemPoolSize),
sbCommittee: committee,
log: log,
events: make(chan bcEvent),
subCh: make(chan interface{}),
unsubCh: make(chan interface{}),
config: cfg,
dao: dao.NewSimple(s, cfg.Magic),
stopCh: make(chan struct{}),
runToExitCh: make(chan struct{}),
memPool: mempool.New(cfg.MemPoolSize),
sbCommittee: committee,
log: log,
events: make(chan bcEvent),
subCh: make(chan interface{}),
unsubCh: make(chan interface{}),
generationAmount: genAmount,
decrementInterval: decrementInterval,
@ -194,7 +186,7 @@ func (bc *Blockchain) init() error {
if err != nil {
return err
}
bc.headerList = NewHeaderHashList(genesisBlock.Hash())
bc.headerHashes = []util.Uint256{genesisBlock.Hash()}
err = bc.dao.PutCurrentHeader(hashAndIndexToBytes(genesisBlock.Hash(), genesisBlock.Index))
if err != nil {
return err
@ -220,20 +212,19 @@ func (bc *Blockchain) init() error {
return fmt.Errorf("can't init MPT at height %d: %w", bHeight, err)
}
hashes, err := bc.dao.GetHeaderHashes()
bc.headerHashes, err = bc.dao.GetHeaderHashes()
if err != nil {
return err
}
bc.headerList = NewHeaderHashList(hashes...)
bc.storedHeaderCount = uint32(len(hashes))
bc.storedHeaderCount = uint32(len(bc.headerHashes))
currHeaderHeight, currHeaderHash, err := bc.dao.GetCurrentHeaderHeight()
if err != nil {
return err
}
if bc.storedHeaderCount == 0 && currHeaderHeight == 0 {
bc.headerList.Add(currHeaderHash)
bc.headerHashes = append(bc.headerHashes, currHeaderHash)
}
// There is a high chance that the Node is stopped before the next
@ -242,15 +233,15 @@ func (bc *Blockchain) init() error {
if currHeaderHeight >= bc.storedHeaderCount {
hash := currHeaderHash
var targetHash util.Uint256
if bc.headerList.Len() > 0 {
targetHash = bc.headerList.Get(bc.headerList.Len() - 1)
if len(bc.headerHashes) > 0 {
targetHash = bc.headerHashes[len(bc.headerHashes)-1]
} else {
genesisBlock, err := createGenesisBlock(bc.config)
if err != nil {
return err
}
targetHash = genesisBlock.Hash()
bc.headerList.Add(targetHash)
bc.headerHashes = append(bc.headerHashes, targetHash)
}
headers := make([]*block.Header, 0)
@ -264,7 +255,7 @@ func (bc *Blockchain) init() error {
}
headerSliceReverse(headers)
for _, h := range headers {
bc.headerList.Add(h.Hash())
bc.headerHashes = append(bc.headerHashes, h.Hash())
}
}
@ -290,9 +281,6 @@ func (bc *Blockchain) Run() {
select {
case <-bc.stopCh:
return
case op := <-bc.headersOp:
op(bc.headerList)
bc.headersOpDone <- struct{}{}
case <-persistTimer.C:
go func() {
err := bc.persist()
@ -415,8 +403,7 @@ func (bc *Blockchain) AddBlock(block *block.Block) error {
return fmt.Errorf("expected %d, got %d: %w", expectedHeight, block.Index, ErrInvalidBlockIndex)
}
headerLen := bc.headerListLen()
if int(block.Index) == headerLen {
if block.Index == bc.HeaderHeight()+1 {
err := bc.addHeaders(bc.config.VerifyBlocks, block.Header())
if err != nil {
return err
@ -490,65 +477,52 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) (err err
}
}
bc.headersOp <- func(headerList *HeaderHashList) {
oldlen := headerList.Len()
for _, h := range headers {
if int(h.Index-1) >= headerList.Len() {
err = fmt.Errorf(
"height of received header %d is higher then the current header %d",
h.Index, headerList.Len(),
)
return
}
if int(h.Index) < headerList.Len() {
continue
}
if err = bc.processHeader(h, batch, headerList); err != nil {
return
}
}
if oldlen != headerList.Len() {
updateHeaderHeightMetric(headerList.Len() - 1)
if err = bc.dao.Store.PutBatch(batch); err != nil {
return
}
bc.log.Debug("done processing headers",
zap.Int("headerIndex", headerList.Len()-1),
zap.Uint32("blockHeight", bc.BlockHeight()),
zap.Duration("took", time.Since(start)))
}
}
<-bc.headersOpDone
return err
}
// processHeader processes the given header. Note that this is only thread safe
// if executed in headers operation.
func (bc *Blockchain) processHeader(h *block.Header, batch storage.Batch, headerList *HeaderHashList) error {
headerList.Add(h.Hash())
buf := io.NewBufBinWriter()
for int(h.Index)-headerBatchCount >= int(bc.storedHeaderCount) {
if err := headerList.Write(buf.BinWriter, int(bc.storedHeaderCount), headerBatchCount); err != nil {
bc.headerHashesLock.Lock()
defer bc.headerHashesLock.Unlock()
oldlen := len(bc.headerHashes)
var lastHeader *block.Header
for _, h := range headers {
if int(h.Index-1) >= len(bc.headerHashes) {
return fmt.Errorf("height of received header %d is higher than the current header %d", h.Index, len(bc.headerHashes))
}
if int(h.Index) < len(bc.headerHashes) {
continue
}
bc.headerHashes = append(bc.headerHashes, h.Hash())
h.EncodeBinary(buf.BinWriter)
if buf.Err != nil {
return buf.Err
}
key := storage.AppendPrefix(storage.DataBlock, h.Hash().BytesLE())
batch.Put(key, buf.Bytes())
buf.Reset()
lastHeader = h
}
if oldlen != len(bc.headerHashes) {
for int(lastHeader.Index)-headerBatchCount >= int(bc.storedHeaderCount) {
buf.WriteArray(bc.headerHashes[bc.storedHeaderCount : bc.storedHeaderCount+headerBatchCount])
if buf.Err != nil {
return buf.Err
}
key := storage.AppendPrefixInt(storage.IXHeaderHashList, int(bc.storedHeaderCount))
batch.Put(key, buf.Bytes())
bc.storedHeaderCount += headerBatchCount
}
batch.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndexToBytes(lastHeader.Hash(), lastHeader.Index))
updateHeaderHeightMetric(len(bc.headerHashes) - 1)
if err = bc.dao.Store.PutBatch(batch); err != nil {
return err
}
key := storage.AppendPrefixInt(storage.IXHeaderHashList, int(bc.storedHeaderCount))
batch.Put(key, buf.Bytes())
bc.storedHeaderCount += headerBatchCount
buf.Reset()
bc.log.Debug("done processing headers",
zap.Int("headerIndex", len(bc.headerHashes)-1),
zap.Uint32("blockHeight", bc.BlockHeight()),
zap.Duration("took", time.Since(start)))
}
buf.Reset()
h.EncodeBinary(buf.BinWriter)
if buf.Err != nil {
return buf.Err
}
key := storage.AppendPrefix(storage.DataBlock, h.Hash().BytesLE())
batch.Put(key, buf.Bytes())
batch.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndexToBytes(h.Hash(), h.Index))
return nil
}
@ -902,14 +876,6 @@ func (bc *Blockchain) persist() error {
return nil
}
func (bc *Blockchain) headerListLen() (n int) {
bc.headersOp <- func(headerList *HeaderHashList) {
n = headerList.Len()
}
<-bc.headersOpDone
return
}
// GetTransaction returns a TX and its height by the given hash.
func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) {
if tx, ok := bc.memPool.TryGetValue(hash); ok {
@ -988,31 +954,35 @@ func (bc *Blockchain) HasBlock(hash util.Uint256) bool {
}
// CurrentBlockHash returns the highest processed block hash.
func (bc *Blockchain) CurrentBlockHash() (hash util.Uint256) {
bc.headersOp <- func(headerList *HeaderHashList) {
hash = headerList.Get(int(bc.BlockHeight()))
func (bc *Blockchain) CurrentBlockHash() util.Uint256 {
topBlock := bc.topBlock.Load()
if topBlock != nil {
if tb, ok := topBlock.(*block.Block); ok {
return tb.Hash()
}
}
<-bc.headersOpDone
return
return bc.GetHeaderHash(int(bc.BlockHeight()))
}
// CurrentHeaderHash returns the hash of the latest known header.
func (bc *Blockchain) CurrentHeaderHash() (hash util.Uint256) {
bc.headersOp <- func(headerList *HeaderHashList) {
hash = headerList.Last()
}
<-bc.headersOpDone
return
func (bc *Blockchain) CurrentHeaderHash() util.Uint256 {
bc.headerHashesLock.RLock()
hash := bc.headerHashes[len(bc.headerHashes)-1]
bc.headerHashesLock.RUnlock()
return hash
}
// GetHeaderHash returns the hash from the headerList by its
// height/index.
func (bc *Blockchain) GetHeaderHash(i int) (hash util.Uint256) {
bc.headersOp <- func(headerList *HeaderHashList) {
hash = headerList.Get(i)
// GetHeaderHash returns hash of the header/block with specified index, if
// Blockchain doesn't have a hash for this height, zero Uint256 value is returned.
func (bc *Blockchain) GetHeaderHash(i int) util.Uint256 {
bc.headerHashesLock.RLock()
defer bc.headerHashesLock.RUnlock()
hashesLen := len(bc.headerHashes)
if hashesLen <= i {
return util.Uint256{}
}
<-bc.headersOpDone
return
return bc.headerHashes[i]
}
// BlockHeight returns the height/index of the highest block.
@ -1022,7 +992,10 @@ func (bc *Blockchain) BlockHeight() uint32 {
// HeaderHeight returns the index/height of the highest header.
func (bc *Blockchain) HeaderHeight() uint32 {
return uint32(bc.headerListLen() - 1)
bc.headerHashesLock.RLock()
n := len(bc.headerHashes)
bc.headerHashesLock.RUnlock()
return uint32(n - 1)
}
// GetContractState returns contract by its script hash.

View file

@ -1,50 +0,0 @@
package core
import (
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/util"
)
// A HeaderHashList represents a list of header hashes.
// This data structure in not routine safe and should be
// used under some kind of protection against race conditions.
type HeaderHashList struct {
hashes []util.Uint256
}
// NewHeaderHashList returns a new pointer to a HeaderHashList.
func NewHeaderHashList(hashes ...util.Uint256) *HeaderHashList {
return &HeaderHashList{
hashes: hashes,
}
}
// Add appends the given hash to the list of hashes.
func (l *HeaderHashList) Add(h ...util.Uint256) {
l.hashes = append(l.hashes, h...)
}
// Len returns the length of the underlying hashes slice.
func (l *HeaderHashList) Len() int {
return len(l.hashes)
}
// Get returns the hash by the given index.
func (l *HeaderHashList) Get(i int) util.Uint256 {
if l.Len() <= i {
return util.Uint256{}
}
return l.hashes[i]
}
// Last return the last hash in the HeaderHashList.
func (l *HeaderHashList) Last() util.Uint256 {
return l.hashes[l.Len()-1]
}
// Write writes n underlying hashes to the given BinWriter
// starting from start.
func (l *HeaderHashList) Write(bw *io.BinWriter, start, n int) error {
bw.WriteArray(l.hashes[start : start+n])
return bw.Err
}