diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 71744e935..b5302bac1 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -100,13 +100,9 @@ type Blockchain struct { generationAmount []int decrementInterval int - // All operations on headerList must be called from an - // headersOp to be routine safe. - headerList *HeaderHashList - - // Only for operating on the headerList. - headersOp chan headersOpFunc - headersOpDone chan struct{} + // Header hashes list with associated lock. + headerHashesLock sync.RWMutex + headerHashes []util.Uint256 // Stop synchronization mechanisms. stopCh chan struct{} @@ -137,8 +133,6 @@ type bcEvent struct { appExecResults []*state.AppExecResult } -type headersOpFunc func(headerList *HeaderHashList) - // NewBlockchain returns a new blockchain object the will use the // given Store as its underlying storage. For it to work correctly you need // to spawn a goroutine for its Run method after this initialization. @@ -156,18 +150,16 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L return nil, err } bc := &Blockchain{ - config: cfg, - dao: dao.NewSimple(s, cfg.Magic), - headersOp: make(chan headersOpFunc), - headersOpDone: make(chan struct{}), - stopCh: make(chan struct{}), - runToExitCh: make(chan struct{}), - memPool: mempool.New(cfg.MemPoolSize), - sbCommittee: committee, - log: log, - events: make(chan bcEvent), - subCh: make(chan interface{}), - unsubCh: make(chan interface{}), + config: cfg, + dao: dao.NewSimple(s, cfg.Magic), + stopCh: make(chan struct{}), + runToExitCh: make(chan struct{}), + memPool: mempool.New(cfg.MemPoolSize), + sbCommittee: committee, + log: log, + events: make(chan bcEvent), + subCh: make(chan interface{}), + unsubCh: make(chan interface{}), generationAmount: genAmount, decrementInterval: decrementInterval, @@ -194,7 +186,7 @@ func (bc *Blockchain) init() error { if err != nil { return err } - bc.headerList = NewHeaderHashList(genesisBlock.Hash()) + bc.headerHashes = []util.Uint256{genesisBlock.Hash()} err = bc.dao.PutCurrentHeader(hashAndIndexToBytes(genesisBlock.Hash(), genesisBlock.Index)) if err != nil { return err @@ -220,20 +212,19 @@ func (bc *Blockchain) init() error { return fmt.Errorf("can't init MPT at height %d: %w", bHeight, err) } - hashes, err := bc.dao.GetHeaderHashes() + bc.headerHashes, err = bc.dao.GetHeaderHashes() if err != nil { return err } - bc.headerList = NewHeaderHashList(hashes...) - bc.storedHeaderCount = uint32(len(hashes)) + bc.storedHeaderCount = uint32(len(bc.headerHashes)) currHeaderHeight, currHeaderHash, err := bc.dao.GetCurrentHeaderHeight() if err != nil { return err } if bc.storedHeaderCount == 0 && currHeaderHeight == 0 { - bc.headerList.Add(currHeaderHash) + bc.headerHashes = append(bc.headerHashes, currHeaderHash) } // There is a high chance that the Node is stopped before the next @@ -242,15 +233,15 @@ func (bc *Blockchain) init() error { if currHeaderHeight >= bc.storedHeaderCount { hash := currHeaderHash var targetHash util.Uint256 - if bc.headerList.Len() > 0 { - targetHash = bc.headerList.Get(bc.headerList.Len() - 1) + if len(bc.headerHashes) > 0 { + targetHash = bc.headerHashes[len(bc.headerHashes)-1] } else { genesisBlock, err := createGenesisBlock(bc.config) if err != nil { return err } targetHash = genesisBlock.Hash() - bc.headerList.Add(targetHash) + bc.headerHashes = append(bc.headerHashes, targetHash) } headers := make([]*block.Header, 0) @@ -264,7 +255,7 @@ func (bc *Blockchain) init() error { } headerSliceReverse(headers) for _, h := range headers { - bc.headerList.Add(h.Hash()) + bc.headerHashes = append(bc.headerHashes, h.Hash()) } } @@ -290,9 +281,6 @@ func (bc *Blockchain) Run() { select { case <-bc.stopCh: return - case op := <-bc.headersOp: - op(bc.headerList) - bc.headersOpDone <- struct{}{} case <-persistTimer.C: go func() { err := bc.persist() @@ -415,8 +403,7 @@ func (bc *Blockchain) AddBlock(block *block.Block) error { return fmt.Errorf("expected %d, got %d: %w", expectedHeight, block.Index, ErrInvalidBlockIndex) } - headerLen := bc.headerListLen() - if int(block.Index) == headerLen { + if block.Index == bc.HeaderHeight()+1 { err := bc.addHeaders(bc.config.VerifyBlocks, block.Header()) if err != nil { return err @@ -490,65 +477,52 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) (err err } } - bc.headersOp <- func(headerList *HeaderHashList) { - oldlen := headerList.Len() - for _, h := range headers { - if int(h.Index-1) >= headerList.Len() { - err = fmt.Errorf( - "height of received header %d is higher then the current header %d", - h.Index, headerList.Len(), - ) - return - } - if int(h.Index) < headerList.Len() { - continue - } - if err = bc.processHeader(h, batch, headerList); err != nil { - return - } - } - - if oldlen != headerList.Len() { - updateHeaderHeightMetric(headerList.Len() - 1) - if err = bc.dao.Store.PutBatch(batch); err != nil { - return - } - bc.log.Debug("done processing headers", - zap.Int("headerIndex", headerList.Len()-1), - zap.Uint32("blockHeight", bc.BlockHeight()), - zap.Duration("took", time.Since(start))) - } - } - <-bc.headersOpDone - return err -} - -// processHeader processes the given header. Note that this is only thread safe -// if executed in headers operation. -func (bc *Blockchain) processHeader(h *block.Header, batch storage.Batch, headerList *HeaderHashList) error { - headerList.Add(h.Hash()) - buf := io.NewBufBinWriter() - for int(h.Index)-headerBatchCount >= int(bc.storedHeaderCount) { - if err := headerList.Write(buf.BinWriter, int(bc.storedHeaderCount), headerBatchCount); err != nil { + bc.headerHashesLock.Lock() + defer bc.headerHashesLock.Unlock() + oldlen := len(bc.headerHashes) + var lastHeader *block.Header + for _, h := range headers { + if int(h.Index-1) >= len(bc.headerHashes) { + return fmt.Errorf("height of received header %d is higher than the current header %d", h.Index, len(bc.headerHashes)) + } + if int(h.Index) < len(bc.headerHashes) { + continue + } + bc.headerHashes = append(bc.headerHashes, h.Hash()) + h.EncodeBinary(buf.BinWriter) + if buf.Err != nil { + return buf.Err + } + + key := storage.AppendPrefix(storage.DataBlock, h.Hash().BytesLE()) + batch.Put(key, buf.Bytes()) + buf.Reset() + lastHeader = h + } + + if oldlen != len(bc.headerHashes) { + for int(lastHeader.Index)-headerBatchCount >= int(bc.storedHeaderCount) { + buf.WriteArray(bc.headerHashes[bc.storedHeaderCount : bc.storedHeaderCount+headerBatchCount]) + if buf.Err != nil { + return buf.Err + } + + key := storage.AppendPrefixInt(storage.IXHeaderHashList, int(bc.storedHeaderCount)) + batch.Put(key, buf.Bytes()) + bc.storedHeaderCount += headerBatchCount + } + + batch.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndexToBytes(lastHeader.Hash(), lastHeader.Index)) + updateHeaderHeightMetric(len(bc.headerHashes) - 1) + if err = bc.dao.Store.PutBatch(batch); err != nil { return err } - key := storage.AppendPrefixInt(storage.IXHeaderHashList, int(bc.storedHeaderCount)) - batch.Put(key, buf.Bytes()) - bc.storedHeaderCount += headerBatchCount - buf.Reset() + bc.log.Debug("done processing headers", + zap.Int("headerIndex", len(bc.headerHashes)-1), + zap.Uint32("blockHeight", bc.BlockHeight()), + zap.Duration("took", time.Since(start))) } - - buf.Reset() - h.EncodeBinary(buf.BinWriter) - if buf.Err != nil { - return buf.Err - } - - key := storage.AppendPrefix(storage.DataBlock, h.Hash().BytesLE()) - batch.Put(key, buf.Bytes()) - batch.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndexToBytes(h.Hash(), h.Index)) - return nil } @@ -902,14 +876,6 @@ func (bc *Blockchain) persist() error { return nil } -func (bc *Blockchain) headerListLen() (n int) { - bc.headersOp <- func(headerList *HeaderHashList) { - n = headerList.Len() - } - <-bc.headersOpDone - return -} - // GetTransaction returns a TX and its height by the given hash. func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) { if tx, ok := bc.memPool.TryGetValue(hash); ok { @@ -988,31 +954,35 @@ func (bc *Blockchain) HasBlock(hash util.Uint256) bool { } // CurrentBlockHash returns the highest processed block hash. -func (bc *Blockchain) CurrentBlockHash() (hash util.Uint256) { - bc.headersOp <- func(headerList *HeaderHashList) { - hash = headerList.Get(int(bc.BlockHeight())) +func (bc *Blockchain) CurrentBlockHash() util.Uint256 { + topBlock := bc.topBlock.Load() + if topBlock != nil { + if tb, ok := topBlock.(*block.Block); ok { + return tb.Hash() + } } - <-bc.headersOpDone - return + return bc.GetHeaderHash(int(bc.BlockHeight())) } // CurrentHeaderHash returns the hash of the latest known header. -func (bc *Blockchain) CurrentHeaderHash() (hash util.Uint256) { - bc.headersOp <- func(headerList *HeaderHashList) { - hash = headerList.Last() - } - <-bc.headersOpDone - return +func (bc *Blockchain) CurrentHeaderHash() util.Uint256 { + bc.headerHashesLock.RLock() + hash := bc.headerHashes[len(bc.headerHashes)-1] + bc.headerHashesLock.RUnlock() + return hash } -// GetHeaderHash returns the hash from the headerList by its -// height/index. -func (bc *Blockchain) GetHeaderHash(i int) (hash util.Uint256) { - bc.headersOp <- func(headerList *HeaderHashList) { - hash = headerList.Get(i) +// GetHeaderHash returns hash of the header/block with specified index, if +// Blockchain doesn't have a hash for this height, zero Uint256 value is returned. +func (bc *Blockchain) GetHeaderHash(i int) util.Uint256 { + bc.headerHashesLock.RLock() + defer bc.headerHashesLock.RUnlock() + + hashesLen := len(bc.headerHashes) + if hashesLen <= i { + return util.Uint256{} } - <-bc.headersOpDone - return + return bc.headerHashes[i] } // BlockHeight returns the height/index of the highest block. @@ -1022,7 +992,10 @@ func (bc *Blockchain) BlockHeight() uint32 { // HeaderHeight returns the index/height of the highest header. func (bc *Blockchain) HeaderHeight() uint32 { - return uint32(bc.headerListLen() - 1) + bc.headerHashesLock.RLock() + n := len(bc.headerHashes) + bc.headerHashesLock.RUnlock() + return uint32(n - 1) } // GetContractState returns contract by its script hash. diff --git a/pkg/core/header_hash_list.go b/pkg/core/header_hash_list.go deleted file mode 100644 index 3a7a1c3f6..000000000 --- a/pkg/core/header_hash_list.go +++ /dev/null @@ -1,50 +0,0 @@ -package core - -import ( - "github.com/nspcc-dev/neo-go/pkg/io" - "github.com/nspcc-dev/neo-go/pkg/util" -) - -// A HeaderHashList represents a list of header hashes. -// This data structure in not routine safe and should be -// used under some kind of protection against race conditions. -type HeaderHashList struct { - hashes []util.Uint256 -} - -// NewHeaderHashList returns a new pointer to a HeaderHashList. -func NewHeaderHashList(hashes ...util.Uint256) *HeaderHashList { - return &HeaderHashList{ - hashes: hashes, - } -} - -// Add appends the given hash to the list of hashes. -func (l *HeaderHashList) Add(h ...util.Uint256) { - l.hashes = append(l.hashes, h...) -} - -// Len returns the length of the underlying hashes slice. -func (l *HeaderHashList) Len() int { - return len(l.hashes) -} - -// Get returns the hash by the given index. -func (l *HeaderHashList) Get(i int) util.Uint256 { - if l.Len() <= i { - return util.Uint256{} - } - return l.hashes[i] -} - -// Last return the last hash in the HeaderHashList. -func (l *HeaderHashList) Last() util.Uint256 { - return l.hashes[l.Len()-1] -} - -// Write writes n underlying hashes to the given BinWriter -// starting from start. -func (l *HeaderHashList) Write(bw *io.BinWriter, start, n int) error { - bw.WriteArray(l.hashes[start : start+n]) - return bw.Err -}