From fc0031e5aa7a503d5351e4e11f986b655309fbbb Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 16 Oct 2019 16:41:50 +0300 Subject: [PATCH 1/2] core: move write caching layer into MemCacheStore Simplify Blockchain and associated functions, deduplicate code, fix Get() and Seek() implementations. --- pkg/core/account_state.go | 12 +- pkg/core/blockchain.go | 116 ++++++------------- pkg/core/blockchain_test.go | 2 +- pkg/core/interops.go | 6 +- pkg/core/storage/memcached_store.go | 85 ++++++++++++++ pkg/core/storage/memcached_store_test.go | 141 +++++++++++++++++++++++ pkg/core/storage/memory_store.go | 26 ----- pkg/core/storage/memory_store_test.go | 65 ----------- pkg/core/storage/storeandbatch_test.go | 13 ++- pkg/core/unspent_coin_state.go | 14 +-- 10 files changed, 283 insertions(+), 197 deletions(-) create mode 100644 pkg/core/storage/memcached_store.go create mode 100644 pkg/core/storage/memcached_store_test.go diff --git a/pkg/core/account_state.go b/pkg/core/account_state.go index ae34d573a..00fcad55d 100644 --- a/pkg/core/account_state.go +++ b/pkg/core/account_state.go @@ -14,23 +14,17 @@ type Accounts map[util.Uint160]*AccountState // getAndUpdate retrieves AccountState from temporary or persistent Store // or creates a new one if it doesn't exist. -func (a Accounts) getAndUpdate(ts storage.Store, ps storage.Store, hash util.Uint160) (*AccountState, error) { +func (a Accounts) getAndUpdate(s storage.Store, hash util.Uint160) (*AccountState, error) { if account, ok := a[hash]; ok { return account, nil } - account, err := getAccountStateFromStore(ts, hash) + account, err := getAccountStateFromStore(s, hash) if err != nil { if err != storage.ErrKeyNotFound { return nil, err } - account, err = getAccountStateFromStore(ps, hash) - if err != nil { - if err != storage.ErrKeyNotFound { - return nil, err - } - account = NewAccountState(hash) - } + account = NewAccountState(hash) } a[hash] = account diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 50abcd962..899a13bdf 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -43,11 +43,8 @@ var ( type Blockchain struct { config config.ProtocolConfiguration - // Any object that satisfies the BlockchainStorer interface. - storage.Store - - // In-memory storage to be persisted into the storage.Store - memStore *storage.MemoryStore + // Persistent storage wrapped around with a write memory caching layer. + store *storage.MemCachedStore // Current index/height of the highest block. // Read access should always be called by BlockHeight(). @@ -78,8 +75,7 @@ type headersOpFunc func(headerList *HeaderHashList) func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockchain, error) { bc := &Blockchain{ config: cfg, - Store: s, - memStore: storage.NewMemoryStore(), + store: storage.NewMemCachedStore(s), headersOp: make(chan headersOpFunc), headersOpDone: make(chan struct{}), memPool: NewMemPool(50000), @@ -94,10 +90,10 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha func (bc *Blockchain) init() error { // If we could not find the version in the Store, we know that there is nothing stored. - ver, err := storage.Version(bc.Store) + ver, err := storage.Version(bc.store) if err != nil { log.Infof("no storage version found! creating genesis block") - if err = storage.PutVersion(bc.Store, version); err != nil { + if err = storage.PutVersion(bc.store, version); err != nil { return err } genesisBlock, err := createGenesisBlock(bc.config) @@ -116,14 +112,14 @@ func (bc *Blockchain) init() error { // and the genesis block as first block. log.Infof("restoring blockchain with version: %s", version) - bHeight, err := storage.CurrentBlockHeight(bc.Store) + bHeight, err := storage.CurrentBlockHeight(bc.store) if err != nil { return err } bc.blockHeight = bHeight bc.persistedHeight = bHeight - hashes, err := storage.HeaderHashes(bc.Store) + hashes, err := storage.HeaderHashes(bc.store) if err != nil { return err } @@ -131,7 +127,7 @@ func (bc *Blockchain) init() error { bc.headerList = NewHeaderHashList(hashes...) bc.storedHeaderCount = uint32(len(hashes)) - currHeaderHeight, currHeaderHash, err := storage.CurrentHeaderHeight(bc.Store) + currHeaderHeight, currHeaderHash, err := storage.CurrentHeaderHeight(bc.store) if err != nil { return err } @@ -173,9 +169,7 @@ func (bc *Blockchain) Run(ctx context.Context) { if err := bc.persist(ctx); err != nil { log.Warnf("failed to persist: %s", err) } - // never fails - _ = bc.memStore.Close() - if err := bc.Store.Close(); err != nil { + if err := bc.store.Close(); err != nil { log.Warnf("failed to close db: %s", err) } }() @@ -237,7 +231,7 @@ func (bc *Blockchain) AddBlock(block *Block) error { func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) { var ( start = time.Now() - batch = bc.memStore.Batch() + batch = bc.store.Batch() ) bc.headersOp <- func(headerList *HeaderHashList) { @@ -263,7 +257,7 @@ func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) { } if oldlen != headerList.Len() { - if err = bc.memStore.PutBatch(batch); err != nil { + if err = bc.store.PutBatch(batch); err != nil { return } log.WithFields(log.Fields{ @@ -312,7 +306,7 @@ func (bc *Blockchain) processHeader(h *Header, batch storage.Batch, headerList * // and all tests are in place, we can make a more optimized and cleaner implementation. func (bc *Blockchain) storeBlock(block *Block) error { var ( - batch = bc.memStore.Batch() + batch = bc.store.Batch() unspentCoins = make(UnspentCoins) spentCoins = make(SpentCoins) accounts = make(Accounts) @@ -335,7 +329,7 @@ func (bc *Blockchain) storeBlock(block *Block) error { // Process TX outputs. for _, output := range tx.Outputs { - account, err := accounts.getAndUpdate(bc.memStore, bc.Store, output.ScriptHash) + account, err := accounts.getAndUpdate(bc.store, output.ScriptHash) if err != nil { return err } @@ -353,14 +347,14 @@ func (bc *Blockchain) storeBlock(block *Block) error { return fmt.Errorf("could not find previous TX: %s", prevHash) } for _, input := range inputs { - unspent, err := unspentCoins.getAndUpdate(bc.memStore, bc.Store, input.PrevHash) + unspent, err := unspentCoins.getAndUpdate(bc.store, input.PrevHash) if err != nil { return err } unspent.states[input.PrevIndex] = CoinStateSpent prevTXOutput := prevTX.Outputs[input.PrevIndex] - account, err := accounts.getAndUpdate(bc.memStore, bc.Store, prevTXOutput.ScriptHash) + account, err := accounts.getAndUpdate(bc.store, prevTXOutput.ScriptHash) if err != nil { return err } @@ -421,13 +415,13 @@ func (bc *Blockchain) storeBlock(block *Block) error { return cs.Script }) - systemInterop := newInteropContext(0x10, bc, block, tx) + systemInterop := newInteropContext(0x10, bc, bc.store, block, tx) vm.RegisterInteropFuncs(systemInterop.getSystemInteropMap()) vm.RegisterInteropFuncs(systemInterop.getNeoInteropMap()) vm.LoadScript(t.Script) vm.Run() if !vm.HasFailed() { - _, err := systemInterop.mem.Persist(bc.memStore) + _, err := systemInterop.mem.Persist() if err != nil { return errors.Wrap(err, "failed to persist invocation results") } @@ -456,7 +450,7 @@ func (bc *Blockchain) storeBlock(block *Block) error { if err := contracts.commit(batch); err != nil { return err } - if err := bc.memStore.PutBatch(batch); err != nil { + if err := bc.store.PutBatch(batch); err != nil { return err } @@ -472,11 +466,11 @@ func (bc *Blockchain) persist(ctx context.Context) error { err error ) - persisted, err = bc.memStore.Persist(bc.Store) + persisted, err = bc.store.Persist() if err != nil { return err } - bHeight, err := storage.CurrentBlockHeight(bc.Store) + bHeight, err := storage.CurrentBlockHeight(bc.store) if err != nil { return err } @@ -510,11 +504,7 @@ func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transactio if tx, ok := bc.memPool.TryGetValue(hash); ok { return tx, 0, nil // the height is not actually defined for memPool transaction. Not sure if zero is a good number in this case. } - tx, height, err := getTransactionFromStore(bc.memStore, hash) - if err != nil { - tx, height, err = getTransactionFromStore(bc.Store, hash) - } - return tx, height, err + return getTransactionFromStore(bc.store, hash) } // getTransactionFromStore returns Transaction and its height by the given hash @@ -541,11 +531,7 @@ func getTransactionFromStore(s storage.Store, hash util.Uint256) (*transaction.T // GetStorageItem returns an item from storage. func (bc *Blockchain) GetStorageItem(scripthash util.Uint160, key []byte) *StorageItem { - sItem := getStorageItemFromStore(bc.memStore, scripthash, key) - if sItem == nil { - sItem = getStorageItemFromStore(bc.Store, scripthash, key) - } - return sItem + return getStorageItemFromStore(bc.store, scripthash, key) } // GetStorageItems returns all storage items for a given scripthash. @@ -568,8 +554,7 @@ func (bc *Blockchain) GetStorageItems(hash util.Uint160) (map[string]*StorageIte // Cut prefix and hash. siMap[string(k[21:])] = si } - bc.memStore.Seek(storage.AppendPrefix(storage.STStorage, hash.BytesReverse()), saveToMap) - bc.Store.Seek(storage.AppendPrefix(storage.STStorage, hash.BytesReverse()), saveToMap) + bc.store.Seek(storage.AppendPrefix(storage.STStorage, hash.BytesReverse()), saveToMap) if err != nil { return nil, err } @@ -578,12 +563,9 @@ func (bc *Blockchain) GetStorageItems(hash util.Uint160) (map[string]*StorageIte // GetBlock returns a Block by the given hash. func (bc *Blockchain) GetBlock(hash util.Uint256) (*Block, error) { - block, err := getBlockFromStore(bc.memStore, hash) + block, err := getBlockFromStore(bc.store, hash) if err != nil { - block, err = getBlockFromStore(bc.Store, hash) - if err != nil { - return nil, err - } + return nil, err } if len(block.Transactions) == 0 { return nil, fmt.Errorf("only header is available") @@ -614,14 +596,7 @@ func getBlockFromStore(s storage.Store, hash util.Uint256) (*Block, error) { // GetHeader returns data block header identified with the given hash value. func (bc *Blockchain) GetHeader(hash util.Uint256) (*Header, error) { - header, err := getHeaderFromStore(bc.memStore, hash) - if err != nil { - header, err = getHeaderFromStore(bc.Store, hash) - if err != nil { - return nil, err - } - } - return header, err + return getHeaderFromStore(bc.store, hash) } // getHeaderFromStore returns Header by the given hash from the store. @@ -637,8 +612,7 @@ func getHeaderFromStore(s storage.Store, hash util.Uint256) (*Header, error) { // transaction hash. func (bc *Blockchain) HasTransaction(hash util.Uint256) bool { return bc.memPool.ContainsKey(hash) || - checkTransactionInStore(bc.memStore, hash) || - checkTransactionInStore(bc.Store, hash) + checkTransactionInStore(bc.store, hash) } // checkTransactionInStore returns true if the given store contains the given @@ -700,11 +674,7 @@ func (bc *Blockchain) HeaderHeight() uint32 { // GetAssetState returns asset state from its assetID func (bc *Blockchain) GetAssetState(assetID util.Uint256) *AssetState { - as := getAssetStateFromStore(bc.memStore, assetID) - if as == nil { - as = getAssetStateFromStore(bc.Store, assetID) - } - return as + return getAssetStateFromStore(bc.store, assetID) } // getAssetStateFromStore returns given asset state as recorded in the given @@ -727,11 +697,7 @@ func getAssetStateFromStore(s storage.Store, assetID util.Uint256) *AssetState { // GetContractState returns contract by its script hash. func (bc *Blockchain) GetContractState(hash util.Uint160) *ContractState { - cs := getContractStateFromStore(bc.memStore, hash) - if cs == nil { - cs = getContractStateFromStore(bc.Store, hash) - } - return cs + return getContractStateFromStore(bc.store, hash) } // getContractStateFromStore returns contract state as recorded in the given @@ -754,24 +720,18 @@ func getContractStateFromStore(s storage.Store, hash util.Uint160) *ContractStat // GetAccountState returns the account state from its script hash func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *AccountState { - as, err := getAccountStateFromStore(bc.memStore, scriptHash) - if as == nil { - if err != storage.ErrKeyNotFound { - log.Warnf("failed to get account state: %s", err) - } - as, err = getAccountStateFromStore(bc.Store, scriptHash) - if as == nil && err != storage.ErrKeyNotFound { - log.Warnf("failed to get account state: %s", err) - } + as, err := getAccountStateFromStore(bc.store, scriptHash) + if as == nil && err != storage.ErrKeyNotFound { + log.Warnf("failed to get account state: %s", err) } return as } // GetUnspentCoinState returns unspent coin state for given tx hash. func (bc *Blockchain) GetUnspentCoinState(hash util.Uint256) *UnspentCoinState { - ucs, err := getUnspentCoinStateFromStore(bc.memStore, hash) - if err != nil { - ucs, _ = getUnspentCoinStateFromStore(bc.Store, hash) + ucs, err := getUnspentCoinStateFromStore(bc.store, hash) + if ucs == nil && err != storage.ErrKeyNotFound { + log.Warnf("failed to get unspent coin state: %s", err) } return ucs } @@ -872,7 +832,7 @@ func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *Block) error { if ok := bc.memPool.Verify(t); !ok { return errors.New("invalid transaction due to conflicts with the memory pool") } - if IsDoubleSpend(bc.Store, t) { + if IsDoubleSpend(bc.store, t) { return errors.New("invalid transaction caused by double spending") } if err := bc.verifyOutputs(t); err != nil { @@ -1180,7 +1140,7 @@ func (bc *Blockchain) verifyTxWitnesses(t *transaction.Transaction, block *Block } sort.Slice(hashes, func(i, j int) bool { return hashes[i].Less(hashes[j]) }) sort.Slice(witnesses, func(i, j int) bool { return witnesses[i].ScriptHash().Less(witnesses[j].ScriptHash()) }) - interopCtx := newInteropContext(0, bc, block, t) + interopCtx := newInteropContext(0, bc, bc.store, block, t) for i := 0; i < len(hashes); i++ { err := bc.verifyHashAgainstScript(hashes[i], witnesses[i], t.VerificationHash(), interopCtx) if err != nil { @@ -1200,7 +1160,7 @@ func (bc *Blockchain) verifyBlockWitnesses(block *Block, prevHeader *Header) err } else { hash = prevHeader.NextConsensus } - interopCtx := newInteropContext(0, bc, nil, nil) + interopCtx := newInteropContext(0, bc, bc.store, nil, nil) return bc.verifyHashAgainstScript(hash, block.Script, block.VerificationHash(), interopCtx) } diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index bf88230e3..71756941d 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -57,7 +57,7 @@ func TestAddBlock(t *testing.T) { for _, block := range blocks { key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesReverse()) - if _, err := bc.Get(key); err != nil { + if _, err := bc.store.Get(key); err != nil { t.Fatalf("block %s not persisted", block.Hash()) } } diff --git a/pkg/core/interops.go b/pkg/core/interops.go index 02ad6ea23..9f7782a86 100644 --- a/pkg/core/interops.go +++ b/pkg/core/interops.go @@ -18,11 +18,11 @@ type interopContext struct { trigger byte block *Block tx *transaction.Transaction - mem *storage.MemoryStore + mem *storage.MemCachedStore } -func newInteropContext(trigger byte, bc Blockchainer, block *Block, tx *transaction.Transaction) *interopContext { - mem := storage.NewMemoryStore() +func newInteropContext(trigger byte, bc Blockchainer, s storage.Store, block *Block, tx *transaction.Transaction) *interopContext { + mem := storage.NewMemCachedStore(s) return &interopContext{bc, trigger, block, tx, mem} } diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go new file mode 100644 index 000000000..de13fcada --- /dev/null +++ b/pkg/core/storage/memcached_store.go @@ -0,0 +1,85 @@ +package storage + +// MemCachedStore is a wrapper around persistent store that caches all changes +// being made for them to be later flushed in one batch. +type MemCachedStore struct { + MemoryStore + + // Persistent Store. + ps Store +} + +// NewMemCachedStore creates a new MemCachedStore object. +func NewMemCachedStore(lower Store) *MemCachedStore { + return &MemCachedStore{ + MemoryStore: *NewMemoryStore(), + ps: lower, + } +} + +// Get implements the Store interface. +func (s *MemCachedStore) Get(key []byte) ([]byte, error) { + s.mut.RLock() + defer s.mut.RUnlock() + k := string(key) + if val, ok := s.mem[k]; ok { + return val, nil + } + if _, ok := s.del[k]; ok { + return nil, ErrKeyNotFound + } + return s.ps.Get(key) +} + +// Seek implements the Store interface. +func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { + s.mut.RLock() + defer s.mut.RUnlock() + s.MemoryStore.Seek(key, f) + s.ps.Seek(key, func(k, v []byte) { + elem := string(k) + // If it's in mem, we already called f() for it in MemoryStore.Seek(). + _, present := s.mem[elem] + if !present { + // If it's in del, we shouldn't be calling f() anyway. + _, present = s.del[elem] + } + if !present { + f(k, v) + } + }) +} + +// Persist flushes all the MemoryStore contents into the (supposedly) persistent +// store ps. +func (s *MemCachedStore) Persist() (int, error) { + s.mut.Lock() + defer s.mut.Unlock() + batch := s.ps.Batch() + keys, dkeys := 0, 0 + for k, v := range s.mem { + batch.Put([]byte(k), v) + keys++ + } + for k := range s.del { + batch.Delete([]byte(k)) + dkeys++ + } + var err error + if keys != 0 || dkeys != 0 { + err = s.ps.PutBatch(batch) + } + if err == nil { + s.mem = make(map[string][]byte) + s.del = make(map[string]bool) + } + return keys, err +} + +// Close implements Store interface, clears up memory and closes the lower layer +// Store. +func (s *MemCachedStore) Close() error { + // It's always successful. + _ = s.MemoryStore.Close() + return s.ps.Close() +} diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go new file mode 100644 index 000000000..b99fc6330 --- /dev/null +++ b/pkg/core/storage/memcached_store_test.go @@ -0,0 +1,141 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMemCachedStorePersist(t *testing.T) { + // persistent Store + ps := NewMemoryStore() + // cached Store + ts := NewMemCachedStore(ps) + // persisting nothing should do nothing + c, err := ts.Persist() + assert.Equal(t, nil, err) + assert.Equal(t, 0, c) + // persisting one key should result in one key in ps and nothing in ts + assert.NoError(t, ts.Put([]byte("key"), []byte("value"))) + c, err = ts.Persist() + assert.Equal(t, nil, err) + assert.Equal(t, 1, c) + v, err := ps.Get([]byte("key")) + assert.Equal(t, nil, err) + assert.Equal(t, []byte("value"), v) + v, err = ts.MemoryStore.Get([]byte("key")) + assert.Equal(t, ErrKeyNotFound, err) + assert.Equal(t, []byte(nil), v) + // now we overwrite the previous `key` contents and also add `key2`, + assert.NoError(t, ts.Put([]byte("key"), []byte("newvalue"))) + assert.NoError(t, ts.Put([]byte("key2"), []byte("value2"))) + // this is to check that now key is written into the ps before we do + // persist + v, err = ps.Get([]byte("key2")) + assert.Equal(t, ErrKeyNotFound, err) + assert.Equal(t, []byte(nil), v) + // two keys should be persisted (one overwritten and one new) and + // available in the ps + c, err = ts.Persist() + assert.Equal(t, nil, err) + assert.Equal(t, 2, c) + v, err = ts.MemoryStore.Get([]byte("key")) + assert.Equal(t, ErrKeyNotFound, err) + assert.Equal(t, []byte(nil), v) + v, err = ts.MemoryStore.Get([]byte("key2")) + assert.Equal(t, ErrKeyNotFound, err) + assert.Equal(t, []byte(nil), v) + v, err = ps.Get([]byte("key")) + assert.Equal(t, nil, err) + assert.Equal(t, []byte("newvalue"), v) + v, err = ps.Get([]byte("key2")) + assert.Equal(t, nil, err) + assert.Equal(t, []byte("value2"), v) + // we've persisted some values, make sure successive persist is a no-op + c, err = ts.Persist() + assert.Equal(t, nil, err) + assert.Equal(t, 0, c) + // test persisting deletions + err = ts.Delete([]byte("key")) + assert.Equal(t, nil, err) + c, err = ts.Persist() + assert.Equal(t, nil, err) + assert.Equal(t, 0, c) + v, err = ps.Get([]byte("key")) + assert.Equal(t, ErrKeyNotFound, err) + assert.Equal(t, []byte(nil), v) + v, err = ps.Get([]byte("key2")) + assert.Equal(t, nil, err) + assert.Equal(t, []byte("value2"), v) +} + +func TestCachedGetFromPersistent(t *testing.T) { + key := []byte("key") + value := []byte("value") + ps := NewMemoryStore() + ts := NewMemCachedStore(ps) + + assert.NoError(t, ps.Put(key, value)) + val, err := ts.Get(key) + assert.Nil(t, err) + assert.Equal(t, value, val) + assert.NoError(t, ts.Delete(key)) + val, err = ts.Get(key) + assert.Equal(t, err, ErrKeyNotFound) + assert.Nil(t, val) +} + +func TestCachedSeek(t *testing.T) { + var ( + // Given this prefix... + goodPrefix = []byte{'f'} + // these pairs should be found... + lowerKVs = []kvSeen{ + {[]byte("foo"), []byte("bar"), false}, + {[]byte("faa"), []byte("bra"), false}, + } + // and these should be not. + deletedKVs = []kvSeen{ + {[]byte("fee"), []byte("pow"), false}, + {[]byte("fii"), []byte("qaz"), false}, + } + // and these should be not. + updatedKVs = []kvSeen{ + {[]byte("fuu"), []byte("wop"), false}, + {[]byte("fyy"), []byte("zaq"), false}, + } + ps = NewMemoryStore() + ts = NewMemCachedStore(ps) + ) + for _, v := range lowerKVs { + require.NoError(t, ps.Put(v.key, v.val)) + } + for _, v := range deletedKVs { + require.NoError(t, ps.Put(v.key, v.val)) + require.NoError(t, ts.Delete(v.key)) + } + for _, v := range updatedKVs { + require.NoError(t, ps.Put(v.key, []byte("stub"))) + require.NoError(t, ts.Put(v.key, v.val)) + } + foundKVs := make(map[string][]byte) + ts.Seek(goodPrefix, func(k, v []byte) { + foundKVs[string(k)] = v + }) + assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs)) + for _, kv := range lowerKVs { + assert.Equal(t, kv.val, foundKVs[string(kv.key)]) + } + for _, kv := range deletedKVs { + _, ok := foundKVs[string(kv.key)] + assert.Equal(t, false, ok) + } + for _, kv := range updatedKVs { + assert.Equal(t, kv.val, foundKVs[string(kv.key)]) + } +} + +func newMemCachedStoreForTesting(t *testing.T) Store { + return NewMemCachedStore(NewMemoryStore()) +} diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 9e74b977c..06609326f 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -114,32 +114,6 @@ func newMemoryBatch() *MemoryBatch { return &MemoryBatch{MemoryStore: *NewMemoryStore()} } -// Persist flushes all the MemoryStore contents into the (supposedly) persistent -// store provided via parameter. -func (s *MemoryStore) Persist(ps Store) (int, error) { - s.mut.Lock() - defer s.mut.Unlock() - batch := ps.Batch() - keys, dkeys := 0, 0 - for k, v := range s.mem { - batch.Put([]byte(k), v) - keys++ - } - for k := range s.del { - batch.Delete([]byte(k)) - dkeys++ - } - var err error - if keys != 0 || dkeys != 0 { - err = ps.PutBatch(batch) - } - if err == nil { - s.mem = make(map[string][]byte) - s.del = make(map[string]bool) - } - return keys, err -} - // Close implements Store interface and clears up memory. Never returns an // error. func (s *MemoryStore) Close() error { diff --git a/pkg/core/storage/memory_store_test.go b/pkg/core/storage/memory_store_test.go index 1cf9f6d9a..259dbae68 100644 --- a/pkg/core/storage/memory_store_test.go +++ b/pkg/core/storage/memory_store_test.go @@ -2,73 +2,8 @@ package storage import ( "testing" - - "github.com/stretchr/testify/assert" ) -func TestMemoryStorePersist(t *testing.T) { - // temporary Store - ts := NewMemoryStore() - // persistent Store - ps := NewMemoryStore() - // persisting nothing should do nothing - c, err := ts.Persist(ps) - assert.Equal(t, nil, err) - assert.Equal(t, 0, c) - // persisting one key should result in one key in ps and nothing in ts - assert.NoError(t, ts.Put([]byte("key"), []byte("value"))) - c, err = ts.Persist(ps) - assert.Equal(t, nil, err) - assert.Equal(t, 1, c) - v, err := ps.Get([]byte("key")) - assert.Equal(t, nil, err) - assert.Equal(t, []byte("value"), v) - v, err = ts.Get([]byte("key")) - assert.Equal(t, ErrKeyNotFound, err) - assert.Equal(t, []byte(nil), v) - // now we overwrite the previous `key` contents and also add `key2`, - assert.NoError(t, ts.Put([]byte("key"), []byte("newvalue"))) - assert.NoError(t, ts.Put([]byte("key2"), []byte("value2"))) - // this is to check that now key is written into the ps before we do - // persist - v, err = ps.Get([]byte("key2")) - assert.Equal(t, ErrKeyNotFound, err) - assert.Equal(t, []byte(nil), v) - // two keys should be persisted (one overwritten and one new) and - // available in the ps - c, err = ts.Persist(ps) - assert.Equal(t, nil, err) - assert.Equal(t, 2, c) - v, err = ts.Get([]byte("key")) - assert.Equal(t, ErrKeyNotFound, err) - assert.Equal(t, []byte(nil), v) - v, err = ts.Get([]byte("key2")) - assert.Equal(t, ErrKeyNotFound, err) - assert.Equal(t, []byte(nil), v) - v, err = ps.Get([]byte("key")) - assert.Equal(t, nil, err) - assert.Equal(t, []byte("newvalue"), v) - v, err = ps.Get([]byte("key2")) - assert.Equal(t, nil, err) - assert.Equal(t, []byte("value2"), v) - // we've persisted some values, make sure successive persist is a no-op - c, err = ts.Persist(ps) - assert.Equal(t, nil, err) - assert.Equal(t, 0, c) - // test persisting deletions - err = ts.Delete([]byte("key")) - assert.Equal(t, nil, err) - c, err = ts.Persist(ps) - assert.Equal(t, nil, err) - assert.Equal(t, 0, c) - v, err = ps.Get([]byte("key")) - assert.Equal(t, ErrKeyNotFound, err) - assert.Equal(t, []byte(nil), v) - v, err = ps.Get([]byte("key2")) - assert.Equal(t, nil, err) - assert.Equal(t, []byte("value2"), v) -} - func newMemoryStoreForTesting(t *testing.T) Store { return NewMemoryStore() } diff --git a/pkg/core/storage/storeandbatch_test.go b/pkg/core/storage/storeandbatch_test.go index 76ecbe754..c71f8cb1a 100644 --- a/pkg/core/storage/storeandbatch_test.go +++ b/pkg/core/storage/storeandbatch_test.go @@ -9,6 +9,13 @@ import ( "github.com/stretchr/testify/require" ) +// kvSeen is used to test Seek implementations. +type kvSeen struct { + key []byte + val []byte + seen bool +} + type dbSetup struct { name string create func(*testing.T) Store @@ -66,11 +73,6 @@ func testStorePutBatch(t *testing.T, s Store) { } func testStoreSeek(t *testing.T, s Store) { - type kvSeen struct { - key []byte - val []byte - seen bool - } var ( // Given this prefix... goodprefix = []byte{'f'} @@ -219,6 +221,7 @@ func TestAllDBs(t *testing.T) { var DBs = []dbSetup{ {"BoltDB", newBoltStoreForTesting}, {"LevelDB", newLevelDBForTesting}, + {"MemCached", newMemCachedStoreForTesting}, {"Memory", newMemoryStoreForTesting}, {"RedisDB", newRedisStoreForTesting}, } diff --git a/pkg/core/unspent_coin_state.go b/pkg/core/unspent_coin_state.go index 1b7aa408c..9901b9d2b 100644 --- a/pkg/core/unspent_coin_state.go +++ b/pkg/core/unspent_coin_state.go @@ -16,24 +16,18 @@ type UnspentCoins map[util.Uint256]*UnspentCoinState // getAndUpdate retreives UnspentCoinState from temporary or persistent Store // and return it. If it's not present in both stores, returns a new // UnspentCoinState. -func (u UnspentCoins) getAndUpdate(ts storage.Store, ps storage.Store, hash util.Uint256) (*UnspentCoinState, error) { +func (u UnspentCoins) getAndUpdate(s storage.Store, hash util.Uint256) (*UnspentCoinState, error) { if unspent, ok := u[hash]; ok { return unspent, nil } - unspent, err := getUnspentCoinStateFromStore(ts, hash) + unspent, err := getUnspentCoinStateFromStore(s, hash) if err != nil { if err != storage.ErrKeyNotFound { return nil, err } - unspent, err = getUnspentCoinStateFromStore(ps, hash) - if err != nil { - if err != storage.ErrKeyNotFound { - return nil, err - } - unspent = &UnspentCoinState{ - states: []CoinState{}, - } + unspent = &UnspentCoinState{ + states: []CoinState{}, } } From 2245fedbb1568d31928949a297ec5ec1d34ac062 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 16 Oct 2019 17:29:21 +0300 Subject: [PATCH 2/2] core: deduplicate state commit methods, use interim MemCachedStore commit methods duplicated putSmthIntoStore functions, but have MemCachedStore now that can easily substitute for a Batch, especially given that interop needs something like that for its storage purposes anyway. --- pkg/core/account_state.go | 24 +++++++++++++++--------- pkg/core/asset_state.go | 13 ++++--------- pkg/core/blockchain.go | 24 +++++++++++++----------- pkg/core/contract_state.go | 13 ++++--------- pkg/core/spent_coin_state.go | 20 +++++++++++++------- pkg/core/spent_coin_state_test.go | 4 +--- pkg/core/unspent_coin_state.go | 22 ++++++++++++++-------- pkg/core/unspent_coint_state_test.go | 4 +--- pkg/core/util.go | 14 ++++++-------- 9 files changed, 71 insertions(+), 67 deletions(-) diff --git a/pkg/core/account_state.go b/pkg/core/account_state.go index 00fcad55d..4d6ceb532 100644 --- a/pkg/core/account_state.go +++ b/pkg/core/account_state.go @@ -48,17 +48,23 @@ func getAccountStateFromStore(s storage.Store, hash util.Uint160) (*AccountState return account, err } -// commit writes all account states to the given Batch. -func (a Accounts) commit(b storage.Batch) error { +// putAccountStateIntoStore puts given AccountState into the given store. +func putAccountStateIntoStore(store storage.Store, as *AccountState) error { buf := io.NewBufBinWriter() - for hash, state := range a { - state.EncodeBinary(buf.BinWriter) - if buf.Err != nil { - return buf.Err + as.EncodeBinary(buf.BinWriter) + if buf.Err != nil { + return buf.Err + } + key := storage.AppendPrefix(storage.STAccount, as.ScriptHash.Bytes()) + return store.Put(key, buf.Bytes()) +} + +// commit writes all account states to the given Batch. +func (a Accounts) commit(store storage.Store) error { + for _, state := range a { + if err := putAccountStateIntoStore(store, state); err != nil { + return err } - key := storage.AppendPrefix(storage.STAccount, hash.Bytes()) - b.Put(key, buf.Bytes()) - buf.Reset() } return nil } diff --git a/pkg/core/asset_state.go b/pkg/core/asset_state.go index 8dd8663e6..8f883a467 100644 --- a/pkg/core/asset_state.go +++ b/pkg/core/asset_state.go @@ -13,16 +13,11 @@ const feeMode = 0x0 // Assets is mapping between AssetID and the AssetState. type Assets map[util.Uint256]*AssetState -func (a Assets) commit(b storage.Batch) error { - buf := io.NewBufBinWriter() - for hash, state := range a { - state.EncodeBinary(buf.BinWriter) - if buf.Err != nil { - return buf.Err +func (a Assets) commit(store storage.Store) error { + for _, state := range a { + if err := putAssetStateIntoStore(store, state); err != nil { + return err } - key := storage.AppendPrefix(storage.STAsset, hash.Bytes()) - b.Put(key, buf.Bytes()) - buf.Reset() } return nil } diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 899a13bdf..03622aa9c 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -306,7 +306,7 @@ func (bc *Blockchain) processHeader(h *Header, batch storage.Batch, headerList * // and all tests are in place, we can make a more optimized and cleaner implementation. func (bc *Blockchain) storeBlock(block *Block) error { var ( - batch = bc.store.Batch() + tmpStore = storage.NewMemCachedStore(bc.store) unspentCoins = make(UnspentCoins) spentCoins = make(SpentCoins) accounts = make(Accounts) @@ -314,14 +314,16 @@ func (bc *Blockchain) storeBlock(block *Block) error { contracts = make(Contracts) ) - if err := storeAsBlock(batch, block, 0); err != nil { + if err := storeAsBlock(tmpStore, block, 0); err != nil { return err } - storeAsCurrentBlock(batch, block) + if err := storeAsCurrentBlock(tmpStore, block); err != nil { + return err + } for _, tx := range block.Transactions { - if err := storeAsTransaction(batch, tx, block.Index); err != nil { + if err := storeAsTransaction(tmpStore, tx, block.Index); err != nil { return err } @@ -415,7 +417,7 @@ func (bc *Blockchain) storeBlock(block *Block) error { return cs.Script }) - systemInterop := newInteropContext(0x10, bc, bc.store, block, tx) + systemInterop := newInteropContext(0x10, bc, tmpStore, block, tx) vm.RegisterInteropFuncs(systemInterop.getSystemInteropMap()) vm.RegisterInteropFuncs(systemInterop.getNeoInteropMap()) vm.LoadScript(t.Script) @@ -435,22 +437,22 @@ func (bc *Blockchain) storeBlock(block *Block) error { } // Persist all to storage. - if err := accounts.commit(batch); err != nil { + if err := accounts.commit(tmpStore); err != nil { return err } - if err := unspentCoins.commit(batch); err != nil { + if err := unspentCoins.commit(tmpStore); err != nil { return err } - if err := spentCoins.commit(batch); err != nil { + if err := spentCoins.commit(tmpStore); err != nil { return err } - if err := assets.commit(batch); err != nil { + if err := assets.commit(tmpStore); err != nil { return err } - if err := contracts.commit(batch); err != nil { + if err := contracts.commit(tmpStore); err != nil { return err } - if err := bc.store.PutBatch(batch); err != nil { + if _, err := tmpStore.Persist(); err != nil { return err } diff --git a/pkg/core/contract_state.go b/pkg/core/contract_state.go index 1e0a582e8..b5e736b7a 100644 --- a/pkg/core/contract_state.go +++ b/pkg/core/contract_state.go @@ -27,16 +27,11 @@ type ContractState struct { } // commit flushes all contracts to the given storage.Batch. -func (a Contracts) commit(b storage.Batch) error { - buf := io.NewBufBinWriter() - for hash, contract := range a { - contract.EncodeBinary(buf.BinWriter) - if buf.Err != nil { - return buf.Err +func (a Contracts) commit(store storage.Store) error { + for _, contract := range a { + if err := putContractStateIntoStore(store, contract); err != nil { + return err } - key := storage.AppendPrefix(storage.STContract, hash.Bytes()) - b.Put(key, buf.Bytes()) - buf.Reset() } return nil } diff --git a/pkg/core/spent_coin_state.go b/pkg/core/spent_coin_state.go index 978ff2a02..84973df26 100644 --- a/pkg/core/spent_coin_state.go +++ b/pkg/core/spent_coin_state.go @@ -35,16 +35,22 @@ func (s SpentCoins) getAndUpdate(store storage.Store, hash util.Uint256) (*Spent return spent, nil } -func (s SpentCoins) commit(b storage.Batch) error { +// putSpentCoinStateIntoStore puts given SpentCoinState into the given store. +func putSpentCoinStateIntoStore(store storage.Store, hash util.Uint256, scs *SpentCoinState) error { buf := io.NewBufBinWriter() + scs.EncodeBinary(buf.BinWriter) + if buf.Err != nil { + return buf.Err + } + key := storage.AppendPrefix(storage.STSpentCoin, hash.BytesReverse()) + return store.Put(key, buf.Bytes()) +} + +func (s SpentCoins) commit(store storage.Store) error { for hash, state := range s { - state.EncodeBinary(buf.BinWriter) - if buf.Err != nil { - return buf.Err + if err := putSpentCoinStateIntoStore(store, hash, state); err != nil { + return err } - key := storage.AppendPrefix(storage.STSpentCoin, hash.BytesReverse()) - b.Put(key, buf.Bytes()) - buf.Reset() } return nil } diff --git a/pkg/core/spent_coin_state_test.go b/pkg/core/spent_coin_state_test.go index 0736196ed..0aa1c2fad 100644 --- a/pkg/core/spent_coin_state_test.go +++ b/pkg/core/spent_coin_state_test.go @@ -33,7 +33,6 @@ func TestEncodeDecodeSpentCoinState(t *testing.T) { func TestCommitSpentCoins(t *testing.T) { var ( store = storage.NewMemoryStore() - batch = store.Batch() spentCoins = make(SpentCoins) ) @@ -49,6 +48,5 @@ func TestCommitSpentCoins(t *testing.T) { txHeight: 1, } } - assert.Nil(t, spentCoins.commit(batch)) - assert.Nil(t, store.PutBatch(batch)) + assert.Nil(t, spentCoins.commit(store)) } diff --git a/pkg/core/unspent_coin_state.go b/pkg/core/unspent_coin_state.go index 9901b9d2b..ab9cabfaa 100644 --- a/pkg/core/unspent_coin_state.go +++ b/pkg/core/unspent_coin_state.go @@ -51,6 +51,17 @@ func getUnspentCoinStateFromStore(s storage.Store, hash util.Uint256) (*UnspentC return unspent, nil } +// putUnspentCoinStateIntoStore puts given UnspentCoinState into the given store. +func putUnspentCoinStateIntoStore(store storage.Store, hash util.Uint256, ucs *UnspentCoinState) error { + buf := io.NewBufBinWriter() + ucs.EncodeBinary(buf.BinWriter) + if buf.Err != nil { + return buf.Err + } + key := storage.AppendPrefix(storage.STCoin, hash.BytesReverse()) + return store.Put(key, buf.Bytes()) +} + // UnspentCoinState hold the state of a unspent coin. type UnspentCoinState struct { states []CoinState @@ -68,16 +79,11 @@ func NewUnspentCoinState(n int) *UnspentCoinState { } // commit writes all unspent coin states to the given Batch. -func (u UnspentCoins) commit(b storage.Batch) error { - buf := io.NewBufBinWriter() +func (u UnspentCoins) commit(store storage.Store) error { for hash, state := range u { - state.EncodeBinary(buf.BinWriter) - if buf.Err != nil { - return buf.Err + if err := putUnspentCoinStateIntoStore(store, hash, state); err != nil { + return err } - key := storage.AppendPrefix(storage.STCoin, hash.BytesReverse()) - b.Put(key, buf.Bytes()) - buf.Reset() } return nil } diff --git a/pkg/core/unspent_coint_state_test.go b/pkg/core/unspent_coint_state_test.go index 658233ebc..5a475b605 100644 --- a/pkg/core/unspent_coint_state_test.go +++ b/pkg/core/unspent_coint_state_test.go @@ -31,7 +31,6 @@ func TestDecodeEncodeUnspentCoinState(t *testing.T) { func TestCommitUnspentCoins(t *testing.T) { var ( store = storage.NewMemoryStore() - batch = store.Batch() unspentCoins = make(UnspentCoins) ) @@ -56,6 +55,5 @@ func TestCommitUnspentCoins(t *testing.T) { }, } - assert.Nil(t, unspentCoins.commit(batch)) - assert.Nil(t, store.PutBatch(batch)) + assert.Nil(t, unspentCoins.commit(store)) } diff --git a/pkg/core/util.go b/pkg/core/util.go index 05a189d26..3d909cfd5 100644 --- a/pkg/core/util.go +++ b/pkg/core/util.go @@ -179,15 +179,15 @@ func headerSliceReverse(dest []*Header) { // storeAsCurrentBlock stores the given block witch prefix // SYSCurrentBlock. -func storeAsCurrentBlock(batch storage.Batch, block *Block) { +func storeAsCurrentBlock(store storage.Store, block *Block) error { buf := io.NewBufBinWriter() buf.WriteLE(block.Hash().BytesReverse()) buf.WriteLE(block.Index) - batch.Put(storage.SYSCurrentBlock.Bytes(), buf.Bytes()) + return store.Put(storage.SYSCurrentBlock.Bytes(), buf.Bytes()) } // storeAsBlock stores the given block as DataBlock. -func storeAsBlock(batch storage.Batch, block *Block, sysFee uint32) error { +func storeAsBlock(store storage.Store, block *Block, sysFee uint32) error { var ( key = storage.AppendPrefix(storage.DataBlock, block.Hash().BytesReverse()) buf = io.NewBufBinWriter() @@ -202,12 +202,11 @@ func storeAsBlock(batch storage.Batch, block *Block, sysFee uint32) error { if buf.Err != nil { return buf.Err } - batch.Put(key, buf.Bytes()) - return nil + return store.Put(key, buf.Bytes()) } // storeAsTransaction stores the given TX as DataTransaction. -func storeAsTransaction(batch storage.Batch, tx *transaction.Transaction, index uint32) error { +func storeAsTransaction(store storage.Store, tx *transaction.Transaction, index uint32) error { key := storage.AppendPrefix(storage.DataTransaction, tx.Hash().BytesReverse()) buf := io.NewBufBinWriter() buf.WriteLE(index) @@ -215,6 +214,5 @@ func storeAsTransaction(batch storage.Batch, tx *transaction.Transaction, index if buf.Err != nil { return buf.Err } - batch.Put(key, buf.Bytes()) - return nil + return store.Put(key, buf.Bytes()) }