From 920d7c610c171d1b6c5f3185d3016b9c91a848ad Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 26 Sep 2019 18:14:00 +0300 Subject: [PATCH] core: remove blockCache, use MemoryStore, redesign persist() Commit 578ac414d4c5699d2ee65b9f247fa198fed8ae94 was wrong in that it saved only a part of the block, so depending on how you use blockchain, you may still see that the block was not really processed properly. To really fix it this commit introduces intermediate storage layer in form of memStore, which actually is a MemoryStore that supports full Store API (thus easily fitting into the existing code) and one extension that allows it to flush its data to some other Store. It also changes AddBlock() semantics in that it only accepts now successive blocks, but when it does it guarantees that they're properly added into the Blockchain and can be referred to in any way. Pending block queing is now moved into the server (see 8c0c055ac657813fe3ed10257bce199e9527d5ed). So the only thing done with persist() now is just a move from memStore to Store which probably should've always been the case (notice also that previously headers and some other metadata was written into the Store bypassing caching/batching mechanism thus leading to some inefficiency). --- pkg/core/account_state.go | 37 +++- pkg/core/blockchain.go | 249 +++++++++++++------------- pkg/core/blockchain_test.go | 7 +- pkg/core/cache.go | 119 ------------ pkg/core/storage/memory_store.go | 22 +++ pkg/core/storage/memory_store_test.go | 51 ++++++ pkg/core/unspent_coin_state.go | 33 +++- pkg/rpc/server_test.go | 4 - 8 files changed, 257 insertions(+), 265 deletions(-) delete mode 100644 pkg/core/cache.go diff --git a/pkg/core/account_state.go b/pkg/core/account_state.go index 87c9771a0..ae34d573a 100644 --- a/pkg/core/account_state.go +++ b/pkg/core/account_state.go @@ -12,25 +12,46 @@ import ( // Accounts is mapping between a account address and AccountState. type Accounts map[util.Uint160]*AccountState -func (a Accounts) getAndUpdate(s storage.Store, hash util.Uint160) (*AccountState, error) { +// getAndUpdate retrieves AccountState from temporary or persistent Store +// or creates a new one if it doesn't exist. +func (a Accounts) getAndUpdate(ts storage.Store, ps storage.Store, hash util.Uint160) (*AccountState, error) { if account, ok := a[hash]; ok { return account, nil } - account := &AccountState{} + account, err := getAccountStateFromStore(ts, hash) + if err != nil { + if err != storage.ErrKeyNotFound { + return nil, err + } + account, err = getAccountStateFromStore(ps, hash) + if err != nil { + if err != storage.ErrKeyNotFound { + return nil, err + } + account = NewAccountState(hash) + } + } + + a[hash] = account + return account, nil +} + +// getAccountStateFromStore returns AccountState from the given Store if it's +// present there. Returns nil otherwise. +func getAccountStateFromStore(s storage.Store, hash util.Uint160) (*AccountState, error) { + var account *AccountState key := storage.AppendPrefix(storage.STAccount, hash.Bytes()) - if b, err := s.Get(key); err == nil { + b, err := s.Get(key) + if err == nil { + account = new(AccountState) r := io.NewBinReaderFromBuf(b) account.DecodeBinary(r) if r.Err != nil { return nil, fmt.Errorf("failed to decode (AccountState): %s", r.Err) } - } else { - account = NewAccountState(hash) } - - a[hash] = account - return account, nil + return account, err } // commit writes all account states to the given Batch. diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 5705e4425..03e75bedd 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -37,9 +37,12 @@ type Blockchain struct { // Any object that satisfies the BlockchainStorer interface. storage.Store + // In-memory storage to be persisted into the storage.Store + memStore *storage.MemoryStore + // Current index/height of the highest block. // Read access should always be called by BlockHeight(). - // Write access should only happen in AddBlock(). + // Write access should only happen in storeBlock(). blockHeight uint32 // Current persisted block count. @@ -48,8 +51,6 @@ type Blockchain struct { // Number of headers stored in the chain file. storedHeaderCount uint32 - blockCache *Cache - // All operation on headerList must be called from an // headersOp to be routine safe. headerList *HeaderHashList @@ -72,9 +73,9 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha bc := &Blockchain{ config: cfg, Store: s, + memStore: storage.NewMemoryStore(), headersOp: make(chan headersOpFunc), headersOpDone: make(chan struct{}), - blockCache: NewCache(), verifyBlocks: false, memPool: NewMemPool(50000), } @@ -99,7 +100,7 @@ func (bc *Blockchain) init() error { return err } bc.headerList = NewHeaderHashList(genesisBlock.Hash()) - return bc.persistBlock(genesisBlock) + return bc.storeBlock(genesisBlock) } if ver != version { return fmt.Errorf("storage version mismatch betweeen %s and %s", version, ver) @@ -164,6 +165,11 @@ func (bc *Blockchain) Run(ctx context.Context) { persistTimer := time.NewTimer(persistInterval) defer func() { persistTimer.Stop() + if err := bc.persist(ctx); err != nil { + log.Warnf("failed to persist: %s", err) + } + // never fails + _ = bc.memStore.Close() if err := bc.Store.Close(); err != nil { log.Warnf("failed to close db: %s", err) } @@ -187,30 +193,24 @@ func (bc *Blockchain) Run(ctx context.Context) { } } -// AddBlock processes the given block and will add it to the cache so it -// can be persisted. +// AddBlock accepts successive block for the Blockchain, verifies it and +// stores internally. Eventually it will be persisted to the backing storage. func (bc *Blockchain) AddBlock(block *Block) error { - if !bc.blockCache.Has(block.Hash()) { - bc.blockCache.Add(block.Hash(), block) + expectedHeight := bc.BlockHeight() + 1 + if expectedHeight != block.Index { + return fmt.Errorf("expected block %d, but passed block %d", expectedHeight, block.Index) + } + if bc.verifyBlocks && !block.Verify(false) { + return fmt.Errorf("block %s is invalid", block.Hash()) } - headerLen := bc.headerListLen() - if int(block.Index-1) >= headerLen { - return nil - } if int(block.Index) == headerLen { - if bc.verifyBlocks && !block.Verify(false) { - return fmt.Errorf("block %s is invalid", block.Hash()) - } err := bc.AddHeaders(block.Header()) if err != nil { return err } } - if bc.BlockHeight()+1 == block.Index { - atomic.StoreUint32(&bc.blockHeight, block.Index) - } - return nil + return bc.storeBlock(block) } // AddHeaders will process the given headers and add them to the @@ -218,7 +218,7 @@ func (bc *Blockchain) AddBlock(block *Block) error { func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) { var ( start = time.Now() - batch = bc.Batch() + batch = bc.memStore.Batch() ) bc.headersOp <- func(headerList *HeaderHashList) { @@ -243,7 +243,7 @@ func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) { } if batch.Len() > 0 { - if err = bc.PutBatch(batch); err != nil { + if err = bc.memStore.PutBatch(batch); err != nil { return } log.WithFields(log.Fields{ @@ -286,13 +286,13 @@ func (bc *Blockchain) processHeader(h *Header, batch storage.Batch, headerList * return nil } -// TODO: persistBlock needs some more love, its implemented as in the original +// TODO: storeBlock needs some more love, its implemented as in the original // project. This for the sake of development speed and understanding of what // is happening here, quite allot as you can see :). If things are wired together // and all tests are in place, we can make a more optimized and cleaner implementation. -func (bc *Blockchain) persistBlock(block *Block) error { +func (bc *Blockchain) storeBlock(block *Block) error { var ( - batch = bc.Batch() + batch = bc.memStore.Batch() unspentCoins = make(UnspentCoins) spentCoins = make(SpentCoins) accounts = make(Accounts) @@ -314,7 +314,7 @@ func (bc *Blockchain) persistBlock(block *Block) error { // Process TX outputs. for _, output := range tx.Outputs { - account, err := accounts.getAndUpdate(bc.Store, output.ScriptHash) + account, err := accounts.getAndUpdate(bc.memStore, bc.Store, output.ScriptHash) if err != nil { return err } @@ -332,14 +332,14 @@ func (bc *Blockchain) persistBlock(block *Block) error { return fmt.Errorf("could not find previous TX: %s", prevHash) } for _, input := range inputs { - unspent, err := unspentCoins.getAndUpdate(bc.Store, input.PrevHash) + unspent, err := unspentCoins.getAndUpdate(bc.memStore, bc.Store, input.PrevHash) if err != nil { return err } unspent.states[input.PrevIndex] = CoinStateSpent prevTXOutput := prevTX.Outputs[input.PrevIndex] - account, err := accounts.getAndUpdate(bc.Store, prevTXOutput.ScriptHash) + account, err := accounts.getAndUpdate(bc.memStore, bc.Store, prevTXOutput.ScriptHash) if err != nil { return err } @@ -401,71 +401,45 @@ func (bc *Blockchain) persistBlock(block *Block) error { if err := assets.commit(batch); err != nil { return err } - if err := bc.PutBatch(batch); err != nil { + if err := bc.memStore.PutBatch(batch); err != nil { return err } - bc.persistedHeight = block.Index + atomic.StoreUint32(&bc.blockHeight, block.Index) return nil } -// persist flushed current block cache to the persistent storage. -func (bc *Blockchain) persist(ctx context.Context) (err error) { +// persist flushes current in-memory store contents to the persistent storage. +func (bc *Blockchain) persist(ctx context.Context) error { var ( start = time.Now() persisted = 0 - lenCache = bc.blockCache.Len() + err error ) - if lenCache == 0 { - return nil + persisted, err = bc.memStore.Persist(bc.Store) + if err != nil { + return err } - - bc.headersOp <- func(headerList *HeaderHashList) { - for i := 0; i < lenCache; i++ { - if uint32(headerList.Len()) <= bc.BlockHeight() { - return - } - hash := headerList.Get(int(bc.persistedHeight + 1)) - if block, ok := bc.blockCache.GetBlock(hash); ok { - if err = bc.persistBlock(block); err != nil { - return - } - bc.blockCache.Delete(hash) - persisted++ - } else { - // no next block in the cache, no reason to continue looping - break - } - } - } - - select { - case <-ctx.Done(): - return - case <-bc.headersOpDone: - // + bHeight, err := storage.CurrentBlockHeight(bc.Store) + if err != nil { + return err } + oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight) + diff := bHeight - oldHeight if persisted > 0 { log.WithFields(log.Fields{ - "persisted": persisted, - "headerHeight": bc.HeaderHeight(), - "blockHeight": bc.BlockHeight(), - "took": time.Since(start), + "persistedBlocks": diff, + "persistedKeys": persisted, + "headerHeight": bc.HeaderHeight(), + "blockHeight": bc.BlockHeight(), + "persistedHeight": bc.persistedHeight, + "took": time.Since(start), }).Info("blockchain persist completed") - } else { - // So we have some blocks in cache but can't persist them? - // Either there are some stale blocks there or the other way - // around (which was seen in practice) --- there are some fresh - // blocks that we can't persist yet. Some of the latter can be useful - // or can be bogus (higher than the header height we expect at - // the moment). So try to reap oldies and strange newbies, if - // there are any. - bc.blockCache.ReapStrangeBlocks(bc.BlockHeight(), bc.HeaderHeight()) } - return + return nil } func (bc *Blockchain) headerListLen() (n int) { @@ -478,15 +452,21 @@ func (bc *Blockchain) headerListLen() (n int) { // GetTransaction returns a TX and its height by the given hash. func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) { - if tx, height, ok := bc.blockCache.GetTransaction(hash); ok { - return tx, height, nil - } if tx, ok := bc.memPool.TryGetValue(hash); ok { return tx, 0, nil // the height is not actually defined for memPool transaction. Not sure if zero is a good number in this case. } + tx, height, err := getTransactionFromStore(bc.memStore, hash) + if err != nil { + tx, height, err = getTransactionFromStore(bc.Store, hash) + } + return tx, height, err +} +// getTransactionFromStore returns Transaction and its height by the given hash +// if it exists in the store. +func getTransactionFromStore(s storage.Store, hash util.Uint256) (*transaction.Transaction, uint32, error) { key := storage.AppendPrefix(storage.DataTransaction, hash.BytesReverse()) - b, err := bc.Get(key) + b, err := s.Get(key) if err != nil { return nil, 0, err } @@ -506,14 +486,9 @@ func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transactio // GetBlock returns a Block by the given hash. func (bc *Blockchain) GetBlock(hash util.Uint256) (*Block, error) { - block, ok := bc.blockCache.GetBlock(hash) - if !ok { - key := storage.AppendPrefix(storage.DataBlock, hash.BytesReverse()) - b, err := bc.Get(key) - if err != nil { - return nil, err - } - block, err = NewBlockFromTrimmedBytes(b) + block, err := getBlockFromStore(bc.memStore, hash) + if err != nil { + block, err = getBlockFromStore(bc.Store, hash) if err != nil { return nil, err } @@ -524,34 +499,54 @@ func (bc *Blockchain) GetBlock(hash util.Uint256) (*Block, error) { return block, nil } +// getBlockFromStore returns Block by the given hash if it exists in the store. +func getBlockFromStore(s storage.Store, hash util.Uint256) (*Block, error) { + key := storage.AppendPrefix(storage.DataBlock, hash.BytesReverse()) + b, err := s.Get(key) + if err != nil { + return nil, err + } + block, err := NewBlockFromTrimmedBytes(b) + if err != nil { + return nil, err + } + return block, err +} + // GetHeader returns data block header identified with the given hash value. func (bc *Blockchain) GetHeader(hash util.Uint256) (*Header, error) { - block, ok := bc.blockCache.GetBlock(hash) - if !ok { - b, err := bc.Get(storage.AppendPrefix(storage.DataBlock, hash.BytesReverse())) - if err != nil { - return nil, err - } - block, err = NewBlockFromTrimmedBytes(b) + header, err := getHeaderFromStore(bc.memStore, hash) + if err != nil { + header, err = getHeaderFromStore(bc.Store, hash) if err != nil { return nil, err } } + return header, err +} + +// getHeaderFromStore returns Header by the given hash from the store. +func getHeaderFromStore(s storage.Store, hash util.Uint256) (*Header, error) { + block, err := getBlockFromStore(s, hash) + if err != nil { + return nil, err + } return block.Header(), nil } // HasTransaction return true if the blockchain contains he given // transaction hash. func (bc *Blockchain) HasTransaction(hash util.Uint256) bool { - if _, _, ok := bc.blockCache.GetTransaction(hash); ok { - return true - } - if bc.memPool.ContainsKey(hash) { - return true - } + return bc.memPool.ContainsKey(hash) || + checkTransactionInStore(bc.memStore, hash) || + checkTransactionInStore(bc.Store, hash) +} +// checkTransactionInStore returns true if the given store contains the given +// Transaction hash. +func checkTransactionInStore(s storage.Store, hash util.Uint256) bool { key := storage.AppendPrefix(storage.DataTransaction, hash.BytesReverse()) - if _, err := bc.Get(key); err == nil { + if _, err := s.Get(key); err == nil { return true } return false @@ -560,10 +555,6 @@ func (bc *Blockchain) HasTransaction(hash util.Uint256) bool { // HasBlock return true if the blockchain contains the given // block hash. func (bc *Blockchain) HasBlock(hash util.Uint256) bool { - if bc.blockCache.Has(hash) { - return true - } - if header, err := bc.GetHeader(hash); err == nil { return header.Index <= bc.BlockHeight() } @@ -610,31 +601,43 @@ func (bc *Blockchain) HeaderHeight() uint32 { // GetAssetState returns asset state from its assetID func (bc *Blockchain) GetAssetState(assetID util.Uint256) *AssetState { - var as *AssetState - bc.Store.Seek(storage.STAsset.Bytes(), func(k, v []byte) { - var a AssetState - r := io.NewBinReaderFromBuf(v) - a.DecodeBinary(r) - if r.Err == nil && a.ID == assetID { - as = &a - } - }) - + as := getAssetStateFromStore(bc.memStore, assetID) + if as == nil { + as = getAssetStateFromStore(bc.Store, assetID) + } return as } +// getAssetStateFromStore returns given asset state as recorded in the given +// store. +func getAssetStateFromStore(s storage.Store, assetID util.Uint256) *AssetState { + key := storage.AppendPrefix(storage.STAsset, assetID.Bytes()) + asEncoded, err := s.Get(key) + if err != nil { + return nil + } + var a AssetState + r := io.NewBinReaderFromBuf(asEncoded) + a.DecodeBinary(r) + if r.Err != nil || a.ID != assetID { + return nil + } + + return &a +} + // GetAccountState returns the account state from its script hash func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *AccountState { - var as *AccountState - bc.Store.Seek(storage.STAccount.Bytes(), func(k, v []byte) { - var a AccountState - r := io.NewBinReaderFromBuf(v) - a.DecodeBinary(r) - if r.Err == nil && a.ScriptHash == scriptHash { - as = &a + as, err := getAccountStateFromStore(bc.memStore, scriptHash) + if as == nil { + if err != storage.ErrKeyNotFound { + log.Warnf("failed to get account state: %s", err) } - }) - + as, err = getAccountStateFromStore(bc.Store, scriptHash) + if as == nil && err != storage.ErrKeyNotFound { + log.Warnf("failed to get account state: %s", err) + } + } return as } diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index e70c5b66d..7d39d1225 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -24,7 +24,6 @@ func TestAddHeaders(t *testing.T) { t.Fatal(err) } - assert.Equal(t, 0, bc.blockCache.Len()) assert.Equal(t, h3.Index, bc.HeaderHeight()) assert.Equal(t, uint32(0), bc.BlockHeight()) assert.Equal(t, h3.Hash(), bc.CurrentHeaderHash()) @@ -57,12 +56,9 @@ func TestAddBlock(t *testing.T) { } lastBlock := blocks[len(blocks)-1] - assert.Equal(t, 3, bc.blockCache.Len()) assert.Equal(t, lastBlock.Index, bc.HeaderHeight()) assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash()) - t.Log(bc.blockCache) - // This one tests persisting blocks, so it does need to persist() require.NoError(t, bc.persist(context.Background())) @@ -75,7 +71,6 @@ func TestAddBlock(t *testing.T) { assert.Equal(t, lastBlock.Index, bc.BlockHeight()) assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash()) - assert.Equal(t, 0, bc.blockCache.Len()) } func TestGetHeader(t *testing.T) { @@ -153,12 +148,14 @@ func TestHasBlock(t *testing.T) { } func TestGetTransaction(t *testing.T) { + b1 := getDecodedBlock(t, 1) block := getDecodedBlock(t, 2) bc := newTestChain(t) defer func() { require.NoError(t, bc.Close()) }() + assert.Nil(t, bc.AddBlock(b1)) assert.Nil(t, bc.AddBlock(block)) // Test unpersisted and persisted access diff --git a/pkg/core/cache.go b/pkg/core/cache.go deleted file mode 100644 index 9208d9be9..000000000 --- a/pkg/core/cache.go +++ /dev/null @@ -1,119 +0,0 @@ -package core - -import ( - "sync" - - "github.com/CityOfZion/neo-go/pkg/core/transaction" - "github.com/CityOfZion/neo-go/pkg/util" -) - -// Cache is data structure with fixed type key of Uint256, but has a -// generic value. Used for block, tx and header cache types. -type Cache struct { - lock sync.RWMutex - m map[util.Uint256]interface{} -} - -// txWithHeight is an ugly wrapper to fit the needs of Blockchain's GetTransaction. -type txWithHeight struct { - tx *transaction.Transaction - height uint32 -} - -// NewCache returns a ready to use Cache object. -func NewCache() *Cache { - return &Cache{ - m: make(map[util.Uint256]interface{}), - } -} - -// GetTransaction will return a Transaction type from the cache. -func (c *Cache) GetTransaction(h util.Uint256) (*transaction.Transaction, uint32, bool) { - c.lock.RLock() - defer c.lock.RUnlock() - if v, ok := c.m[h]; ok { - txh, ok := v.(txWithHeight) - if ok { - return txh.tx, txh.height, ok - } - } - return nil, 0, false -} - -// GetBlock will return a Block type from the cache. -func (c *Cache) GetBlock(h util.Uint256) (block *Block, ok bool) { - c.lock.RLock() - defer c.lock.RUnlock() - return c.getBlock(h) -} - -func (c *Cache) getBlock(h util.Uint256) (block *Block, ok bool) { - if v, b := c.m[h]; b { - block, ok = v.(*Block) - return - } - return -} - -// Add adds the given hash along with its value to the cache. -func (c *Cache) Add(h util.Uint256, v interface{}) { - c.lock.Lock() - defer c.lock.Unlock() - c.add(h, v) -} - -func (c *Cache) add(h util.Uint256, v interface{}) { - c.m[h] = v - block, ok := v.(*Block) - if ok { - for _, tx := range block.Transactions { - c.m[tx.Hash()] = txWithHeight{tx, block.Index} - } - } -} - -func (c *Cache) has(h util.Uint256) bool { - _, ok := c.m[h] - return ok -} - -// Has returns whether the cache contains the given hash. -func (c *Cache) Has(h util.Uint256) bool { - c.lock.Lock() - defer c.lock.Unlock() - return c.has(h) -} - -// Len return the number of items present in the cache. -func (c *Cache) Len() int { - c.lock.RLock() - defer c.lock.RUnlock() - return len(c.m) -} - -// Delete removes the item out of the cache. -func (c *Cache) Delete(h util.Uint256) { - c.lock.Lock() - defer c.lock.Unlock() - block, ok := c.m[h].(*Block) - if ok { - for _, tx := range block.Transactions { - delete(c.m, tx.Hash()) - } - } - delete(c.m, h) -} - -// ReapStrangeBlocks drops blocks from cache that don't fit into the -// blkHeight-headHeight interval. Cache should only contain blocks that we -// expect to get and store. -func (c *Cache) ReapStrangeBlocks(blkHeight, headHeight uint32) { - c.lock.Lock() - defer c.lock.Unlock() - for i, b := range c.m { - block, ok := b.(*Block) - if ok && (block.Index < blkHeight || block.Index > headHeight) { - delete(c.m, i) - } - } -} diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 4c9d2f33f..0f2a92428 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -83,6 +83,28 @@ func (s *MemoryStore) Batch() Batch { } } +// Persist flushes all the MemoryStore contents into the (supposedly) persistent +// store provided via parameter. +func (s *MemoryStore) Persist(ps Store) (int, error) { + s.Lock() + defer s.Unlock() + batch := ps.Batch() + keys := 0 + for k, v := range s.mem { + kb, _ := hex.DecodeString(k) + batch.Put(kb, v) + keys++ + } + var err error + if keys != 0 { + err = ps.PutBatch(batch) + } + if err == nil { + s.mem = make(map[string][]byte) + } + return keys, err +} + // Close implements Store interface and clears up memory. func (s *MemoryStore) Close() error { s.Lock() diff --git a/pkg/core/storage/memory_store_test.go b/pkg/core/storage/memory_store_test.go index 1ad760e3e..30aa526fd 100644 --- a/pkg/core/storage/memory_store_test.go +++ b/pkg/core/storage/memory_store_test.go @@ -75,3 +75,54 @@ func TestMemoryStore_Seek(t *testing.T) { assert.Equal(t, value, v) }) } + +func TestMemoryStorePersist(t *testing.T) { + // temporary Store + ts := NewMemoryStore() + // persistent Store + ps := NewMemoryStore() + // persisting nothing should do nothing + c, err := ts.Persist(ps) + assert.Equal(t, nil, err) + assert.Equal(t, 0, c) + // persisting one key should result in one key in ps and nothing in ts + assert.NoError(t, ts.Put([]byte("key"), []byte("value"))) + c, err = ts.Persist(ps) + assert.Equal(t, nil, err) + assert.Equal(t, 1, c) + v, err := ps.Get([]byte("key")) + assert.Equal(t, nil, err) + assert.Equal(t, []byte("value"), v) + v, err = ts.Get([]byte("key")) + assert.Equal(t, ErrKeyNotFound, err) + assert.Equal(t, []byte(nil), v) + // now we overwrite the previous `key` contents and also add `key2`, + assert.NoError(t, ts.Put([]byte("key"), []byte("newvalue"))) + assert.NoError(t, ts.Put([]byte("key2"), []byte("value2"))) + // this is to check that now key is written into the ps before we do + // persist + v, err = ps.Get([]byte("key2")) + assert.Equal(t, ErrKeyNotFound, err) + assert.Equal(t, []byte(nil), v) + // two keys should be persisted (one overwritten and one new) and + // available in the ps + c, err = ts.Persist(ps) + assert.Equal(t, nil, err) + assert.Equal(t, 2, c) + v, err = ts.Get([]byte("key")) + assert.Equal(t, ErrKeyNotFound, err) + assert.Equal(t, []byte(nil), v) + v, err = ts.Get([]byte("key2")) + assert.Equal(t, ErrKeyNotFound, err) + assert.Equal(t, []byte(nil), v) + v, err = ps.Get([]byte("key")) + assert.Equal(t, nil, err) + assert.Equal(t, []byte("newvalue"), v) + v, err = ps.Get([]byte("key2")) + assert.Equal(t, nil, err) + assert.Equal(t, []byte("value2"), v) + // we've persisted some values, make sure successive persist is a no-op + c, err = ts.Persist(ps) + assert.Equal(t, nil, err) + assert.Equal(t, 0, c) +} diff --git a/pkg/core/unspent_coin_state.go b/pkg/core/unspent_coin_state.go index 9f8026578..1b7aa408c 100644 --- a/pkg/core/unspent_coin_state.go +++ b/pkg/core/unspent_coin_state.go @@ -13,11 +13,36 @@ import ( // coin state. type UnspentCoins map[util.Uint256]*UnspentCoinState -func (u UnspentCoins) getAndUpdate(s storage.Store, hash util.Uint256) (*UnspentCoinState, error) { +// getAndUpdate retreives UnspentCoinState from temporary or persistent Store +// and return it. If it's not present in both stores, returns a new +// UnspentCoinState. +func (u UnspentCoins) getAndUpdate(ts storage.Store, ps storage.Store, hash util.Uint256) (*UnspentCoinState, error) { if unspent, ok := u[hash]; ok { return unspent, nil } + unspent, err := getUnspentCoinStateFromStore(ts, hash) + if err != nil { + if err != storage.ErrKeyNotFound { + return nil, err + } + unspent, err = getUnspentCoinStateFromStore(ps, hash) + if err != nil { + if err != storage.ErrKeyNotFound { + return nil, err + } + unspent = &UnspentCoinState{ + states: []CoinState{}, + } + } + } + + u[hash] = unspent + return unspent, nil +} + +// getUnspentCoinStateFromStore retrieves UnspentCoinState from the given store +func getUnspentCoinStateFromStore(s storage.Store, hash util.Uint256) (*UnspentCoinState, error) { unspent := &UnspentCoinState{} key := storage.AppendPrefix(storage.STCoin, hash.BytesReverse()) if b, err := s.Get(key); err == nil { @@ -27,12 +52,8 @@ func (u UnspentCoins) getAndUpdate(s storage.Store, hash util.Uint256) (*Unspent return nil, fmt.Errorf("failed to decode (UnspentCoinState): %s", r.Err) } } else { - unspent = &UnspentCoinState{ - states: []CoinState{}, - } + return nil, err } - - u[hash] = unspent return unspent, nil } diff --git a/pkg/rpc/server_test.go b/pkg/rpc/server_test.go index 2db969c7c..3d6c139e1 100644 --- a/pkg/rpc/server_test.go +++ b/pkg/rpc/server_test.go @@ -13,7 +13,6 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestRPC(t *testing.T) { @@ -22,9 +21,6 @@ func TestRPC(t *testing.T) { chain, handler := initServerWithInMemoryChain(ctx, t) - defer func() { - require.NoError(t, chain.Close()) - }() t.Run("getbestblockhash", func(t *testing.T) { rpc := `{"jsonrpc": "2.0", "id": 1, "method": "getbestblockhash", "params": []}` body := doRPCCall(rpc, handler, t)