core: remove blockCache, use MemoryStore, redesign persist()

Commit 578ac414d4 was wrong in that it saved
only a part of the block, so depending on how you use blockchain, you may
still see that the block was not really processed properly. To really fix it
this commit introduces intermediate storage layer in form of memStore, which
actually is a MemoryStore that supports full Store API (thus easily fitting
into the existing code) and one extension that allows it to flush its data to
some other Store.

It also changes AddBlock() semantics in that it only accepts now successive
blocks, but when it does it guarantees that they're properly added into the
Blockchain and can be referred to in any way. Pending block queing is now
moved into the server (see 8c0c055ac657813fe3ed10257bce199e9527d5ed).

So the only thing done with persist() now is just a move from memStore to
Store which probably should've always been the case (notice also that
previously headers and some other metadata was written into the Store
bypassing caching/batching mechanism thus leading to some inefficiency).
This commit is contained in:
Roman Khimov 2019-09-26 18:14:00 +03:00
parent c531dc0bde
commit 920d7c610c
8 changed files with 257 additions and 265 deletions

View file

@ -12,25 +12,46 @@ import (
// Accounts is mapping between a account address and AccountState. // Accounts is mapping between a account address and AccountState.
type Accounts map[util.Uint160]*AccountState type Accounts map[util.Uint160]*AccountState
func (a Accounts) getAndUpdate(s storage.Store, hash util.Uint160) (*AccountState, error) { // getAndUpdate retrieves AccountState from temporary or persistent Store
// or creates a new one if it doesn't exist.
func (a Accounts) getAndUpdate(ts storage.Store, ps storage.Store, hash util.Uint160) (*AccountState, error) {
if account, ok := a[hash]; ok { if account, ok := a[hash]; ok {
return account, nil return account, nil
} }
account := &AccountState{} account, err := getAccountStateFromStore(ts, hash)
if err != nil {
if err != storage.ErrKeyNotFound {
return nil, err
}
account, err = getAccountStateFromStore(ps, hash)
if err != nil {
if err != storage.ErrKeyNotFound {
return nil, err
}
account = NewAccountState(hash)
}
}
a[hash] = account
return account, nil
}
// getAccountStateFromStore returns AccountState from the given Store if it's
// present there. Returns nil otherwise.
func getAccountStateFromStore(s storage.Store, hash util.Uint160) (*AccountState, error) {
var account *AccountState
key := storage.AppendPrefix(storage.STAccount, hash.Bytes()) key := storage.AppendPrefix(storage.STAccount, hash.Bytes())
if b, err := s.Get(key); err == nil { b, err := s.Get(key)
if err == nil {
account = new(AccountState)
r := io.NewBinReaderFromBuf(b) r := io.NewBinReaderFromBuf(b)
account.DecodeBinary(r) account.DecodeBinary(r)
if r.Err != nil { if r.Err != nil {
return nil, fmt.Errorf("failed to decode (AccountState): %s", r.Err) return nil, fmt.Errorf("failed to decode (AccountState): %s", r.Err)
} }
} else {
account = NewAccountState(hash)
} }
return account, err
a[hash] = account
return account, nil
} }
// commit writes all account states to the given Batch. // commit writes all account states to the given Batch.

View file

@ -37,9 +37,12 @@ type Blockchain struct {
// Any object that satisfies the BlockchainStorer interface. // Any object that satisfies the BlockchainStorer interface.
storage.Store storage.Store
// In-memory storage to be persisted into the storage.Store
memStore *storage.MemoryStore
// Current index/height of the highest block. // Current index/height of the highest block.
// Read access should always be called by BlockHeight(). // Read access should always be called by BlockHeight().
// Write access should only happen in AddBlock(). // Write access should only happen in storeBlock().
blockHeight uint32 blockHeight uint32
// Current persisted block count. // Current persisted block count.
@ -48,8 +51,6 @@ type Blockchain struct {
// Number of headers stored in the chain file. // Number of headers stored in the chain file.
storedHeaderCount uint32 storedHeaderCount uint32
blockCache *Cache
// All operation on headerList must be called from an // All operation on headerList must be called from an
// headersOp to be routine safe. // headersOp to be routine safe.
headerList *HeaderHashList headerList *HeaderHashList
@ -72,9 +73,9 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha
bc := &Blockchain{ bc := &Blockchain{
config: cfg, config: cfg,
Store: s, Store: s,
memStore: storage.NewMemoryStore(),
headersOp: make(chan headersOpFunc), headersOp: make(chan headersOpFunc),
headersOpDone: make(chan struct{}), headersOpDone: make(chan struct{}),
blockCache: NewCache(),
verifyBlocks: false, verifyBlocks: false,
memPool: NewMemPool(50000), memPool: NewMemPool(50000),
} }
@ -99,7 +100,7 @@ func (bc *Blockchain) init() error {
return err return err
} }
bc.headerList = NewHeaderHashList(genesisBlock.Hash()) bc.headerList = NewHeaderHashList(genesisBlock.Hash())
return bc.persistBlock(genesisBlock) return bc.storeBlock(genesisBlock)
} }
if ver != version { if ver != version {
return fmt.Errorf("storage version mismatch betweeen %s and %s", version, ver) return fmt.Errorf("storage version mismatch betweeen %s and %s", version, ver)
@ -164,6 +165,11 @@ func (bc *Blockchain) Run(ctx context.Context) {
persistTimer := time.NewTimer(persistInterval) persistTimer := time.NewTimer(persistInterval)
defer func() { defer func() {
persistTimer.Stop() persistTimer.Stop()
if err := bc.persist(ctx); err != nil {
log.Warnf("failed to persist: %s", err)
}
// never fails
_ = bc.memStore.Close()
if err := bc.Store.Close(); err != nil { if err := bc.Store.Close(); err != nil {
log.Warnf("failed to close db: %s", err) log.Warnf("failed to close db: %s", err)
} }
@ -187,30 +193,24 @@ func (bc *Blockchain) Run(ctx context.Context) {
} }
} }
// AddBlock processes the given block and will add it to the cache so it // AddBlock accepts successive block for the Blockchain, verifies it and
// can be persisted. // stores internally. Eventually it will be persisted to the backing storage.
func (bc *Blockchain) AddBlock(block *Block) error { func (bc *Blockchain) AddBlock(block *Block) error {
if !bc.blockCache.Has(block.Hash()) { expectedHeight := bc.BlockHeight() + 1
bc.blockCache.Add(block.Hash(), block) if expectedHeight != block.Index {
return fmt.Errorf("expected block %d, but passed block %d", expectedHeight, block.Index)
}
if bc.verifyBlocks && !block.Verify(false) {
return fmt.Errorf("block %s is invalid", block.Hash())
} }
headerLen := bc.headerListLen() headerLen := bc.headerListLen()
if int(block.Index-1) >= headerLen {
return nil
}
if int(block.Index) == headerLen { if int(block.Index) == headerLen {
if bc.verifyBlocks && !block.Verify(false) {
return fmt.Errorf("block %s is invalid", block.Hash())
}
err := bc.AddHeaders(block.Header()) err := bc.AddHeaders(block.Header())
if err != nil { if err != nil {
return err return err
} }
} }
if bc.BlockHeight()+1 == block.Index { return bc.storeBlock(block)
atomic.StoreUint32(&bc.blockHeight, block.Index)
}
return nil
} }
// AddHeaders will process the given headers and add them to the // AddHeaders will process the given headers and add them to the
@ -218,7 +218,7 @@ func (bc *Blockchain) AddBlock(block *Block) error {
func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) { func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) {
var ( var (
start = time.Now() start = time.Now()
batch = bc.Batch() batch = bc.memStore.Batch()
) )
bc.headersOp <- func(headerList *HeaderHashList) { bc.headersOp <- func(headerList *HeaderHashList) {
@ -243,7 +243,7 @@ func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) {
} }
if batch.Len() > 0 { if batch.Len() > 0 {
if err = bc.PutBatch(batch); err != nil { if err = bc.memStore.PutBatch(batch); err != nil {
return return
} }
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -286,13 +286,13 @@ func (bc *Blockchain) processHeader(h *Header, batch storage.Batch, headerList *
return nil return nil
} }
// TODO: persistBlock needs some more love, its implemented as in the original // TODO: storeBlock needs some more love, its implemented as in the original
// project. This for the sake of development speed and understanding of what // project. This for the sake of development speed and understanding of what
// is happening here, quite allot as you can see :). If things are wired together // is happening here, quite allot as you can see :). If things are wired together
// and all tests are in place, we can make a more optimized and cleaner implementation. // and all tests are in place, we can make a more optimized and cleaner implementation.
func (bc *Blockchain) persistBlock(block *Block) error { func (bc *Blockchain) storeBlock(block *Block) error {
var ( var (
batch = bc.Batch() batch = bc.memStore.Batch()
unspentCoins = make(UnspentCoins) unspentCoins = make(UnspentCoins)
spentCoins = make(SpentCoins) spentCoins = make(SpentCoins)
accounts = make(Accounts) accounts = make(Accounts)
@ -314,7 +314,7 @@ func (bc *Blockchain) persistBlock(block *Block) error {
// Process TX outputs. // Process TX outputs.
for _, output := range tx.Outputs { for _, output := range tx.Outputs {
account, err := accounts.getAndUpdate(bc.Store, output.ScriptHash) account, err := accounts.getAndUpdate(bc.memStore, bc.Store, output.ScriptHash)
if err != nil { if err != nil {
return err return err
} }
@ -332,14 +332,14 @@ func (bc *Blockchain) persistBlock(block *Block) error {
return fmt.Errorf("could not find previous TX: %s", prevHash) return fmt.Errorf("could not find previous TX: %s", prevHash)
} }
for _, input := range inputs { for _, input := range inputs {
unspent, err := unspentCoins.getAndUpdate(bc.Store, input.PrevHash) unspent, err := unspentCoins.getAndUpdate(bc.memStore, bc.Store, input.PrevHash)
if err != nil { if err != nil {
return err return err
} }
unspent.states[input.PrevIndex] = CoinStateSpent unspent.states[input.PrevIndex] = CoinStateSpent
prevTXOutput := prevTX.Outputs[input.PrevIndex] prevTXOutput := prevTX.Outputs[input.PrevIndex]
account, err := accounts.getAndUpdate(bc.Store, prevTXOutput.ScriptHash) account, err := accounts.getAndUpdate(bc.memStore, bc.Store, prevTXOutput.ScriptHash)
if err != nil { if err != nil {
return err return err
} }
@ -401,71 +401,45 @@ func (bc *Blockchain) persistBlock(block *Block) error {
if err := assets.commit(batch); err != nil { if err := assets.commit(batch); err != nil {
return err return err
} }
if err := bc.PutBatch(batch); err != nil { if err := bc.memStore.PutBatch(batch); err != nil {
return err return err
} }
bc.persistedHeight = block.Index atomic.StoreUint32(&bc.blockHeight, block.Index)
return nil return nil
} }
// persist flushed current block cache to the persistent storage. // persist flushes current in-memory store contents to the persistent storage.
func (bc *Blockchain) persist(ctx context.Context) (err error) { func (bc *Blockchain) persist(ctx context.Context) error {
var ( var (
start = time.Now() start = time.Now()
persisted = 0 persisted = 0
lenCache = bc.blockCache.Len() err error
) )
if lenCache == 0 { persisted, err = bc.memStore.Persist(bc.Store)
return nil if err != nil {
return err
} }
bHeight, err := storage.CurrentBlockHeight(bc.Store)
bc.headersOp <- func(headerList *HeaderHashList) { if err != nil {
for i := 0; i < lenCache; i++ { return err
if uint32(headerList.Len()) <= bc.BlockHeight() {
return
}
hash := headerList.Get(int(bc.persistedHeight + 1))
if block, ok := bc.blockCache.GetBlock(hash); ok {
if err = bc.persistBlock(block); err != nil {
return
}
bc.blockCache.Delete(hash)
persisted++
} else {
// no next block in the cache, no reason to continue looping
break
}
}
}
select {
case <-ctx.Done():
return
case <-bc.headersOpDone:
//
} }
oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight)
diff := bHeight - oldHeight
if persisted > 0 { if persisted > 0 {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"persisted": persisted, "persistedBlocks": diff,
"headerHeight": bc.HeaderHeight(), "persistedKeys": persisted,
"blockHeight": bc.BlockHeight(), "headerHeight": bc.HeaderHeight(),
"took": time.Since(start), "blockHeight": bc.BlockHeight(),
"persistedHeight": bc.persistedHeight,
"took": time.Since(start),
}).Info("blockchain persist completed") }).Info("blockchain persist completed")
} else {
// So we have some blocks in cache but can't persist them?
// Either there are some stale blocks there or the other way
// around (which was seen in practice) --- there are some fresh
// blocks that we can't persist yet. Some of the latter can be useful
// or can be bogus (higher than the header height we expect at
// the moment). So try to reap oldies and strange newbies, if
// there are any.
bc.blockCache.ReapStrangeBlocks(bc.BlockHeight(), bc.HeaderHeight())
} }
return return nil
} }
func (bc *Blockchain) headerListLen() (n int) { func (bc *Blockchain) headerListLen() (n int) {
@ -478,15 +452,21 @@ func (bc *Blockchain) headerListLen() (n int) {
// GetTransaction returns a TX and its height by the given hash. // GetTransaction returns a TX and its height by the given hash.
func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) { func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) {
if tx, height, ok := bc.blockCache.GetTransaction(hash); ok {
return tx, height, nil
}
if tx, ok := bc.memPool.TryGetValue(hash); ok { if tx, ok := bc.memPool.TryGetValue(hash); ok {
return tx, 0, nil // the height is not actually defined for memPool transaction. Not sure if zero is a good number in this case. return tx, 0, nil // the height is not actually defined for memPool transaction. Not sure if zero is a good number in this case.
} }
tx, height, err := getTransactionFromStore(bc.memStore, hash)
if err != nil {
tx, height, err = getTransactionFromStore(bc.Store, hash)
}
return tx, height, err
}
// getTransactionFromStore returns Transaction and its height by the given hash
// if it exists in the store.
func getTransactionFromStore(s storage.Store, hash util.Uint256) (*transaction.Transaction, uint32, error) {
key := storage.AppendPrefix(storage.DataTransaction, hash.BytesReverse()) key := storage.AppendPrefix(storage.DataTransaction, hash.BytesReverse())
b, err := bc.Get(key) b, err := s.Get(key)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
@ -506,14 +486,9 @@ func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transactio
// GetBlock returns a Block by the given hash. // GetBlock returns a Block by the given hash.
func (bc *Blockchain) GetBlock(hash util.Uint256) (*Block, error) { func (bc *Blockchain) GetBlock(hash util.Uint256) (*Block, error) {
block, ok := bc.blockCache.GetBlock(hash) block, err := getBlockFromStore(bc.memStore, hash)
if !ok { if err != nil {
key := storage.AppendPrefix(storage.DataBlock, hash.BytesReverse()) block, err = getBlockFromStore(bc.Store, hash)
b, err := bc.Get(key)
if err != nil {
return nil, err
}
block, err = NewBlockFromTrimmedBytes(b)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -524,34 +499,54 @@ func (bc *Blockchain) GetBlock(hash util.Uint256) (*Block, error) {
return block, nil return block, nil
} }
// getBlockFromStore returns Block by the given hash if it exists in the store.
func getBlockFromStore(s storage.Store, hash util.Uint256) (*Block, error) {
key := storage.AppendPrefix(storage.DataBlock, hash.BytesReverse())
b, err := s.Get(key)
if err != nil {
return nil, err
}
block, err := NewBlockFromTrimmedBytes(b)
if err != nil {
return nil, err
}
return block, err
}
// GetHeader returns data block header identified with the given hash value. // GetHeader returns data block header identified with the given hash value.
func (bc *Blockchain) GetHeader(hash util.Uint256) (*Header, error) { func (bc *Blockchain) GetHeader(hash util.Uint256) (*Header, error) {
block, ok := bc.blockCache.GetBlock(hash) header, err := getHeaderFromStore(bc.memStore, hash)
if !ok { if err != nil {
b, err := bc.Get(storage.AppendPrefix(storage.DataBlock, hash.BytesReverse())) header, err = getHeaderFromStore(bc.Store, hash)
if err != nil {
return nil, err
}
block, err = NewBlockFromTrimmedBytes(b)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
return header, err
}
// getHeaderFromStore returns Header by the given hash from the store.
func getHeaderFromStore(s storage.Store, hash util.Uint256) (*Header, error) {
block, err := getBlockFromStore(s, hash)
if err != nil {
return nil, err
}
return block.Header(), nil return block.Header(), nil
} }
// HasTransaction return true if the blockchain contains he given // HasTransaction return true if the blockchain contains he given
// transaction hash. // transaction hash.
func (bc *Blockchain) HasTransaction(hash util.Uint256) bool { func (bc *Blockchain) HasTransaction(hash util.Uint256) bool {
if _, _, ok := bc.blockCache.GetTransaction(hash); ok { return bc.memPool.ContainsKey(hash) ||
return true checkTransactionInStore(bc.memStore, hash) ||
} checkTransactionInStore(bc.Store, hash)
if bc.memPool.ContainsKey(hash) { }
return true
}
// checkTransactionInStore returns true if the given store contains the given
// Transaction hash.
func checkTransactionInStore(s storage.Store, hash util.Uint256) bool {
key := storage.AppendPrefix(storage.DataTransaction, hash.BytesReverse()) key := storage.AppendPrefix(storage.DataTransaction, hash.BytesReverse())
if _, err := bc.Get(key); err == nil { if _, err := s.Get(key); err == nil {
return true return true
} }
return false return false
@ -560,10 +555,6 @@ func (bc *Blockchain) HasTransaction(hash util.Uint256) bool {
// HasBlock return true if the blockchain contains the given // HasBlock return true if the blockchain contains the given
// block hash. // block hash.
func (bc *Blockchain) HasBlock(hash util.Uint256) bool { func (bc *Blockchain) HasBlock(hash util.Uint256) bool {
if bc.blockCache.Has(hash) {
return true
}
if header, err := bc.GetHeader(hash); err == nil { if header, err := bc.GetHeader(hash); err == nil {
return header.Index <= bc.BlockHeight() return header.Index <= bc.BlockHeight()
} }
@ -610,31 +601,43 @@ func (bc *Blockchain) HeaderHeight() uint32 {
// GetAssetState returns asset state from its assetID // GetAssetState returns asset state from its assetID
func (bc *Blockchain) GetAssetState(assetID util.Uint256) *AssetState { func (bc *Blockchain) GetAssetState(assetID util.Uint256) *AssetState {
var as *AssetState as := getAssetStateFromStore(bc.memStore, assetID)
bc.Store.Seek(storage.STAsset.Bytes(), func(k, v []byte) { if as == nil {
var a AssetState as = getAssetStateFromStore(bc.Store, assetID)
r := io.NewBinReaderFromBuf(v) }
a.DecodeBinary(r)
if r.Err == nil && a.ID == assetID {
as = &a
}
})
return as return as
} }
// getAssetStateFromStore returns given asset state as recorded in the given
// store.
func getAssetStateFromStore(s storage.Store, assetID util.Uint256) *AssetState {
key := storage.AppendPrefix(storage.STAsset, assetID.Bytes())
asEncoded, err := s.Get(key)
if err != nil {
return nil
}
var a AssetState
r := io.NewBinReaderFromBuf(asEncoded)
a.DecodeBinary(r)
if r.Err != nil || a.ID != assetID {
return nil
}
return &a
}
// GetAccountState returns the account state from its script hash // GetAccountState returns the account state from its script hash
func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *AccountState { func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *AccountState {
var as *AccountState as, err := getAccountStateFromStore(bc.memStore, scriptHash)
bc.Store.Seek(storage.STAccount.Bytes(), func(k, v []byte) { if as == nil {
var a AccountState if err != storage.ErrKeyNotFound {
r := io.NewBinReaderFromBuf(v) log.Warnf("failed to get account state: %s", err)
a.DecodeBinary(r)
if r.Err == nil && a.ScriptHash == scriptHash {
as = &a
} }
}) as, err = getAccountStateFromStore(bc.Store, scriptHash)
if as == nil && err != storage.ErrKeyNotFound {
log.Warnf("failed to get account state: %s", err)
}
}
return as return as
} }

View file

@ -24,7 +24,6 @@ func TestAddHeaders(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
assert.Equal(t, 0, bc.blockCache.Len())
assert.Equal(t, h3.Index, bc.HeaderHeight()) assert.Equal(t, h3.Index, bc.HeaderHeight())
assert.Equal(t, uint32(0), bc.BlockHeight()) assert.Equal(t, uint32(0), bc.BlockHeight())
assert.Equal(t, h3.Hash(), bc.CurrentHeaderHash()) assert.Equal(t, h3.Hash(), bc.CurrentHeaderHash())
@ -57,12 +56,9 @@ func TestAddBlock(t *testing.T) {
} }
lastBlock := blocks[len(blocks)-1] lastBlock := blocks[len(blocks)-1]
assert.Equal(t, 3, bc.blockCache.Len())
assert.Equal(t, lastBlock.Index, bc.HeaderHeight()) assert.Equal(t, lastBlock.Index, bc.HeaderHeight())
assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash()) assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
t.Log(bc.blockCache)
// This one tests persisting blocks, so it does need to persist() // This one tests persisting blocks, so it does need to persist()
require.NoError(t, bc.persist(context.Background())) require.NoError(t, bc.persist(context.Background()))
@ -75,7 +71,6 @@ func TestAddBlock(t *testing.T) {
assert.Equal(t, lastBlock.Index, bc.BlockHeight()) assert.Equal(t, lastBlock.Index, bc.BlockHeight())
assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash()) assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
assert.Equal(t, 0, bc.blockCache.Len())
} }
func TestGetHeader(t *testing.T) { func TestGetHeader(t *testing.T) {
@ -153,12 +148,14 @@ func TestHasBlock(t *testing.T) {
} }
func TestGetTransaction(t *testing.T) { func TestGetTransaction(t *testing.T) {
b1 := getDecodedBlock(t, 1)
block := getDecodedBlock(t, 2) block := getDecodedBlock(t, 2)
bc := newTestChain(t) bc := newTestChain(t)
defer func() { defer func() {
require.NoError(t, bc.Close()) require.NoError(t, bc.Close())
}() }()
assert.Nil(t, bc.AddBlock(b1))
assert.Nil(t, bc.AddBlock(block)) assert.Nil(t, bc.AddBlock(block))
// Test unpersisted and persisted access // Test unpersisted and persisted access

View file

@ -1,119 +0,0 @@
package core
import (
"sync"
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/util"
)
// Cache is data structure with fixed type key of Uint256, but has a
// generic value. Used for block, tx and header cache types.
type Cache struct {
lock sync.RWMutex
m map[util.Uint256]interface{}
}
// txWithHeight is an ugly wrapper to fit the needs of Blockchain's GetTransaction.
type txWithHeight struct {
tx *transaction.Transaction
height uint32
}
// NewCache returns a ready to use Cache object.
func NewCache() *Cache {
return &Cache{
m: make(map[util.Uint256]interface{}),
}
}
// GetTransaction will return a Transaction type from the cache.
func (c *Cache) GetTransaction(h util.Uint256) (*transaction.Transaction, uint32, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
if v, ok := c.m[h]; ok {
txh, ok := v.(txWithHeight)
if ok {
return txh.tx, txh.height, ok
}
}
return nil, 0, false
}
// GetBlock will return a Block type from the cache.
func (c *Cache) GetBlock(h util.Uint256) (block *Block, ok bool) {
c.lock.RLock()
defer c.lock.RUnlock()
return c.getBlock(h)
}
func (c *Cache) getBlock(h util.Uint256) (block *Block, ok bool) {
if v, b := c.m[h]; b {
block, ok = v.(*Block)
return
}
return
}
// Add adds the given hash along with its value to the cache.
func (c *Cache) Add(h util.Uint256, v interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
c.add(h, v)
}
func (c *Cache) add(h util.Uint256, v interface{}) {
c.m[h] = v
block, ok := v.(*Block)
if ok {
for _, tx := range block.Transactions {
c.m[tx.Hash()] = txWithHeight{tx, block.Index}
}
}
}
func (c *Cache) has(h util.Uint256) bool {
_, ok := c.m[h]
return ok
}
// Has returns whether the cache contains the given hash.
func (c *Cache) Has(h util.Uint256) bool {
c.lock.Lock()
defer c.lock.Unlock()
return c.has(h)
}
// Len return the number of items present in the cache.
func (c *Cache) Len() int {
c.lock.RLock()
defer c.lock.RUnlock()
return len(c.m)
}
// Delete removes the item out of the cache.
func (c *Cache) Delete(h util.Uint256) {
c.lock.Lock()
defer c.lock.Unlock()
block, ok := c.m[h].(*Block)
if ok {
for _, tx := range block.Transactions {
delete(c.m, tx.Hash())
}
}
delete(c.m, h)
}
// ReapStrangeBlocks drops blocks from cache that don't fit into the
// blkHeight-headHeight interval. Cache should only contain blocks that we
// expect to get and store.
func (c *Cache) ReapStrangeBlocks(blkHeight, headHeight uint32) {
c.lock.Lock()
defer c.lock.Unlock()
for i, b := range c.m {
block, ok := b.(*Block)
if ok && (block.Index < blkHeight || block.Index > headHeight) {
delete(c.m, i)
}
}
}

View file

@ -83,6 +83,28 @@ func (s *MemoryStore) Batch() Batch {
} }
} }
// Persist flushes all the MemoryStore contents into the (supposedly) persistent
// store provided via parameter.
func (s *MemoryStore) Persist(ps Store) (int, error) {
s.Lock()
defer s.Unlock()
batch := ps.Batch()
keys := 0
for k, v := range s.mem {
kb, _ := hex.DecodeString(k)
batch.Put(kb, v)
keys++
}
var err error
if keys != 0 {
err = ps.PutBatch(batch)
}
if err == nil {
s.mem = make(map[string][]byte)
}
return keys, err
}
// Close implements Store interface and clears up memory. // Close implements Store interface and clears up memory.
func (s *MemoryStore) Close() error { func (s *MemoryStore) Close() error {
s.Lock() s.Lock()

View file

@ -75,3 +75,54 @@ func TestMemoryStore_Seek(t *testing.T) {
assert.Equal(t, value, v) assert.Equal(t, value, v)
}) })
} }
func TestMemoryStorePersist(t *testing.T) {
// temporary Store
ts := NewMemoryStore()
// persistent Store
ps := NewMemoryStore()
// persisting nothing should do nothing
c, err := ts.Persist(ps)
assert.Equal(t, nil, err)
assert.Equal(t, 0, c)
// persisting one key should result in one key in ps and nothing in ts
assert.NoError(t, ts.Put([]byte("key"), []byte("value")))
c, err = ts.Persist(ps)
assert.Equal(t, nil, err)
assert.Equal(t, 1, c)
v, err := ps.Get([]byte("key"))
assert.Equal(t, nil, err)
assert.Equal(t, []byte("value"), v)
v, err = ts.Get([]byte("key"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
// now we overwrite the previous `key` contents and also add `key2`,
assert.NoError(t, ts.Put([]byte("key"), []byte("newvalue")))
assert.NoError(t, ts.Put([]byte("key2"), []byte("value2")))
// this is to check that now key is written into the ps before we do
// persist
v, err = ps.Get([]byte("key2"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
// two keys should be persisted (one overwritten and one new) and
// available in the ps
c, err = ts.Persist(ps)
assert.Equal(t, nil, err)
assert.Equal(t, 2, c)
v, err = ts.Get([]byte("key"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
v, err = ts.Get([]byte("key2"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
v, err = ps.Get([]byte("key"))
assert.Equal(t, nil, err)
assert.Equal(t, []byte("newvalue"), v)
v, err = ps.Get([]byte("key2"))
assert.Equal(t, nil, err)
assert.Equal(t, []byte("value2"), v)
// we've persisted some values, make sure successive persist is a no-op
c, err = ts.Persist(ps)
assert.Equal(t, nil, err)
assert.Equal(t, 0, c)
}

View file

@ -13,11 +13,36 @@ import (
// coin state. // coin state.
type UnspentCoins map[util.Uint256]*UnspentCoinState type UnspentCoins map[util.Uint256]*UnspentCoinState
func (u UnspentCoins) getAndUpdate(s storage.Store, hash util.Uint256) (*UnspentCoinState, error) { // getAndUpdate retreives UnspentCoinState from temporary or persistent Store
// and return it. If it's not present in both stores, returns a new
// UnspentCoinState.
func (u UnspentCoins) getAndUpdate(ts storage.Store, ps storage.Store, hash util.Uint256) (*UnspentCoinState, error) {
if unspent, ok := u[hash]; ok { if unspent, ok := u[hash]; ok {
return unspent, nil return unspent, nil
} }
unspent, err := getUnspentCoinStateFromStore(ts, hash)
if err != nil {
if err != storage.ErrKeyNotFound {
return nil, err
}
unspent, err = getUnspentCoinStateFromStore(ps, hash)
if err != nil {
if err != storage.ErrKeyNotFound {
return nil, err
}
unspent = &UnspentCoinState{
states: []CoinState{},
}
}
}
u[hash] = unspent
return unspent, nil
}
// getUnspentCoinStateFromStore retrieves UnspentCoinState from the given store
func getUnspentCoinStateFromStore(s storage.Store, hash util.Uint256) (*UnspentCoinState, error) {
unspent := &UnspentCoinState{} unspent := &UnspentCoinState{}
key := storage.AppendPrefix(storage.STCoin, hash.BytesReverse()) key := storage.AppendPrefix(storage.STCoin, hash.BytesReverse())
if b, err := s.Get(key); err == nil { if b, err := s.Get(key); err == nil {
@ -27,12 +52,8 @@ func (u UnspentCoins) getAndUpdate(s storage.Store, hash util.Uint256) (*Unspent
return nil, fmt.Errorf("failed to decode (UnspentCoinState): %s", r.Err) return nil, fmt.Errorf("failed to decode (UnspentCoinState): %s", r.Err)
} }
} else { } else {
unspent = &UnspentCoinState{ return nil, err
states: []CoinState{},
}
} }
u[hash] = unspent
return unspent, nil return unspent, nil
} }

View file

@ -13,7 +13,6 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestRPC(t *testing.T) { func TestRPC(t *testing.T) {
@ -22,9 +21,6 @@ func TestRPC(t *testing.T) {
chain, handler := initServerWithInMemoryChain(ctx, t) chain, handler := initServerWithInMemoryChain(ctx, t)
defer func() {
require.NoError(t, chain.Close())
}()
t.Run("getbestblockhash", func(t *testing.T) { t.Run("getbestblockhash", func(t *testing.T) {
rpc := `{"jsonrpc": "2.0", "id": 1, "method": "getbestblockhash", "params": []}` rpc := `{"jsonrpc": "2.0", "id": 1, "method": "getbestblockhash", "params": []}`
body := doRPCCall(rpc, handler, t) body := doRPCCall(rpc, handler, t)