core: make unpersisted blocks/txs available in Blockchain

This changes the Blockchain to also return unpersisted (theoretically, verified
in the AddBlock!) blocks and transactions, making Add/Get interfaces
symmetrical. It allows to turn Persist into internal method again and makes it
possible to enable transaction check in GetBlock(), thus fixing #366.
This commit is contained in:
Roman Khimov 2019-09-24 18:51:20 +03:00
parent 2daff0957a
commit 578ac414d4
4 changed files with 125 additions and 59 deletions

View file

@ -39,9 +39,12 @@ type Blockchain struct {
// Current index/height of the highest block. // Current index/height of the highest block.
// Read access should always be called by BlockHeight(). // Read access should always be called by BlockHeight().
// Write access should only happen in Persist(). // Write access should only happen in AddBlock().
blockHeight uint32 blockHeight uint32
// Current persisted block count.
persistedHeight uint32
// Number of headers stored in the chain file. // Number of headers stored in the chain file.
storedHeaderCount uint32 storedHeaderCount uint32
@ -112,6 +115,7 @@ func (bc *Blockchain) init() error {
return err return err
} }
bc.blockHeight = bHeight bc.blockHeight = bHeight
bc.persistedHeight = bHeight
hashes, err := storage.HeaderHashes(bc.Store) hashes, err := storage.HeaderHashes(bc.Store)
if err != nil { if err != nil {
@ -170,7 +174,7 @@ func (bc *Blockchain) Run(ctx context.Context) {
bc.headersOpDone <- struct{}{} bc.headersOpDone <- struct{}{}
case <-persistTimer.C: case <-persistTimer.C:
go func() { go func() {
err := bc.Persist(ctx) err := bc.persist(ctx)
if err != nil { if err != nil {
log.Warnf("failed to persist blockchain: %s", err) log.Warnf("failed to persist blockchain: %s", err)
} }
@ -195,7 +199,13 @@ func (bc *Blockchain) AddBlock(block *Block) error {
if bc.verifyBlocks && !block.Verify(false) { if bc.verifyBlocks && !block.Verify(false) {
return fmt.Errorf("block %s is invalid", block.Hash()) return fmt.Errorf("block %s is invalid", block.Hash())
} }
return bc.AddHeaders(block.Header()) err := bc.AddHeaders(block.Header())
if err != nil {
return err
}
}
if bc.BlockHeight()+1 == block.Index {
atomic.StoreUint32(&bc.blockHeight, block.Index)
} }
return nil return nil
} }
@ -392,12 +402,12 @@ func (bc *Blockchain) persistBlock(block *Block) error {
return err return err
} }
atomic.StoreUint32(&bc.blockHeight, block.Index) bc.persistedHeight = block.Index
return nil return nil
} }
//Persist starts persist loop. // persist flushed current block cache to the persistent storage.
func (bc *Blockchain) Persist(ctx context.Context) (err error) { func (bc *Blockchain) persist(ctx context.Context) (err error) {
var ( var (
start = time.Now() start = time.Now()
persisted = 0 persisted = 0
@ -413,7 +423,7 @@ func (bc *Blockchain) Persist(ctx context.Context) (err error) {
if uint32(headerList.Len()) <= bc.BlockHeight() { if uint32(headerList.Len()) <= bc.BlockHeight() {
return return
} }
hash := headerList.Get(int(bc.BlockHeight() + 1)) hash := headerList.Get(int(bc.persistedHeight + 1))
if block, ok := bc.blockCache.GetBlock(hash); ok { if block, ok := bc.blockCache.GetBlock(hash); ok {
if err = bc.persistBlock(block); err != nil { if err = bc.persistBlock(block); err != nil {
return return
@ -465,6 +475,9 @@ func (bc *Blockchain) headerListLen() (n int) {
// GetTransaction returns a TX and its height by the given hash. // GetTransaction returns a TX and its height by the given hash.
func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) { func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) {
if tx, height, ok := bc.blockCache.GetTransaction(hash); ok {
return tx, height, nil
}
if tx, ok := bc.memPool.TryGetValue(hash); ok { if tx, ok := bc.memPool.TryGetValue(hash); ok {
return tx, 0, nil // the height is not actually defined for memPool transaction. Not sure if zero is a good number in this case. return tx, 0, nil // the height is not actually defined for memPool transaction. Not sure if zero is a good number in this case.
} }
@ -490,38 +503,46 @@ func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transactio
// GetBlock returns a Block by the given hash. // GetBlock returns a Block by the given hash.
func (bc *Blockchain) GetBlock(hash util.Uint256) (*Block, error) { func (bc *Blockchain) GetBlock(hash util.Uint256) (*Block, error) {
block, ok := bc.blockCache.GetBlock(hash)
if !ok {
key := storage.AppendPrefix(storage.DataBlock, hash.BytesReverse()) key := storage.AppendPrefix(storage.DataBlock, hash.BytesReverse())
b, err := bc.Get(key) b, err := bc.Get(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
block, err := NewBlockFromTrimmedBytes(b) block, err = NewBlockFromTrimmedBytes(b)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO: persist TX first before we can handle this logic. }
// if len(block.Transactions) == 0 { if len(block.Transactions) == 0 {
// return nil, fmt.Errorf("block has no TX") return nil, fmt.Errorf("only header is available")
// } }
return block, nil return block, nil
} }
// GetHeader returns data block header identified with the given hash value. // GetHeader returns data block header identified with the given hash value.
func (bc *Blockchain) GetHeader(hash util.Uint256) (*Header, error) { func (bc *Blockchain) GetHeader(hash util.Uint256) (*Header, error) {
block, ok := bc.blockCache.GetBlock(hash)
if !ok {
b, err := bc.Get(storage.AppendPrefix(storage.DataBlock, hash.BytesReverse())) b, err := bc.Get(storage.AppendPrefix(storage.DataBlock, hash.BytesReverse()))
if err != nil { if err != nil {
return nil, err return nil, err
} }
block, err := NewBlockFromTrimmedBytes(b) block, err = NewBlockFromTrimmedBytes(b)
if err != nil { if err != nil {
return nil, err return nil, err
} }
}
return block.Header(), nil return block.Header(), nil
} }
// HasTransaction return true if the blockchain contains he given // HasTransaction return true if the blockchain contains he given
// transaction hash. // transaction hash.
func (bc *Blockchain) HasTransaction(hash util.Uint256) bool { func (bc *Blockchain) HasTransaction(hash util.Uint256) bool {
if _, _, ok := bc.blockCache.GetTransaction(hash); ok {
return true
}
if bc.memPool.ContainsKey(hash) { if bc.memPool.ContainsKey(hash) {
return true return true
} }
@ -536,6 +557,10 @@ func (bc *Blockchain) HasTransaction(hash util.Uint256) bool {
// HasBlock return true if the blockchain contains the given // HasBlock return true if the blockchain contains the given
// block hash. // block hash.
func (bc *Blockchain) HasBlock(hash util.Uint256) bool { func (bc *Blockchain) HasBlock(hash util.Uint256) bool {
if bc.blockCache.Has(hash) {
return true
}
if header, err := bc.GetHeader(hash); err == nil { if header, err := bc.GetHeader(hash); err == nil {
return header.Index <= bc.BlockHeight() return header.Index <= bc.BlockHeight()
} }

View file

@ -63,9 +63,8 @@ func TestAddBlock(t *testing.T) {
t.Log(bc.blockCache) t.Log(bc.blockCache)
if err := bc.Persist(context.Background()); err != nil { // This one tests persisting blocks, so it does need to persist()
t.Fatal(err) require.NoError(t, bc.persist(context.Background()))
}
for _, block := range blocks { for _, block := range blocks {
key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesReverse()) key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesReverse())
@ -88,14 +87,18 @@ func TestGetHeader(t *testing.T) {
err := bc.AddBlock(block) err := bc.AddBlock(block)
assert.Nil(t, err) assert.Nil(t, err)
// Test unpersisted and persisted access
for i := 0; i < 2; i++ {
hash := block.Hash() hash := block.Hash()
header, err := bc.GetHeader(hash) header, err := bc.GetHeader(hash)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, block.Header(), header) assert.Equal(t, block.Header(), header)
block = newBlock(2) b2 := newBlock(2)
_, err = bc.GetHeader(block.Hash()) _, err = bc.GetHeader(b2.Hash())
assert.Error(t, err) assert.Error(t, err)
assert.NoError(t, bc.persist(context.Background()))
}
} }
func TestGetBlock(t *testing.T) { func TestGetBlock(t *testing.T) {
@ -111,6 +114,8 @@ func TestGetBlock(t *testing.T) {
} }
} }
// Test unpersisted and persisted access
for j := 0; j < 2; j++ {
for i := 0; i < len(blocks); i++ { for i := 0; i < len(blocks); i++ {
block, err := bc.GetBlock(blocks[i].Hash()) block, err := bc.GetBlock(blocks[i].Hash())
if err != nil { if err != nil {
@ -119,6 +124,8 @@ func TestGetBlock(t *testing.T) {
assert.Equal(t, blocks[i].Index, block.Index) assert.Equal(t, blocks[i].Index, block.Index)
assert.Equal(t, blocks[i].Hash(), block.Hash()) assert.Equal(t, blocks[i].Hash(), block.Hash())
} }
assert.NoError(t, bc.persist(context.Background()))
}
} }
func TestHasBlock(t *testing.T) { func TestHasBlock(t *testing.T) {
@ -133,14 +140,16 @@ func TestHasBlock(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
assert.Nil(t, bc.Persist(context.Background()))
// Test unpersisted and persisted access
for j := 0; j < 2; j++ {
for i := 0; i < len(blocks); i++ { for i := 0; i < len(blocks); i++ {
assert.True(t, bc.HasBlock(blocks[i].Hash())) assert.True(t, bc.HasBlock(blocks[i].Hash()))
} }
newBlock := newBlock(51) newBlock := newBlock(51)
assert.False(t, bc.HasBlock(newBlock.Hash())) assert.False(t, bc.HasBlock(newBlock.Hash()))
assert.NoError(t, bc.persist(context.Background()))
}
} }
func TestGetTransaction(t *testing.T) { func TestGetTransaction(t *testing.T) {
@ -151,12 +160,11 @@ func TestGetTransaction(t *testing.T) {
}() }()
assert.Nil(t, bc.AddBlock(block)) assert.Nil(t, bc.AddBlock(block))
assert.Nil(t, bc.persistBlock(block))
// Test unpersisted and persisted access
for j := 0; j < 2; j++ {
tx, height, err := bc.GetTransaction(block.Transactions[0].Hash()) tx, height, err := bc.GetTransaction(block.Transactions[0].Hash())
if err != nil { require.Nil(t, err)
t.Fatal(err)
}
assert.Equal(t, block.Index, height) assert.Equal(t, block.Index, height)
assert.Equal(t, block.Transactions[0], tx) assert.Equal(t, block.Transactions[0], tx)
assert.Equal(t, 10, io.GetVarSize(tx)) assert.Equal(t, 10, io.GetVarSize(tx))
@ -164,6 +172,8 @@ func TestGetTransaction(t *testing.T) {
assert.Equal(t, 1, io.GetVarSize(tx.Inputs)) assert.Equal(t, 1, io.GetVarSize(tx.Inputs))
assert.Equal(t, 1, io.GetVarSize(tx.Outputs)) assert.Equal(t, 1, io.GetVarSize(tx.Outputs))
assert.Equal(t, 1, io.GetVarSize(tx.Scripts)) assert.Equal(t, 1, io.GetVarSize(tx.Scripts))
assert.NoError(t, bc.persist(context.Background()))
}
} }
func newTestChain(t *testing.T) *Blockchain { func newTestChain(t *testing.T) *Blockchain {

View file

@ -3,6 +3,7 @@ package core
import ( import (
"sync" "sync"
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/util"
) )
@ -13,6 +14,12 @@ type Cache struct {
m map[util.Uint256]interface{} m map[util.Uint256]interface{}
} }
// txWithHeight is an ugly wrapper to fit the needs of Blockchain's GetTransaction.
type txWithHeight struct {
tx *transaction.Transaction
height uint32
}
// NewCache returns a ready to use Cache object. // NewCache returns a ready to use Cache object.
func NewCache() *Cache { func NewCache() *Cache {
return &Cache{ return &Cache{
@ -20,6 +27,19 @@ func NewCache() *Cache {
} }
} }
// GetTransaction will return a Transaction type from the cache.
func (c *Cache) GetTransaction(h util.Uint256) (*transaction.Transaction, uint32, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
if v, ok := c.m[h]; ok {
txh, ok := v.(txWithHeight)
if ok {
return txh.tx, txh.height, ok
}
}
return nil, 0, false
}
// GetBlock will return a Block type from the cache. // GetBlock will return a Block type from the cache.
func (c *Cache) GetBlock(h util.Uint256) (block *Block, ok bool) { func (c *Cache) GetBlock(h util.Uint256) (block *Block, ok bool) {
c.lock.RLock() c.lock.RLock()
@ -44,6 +64,12 @@ func (c *Cache) Add(h util.Uint256, v interface{}) {
func (c *Cache) add(h util.Uint256, v interface{}) { func (c *Cache) add(h util.Uint256, v interface{}) {
c.m[h] = v c.m[h] = v
block, ok := v.(*Block)
if ok {
for _, tx := range block.Transactions {
c.m[tx.Hash()] = txWithHeight{tx, block.Index}
}
}
} }
func (c *Cache) has(h util.Uint256) bool { func (c *Cache) has(h util.Uint256) bool {
@ -69,6 +95,12 @@ func (c *Cache) Len() int {
func (c *Cache) Delete(h util.Uint256) { func (c *Cache) Delete(h util.Uint256) {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
block, ok := c.m[h].(*Block)
if ok {
for _, tx := range block.Transactions {
delete(c.m, tx.Hash())
}
}
delete(c.m, h) delete(c.m, h)
} }

View file

@ -167,7 +167,6 @@ func initBlocks(t *testing.T, chain *core.Blockchain) {
for i := 0; i < len(blocks); i++ { for i := 0; i < len(blocks); i++ {
require.NoError(t, chain.AddBlock(blocks[i])) require.NoError(t, chain.AddBlock(blocks[i]))
} }
require.NoError(t, chain.Persist(context.Background()))
} }
func makeBlocks(n int) []*core.Block { func makeBlocks(n int) []*core.Block {