Merge pull request #425 from nspcc-dev/memcached-layer

This one introduces MemCachedStore and moves persistence there which allows easy stacking of cached/temporary stores and allows to deduplicate and simplify other code.
This commit is contained in:
Roman Khimov 2019-10-17 11:13:32 +03:00 committed by GitHub
commit b9fc34e8c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 351 additions and 261 deletions

View file

@ -14,23 +14,17 @@ type Accounts map[util.Uint160]*AccountState
// getAndUpdate retrieves AccountState from temporary or persistent Store
// or creates a new one if it doesn't exist.
func (a Accounts) getAndUpdate(ts storage.Store, ps storage.Store, hash util.Uint160) (*AccountState, error) {
func (a Accounts) getAndUpdate(s storage.Store, hash util.Uint160) (*AccountState, error) {
if account, ok := a[hash]; ok {
return account, nil
}
account, err := getAccountStateFromStore(ts, hash)
account, err := getAccountStateFromStore(s, hash)
if err != nil {
if err != storage.ErrKeyNotFound {
return nil, err
}
account, err = getAccountStateFromStore(ps, hash)
if err != nil {
if err != storage.ErrKeyNotFound {
return nil, err
}
account = NewAccountState(hash)
}
account = NewAccountState(hash)
}
a[hash] = account
@ -54,17 +48,23 @@ func getAccountStateFromStore(s storage.Store, hash util.Uint160) (*AccountState
return account, err
}
// commit writes all account states to the given Batch.
func (a Accounts) commit(b storage.Batch) error {
// putAccountStateIntoStore puts given AccountState into the given store.
func putAccountStateIntoStore(store storage.Store, as *AccountState) error {
buf := io.NewBufBinWriter()
for hash, state := range a {
state.EncodeBinary(buf.BinWriter)
if buf.Err != nil {
return buf.Err
as.EncodeBinary(buf.BinWriter)
if buf.Err != nil {
return buf.Err
}
key := storage.AppendPrefix(storage.STAccount, as.ScriptHash.Bytes())
return store.Put(key, buf.Bytes())
}
// commit writes all account states to the given Batch.
func (a Accounts) commit(store storage.Store) error {
for _, state := range a {
if err := putAccountStateIntoStore(store, state); err != nil {
return err
}
key := storage.AppendPrefix(storage.STAccount, hash.Bytes())
b.Put(key, buf.Bytes())
buf.Reset()
}
return nil
}

View file

@ -13,16 +13,11 @@ const feeMode = 0x0
// Assets is mapping between AssetID and the AssetState.
type Assets map[util.Uint256]*AssetState
func (a Assets) commit(b storage.Batch) error {
buf := io.NewBufBinWriter()
for hash, state := range a {
state.EncodeBinary(buf.BinWriter)
if buf.Err != nil {
return buf.Err
func (a Assets) commit(store storage.Store) error {
for _, state := range a {
if err := putAssetStateIntoStore(store, state); err != nil {
return err
}
key := storage.AppendPrefix(storage.STAsset, hash.Bytes())
b.Put(key, buf.Bytes())
buf.Reset()
}
return nil
}

View file

@ -43,11 +43,8 @@ var (
type Blockchain struct {
config config.ProtocolConfiguration
// Any object that satisfies the BlockchainStorer interface.
storage.Store
// In-memory storage to be persisted into the storage.Store
memStore *storage.MemoryStore
// Persistent storage wrapped around with a write memory caching layer.
store *storage.MemCachedStore
// Current index/height of the highest block.
// Read access should always be called by BlockHeight().
@ -78,8 +75,7 @@ type headersOpFunc func(headerList *HeaderHashList)
func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockchain, error) {
bc := &Blockchain{
config: cfg,
Store: s,
memStore: storage.NewMemoryStore(),
store: storage.NewMemCachedStore(s),
headersOp: make(chan headersOpFunc),
headersOpDone: make(chan struct{}),
memPool: NewMemPool(50000),
@ -94,10 +90,10 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha
func (bc *Blockchain) init() error {
// If we could not find the version in the Store, we know that there is nothing stored.
ver, err := storage.Version(bc.Store)
ver, err := storage.Version(bc.store)
if err != nil {
log.Infof("no storage version found! creating genesis block")
if err = storage.PutVersion(bc.Store, version); err != nil {
if err = storage.PutVersion(bc.store, version); err != nil {
return err
}
genesisBlock, err := createGenesisBlock(bc.config)
@ -116,14 +112,14 @@ func (bc *Blockchain) init() error {
// and the genesis block as first block.
log.Infof("restoring blockchain with version: %s", version)
bHeight, err := storage.CurrentBlockHeight(bc.Store)
bHeight, err := storage.CurrentBlockHeight(bc.store)
if err != nil {
return err
}
bc.blockHeight = bHeight
bc.persistedHeight = bHeight
hashes, err := storage.HeaderHashes(bc.Store)
hashes, err := storage.HeaderHashes(bc.store)
if err != nil {
return err
}
@ -131,7 +127,7 @@ func (bc *Blockchain) init() error {
bc.headerList = NewHeaderHashList(hashes...)
bc.storedHeaderCount = uint32(len(hashes))
currHeaderHeight, currHeaderHash, err := storage.CurrentHeaderHeight(bc.Store)
currHeaderHeight, currHeaderHash, err := storage.CurrentHeaderHeight(bc.store)
if err != nil {
return err
}
@ -173,9 +169,7 @@ func (bc *Blockchain) Run(ctx context.Context) {
if err := bc.persist(ctx); err != nil {
log.Warnf("failed to persist: %s", err)
}
// never fails
_ = bc.memStore.Close()
if err := bc.Store.Close(); err != nil {
if err := bc.store.Close(); err != nil {
log.Warnf("failed to close db: %s", err)
}
}()
@ -237,7 +231,7 @@ func (bc *Blockchain) AddBlock(block *Block) error {
func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) {
var (
start = time.Now()
batch = bc.memStore.Batch()
batch = bc.store.Batch()
)
bc.headersOp <- func(headerList *HeaderHashList) {
@ -263,7 +257,7 @@ func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) {
}
if oldlen != headerList.Len() {
if err = bc.memStore.PutBatch(batch); err != nil {
if err = bc.store.PutBatch(batch); err != nil {
return
}
log.WithFields(log.Fields{
@ -312,7 +306,7 @@ func (bc *Blockchain) processHeader(h *Header, batch storage.Batch, headerList *
// and all tests are in place, we can make a more optimized and cleaner implementation.
func (bc *Blockchain) storeBlock(block *Block) error {
var (
batch = bc.memStore.Batch()
tmpStore = storage.NewMemCachedStore(bc.store)
unspentCoins = make(UnspentCoins)
spentCoins = make(SpentCoins)
accounts = make(Accounts)
@ -320,14 +314,16 @@ func (bc *Blockchain) storeBlock(block *Block) error {
contracts = make(Contracts)
)
if err := storeAsBlock(batch, block, 0); err != nil {
if err := storeAsBlock(tmpStore, block, 0); err != nil {
return err
}
storeAsCurrentBlock(batch, block)
if err := storeAsCurrentBlock(tmpStore, block); err != nil {
return err
}
for _, tx := range block.Transactions {
if err := storeAsTransaction(batch, tx, block.Index); err != nil {
if err := storeAsTransaction(tmpStore, tx, block.Index); err != nil {
return err
}
@ -335,7 +331,7 @@ func (bc *Blockchain) storeBlock(block *Block) error {
// Process TX outputs.
for _, output := range tx.Outputs {
account, err := accounts.getAndUpdate(bc.memStore, bc.Store, output.ScriptHash)
account, err := accounts.getAndUpdate(bc.store, output.ScriptHash)
if err != nil {
return err
}
@ -353,14 +349,14 @@ func (bc *Blockchain) storeBlock(block *Block) error {
return fmt.Errorf("could not find previous TX: %s", prevHash)
}
for _, input := range inputs {
unspent, err := unspentCoins.getAndUpdate(bc.memStore, bc.Store, input.PrevHash)
unspent, err := unspentCoins.getAndUpdate(bc.store, input.PrevHash)
if err != nil {
return err
}
unspent.states[input.PrevIndex] = CoinStateSpent
prevTXOutput := prevTX.Outputs[input.PrevIndex]
account, err := accounts.getAndUpdate(bc.memStore, bc.Store, prevTXOutput.ScriptHash)
account, err := accounts.getAndUpdate(bc.store, prevTXOutput.ScriptHash)
if err != nil {
return err
}
@ -421,13 +417,13 @@ func (bc *Blockchain) storeBlock(block *Block) error {
return cs.Script
})
systemInterop := newInteropContext(0x10, bc, block, tx)
systemInterop := newInteropContext(0x10, bc, tmpStore, block, tx)
vm.RegisterInteropFuncs(systemInterop.getSystemInteropMap())
vm.RegisterInteropFuncs(systemInterop.getNeoInteropMap())
vm.LoadScript(t.Script)
vm.Run()
if !vm.HasFailed() {
_, err := systemInterop.mem.Persist(bc.memStore)
_, err := systemInterop.mem.Persist()
if err != nil {
return errors.Wrap(err, "failed to persist invocation results")
}
@ -441,22 +437,22 @@ func (bc *Blockchain) storeBlock(block *Block) error {
}
// Persist all to storage.
if err := accounts.commit(batch); err != nil {
if err := accounts.commit(tmpStore); err != nil {
return err
}
if err := unspentCoins.commit(batch); err != nil {
if err := unspentCoins.commit(tmpStore); err != nil {
return err
}
if err := spentCoins.commit(batch); err != nil {
if err := spentCoins.commit(tmpStore); err != nil {
return err
}
if err := assets.commit(batch); err != nil {
if err := assets.commit(tmpStore); err != nil {
return err
}
if err := contracts.commit(batch); err != nil {
if err := contracts.commit(tmpStore); err != nil {
return err
}
if err := bc.memStore.PutBatch(batch); err != nil {
if _, err := tmpStore.Persist(); err != nil {
return err
}
@ -472,11 +468,11 @@ func (bc *Blockchain) persist(ctx context.Context) error {
err error
)
persisted, err = bc.memStore.Persist(bc.Store)
persisted, err = bc.store.Persist()
if err != nil {
return err
}
bHeight, err := storage.CurrentBlockHeight(bc.Store)
bHeight, err := storage.CurrentBlockHeight(bc.store)
if err != nil {
return err
}
@ -510,11 +506,7 @@ func (bc *Blockchain) GetTransaction(hash util.Uint256) (*transaction.Transactio
if tx, ok := bc.memPool.TryGetValue(hash); ok {
return tx, 0, nil // the height is not actually defined for memPool transaction. Not sure if zero is a good number in this case.
}
tx, height, err := getTransactionFromStore(bc.memStore, hash)
if err != nil {
tx, height, err = getTransactionFromStore(bc.Store, hash)
}
return tx, height, err
return getTransactionFromStore(bc.store, hash)
}
// getTransactionFromStore returns Transaction and its height by the given hash
@ -541,11 +533,7 @@ func getTransactionFromStore(s storage.Store, hash util.Uint256) (*transaction.T
// GetStorageItem returns an item from storage.
func (bc *Blockchain) GetStorageItem(scripthash util.Uint160, key []byte) *StorageItem {
sItem := getStorageItemFromStore(bc.memStore, scripthash, key)
if sItem == nil {
sItem = getStorageItemFromStore(bc.Store, scripthash, key)
}
return sItem
return getStorageItemFromStore(bc.store, scripthash, key)
}
// GetStorageItems returns all storage items for a given scripthash.
@ -568,8 +556,7 @@ func (bc *Blockchain) GetStorageItems(hash util.Uint160) (map[string]*StorageIte
// Cut prefix and hash.
siMap[string(k[21:])] = si
}
bc.memStore.Seek(storage.AppendPrefix(storage.STStorage, hash.BytesReverse()), saveToMap)
bc.Store.Seek(storage.AppendPrefix(storage.STStorage, hash.BytesReverse()), saveToMap)
bc.store.Seek(storage.AppendPrefix(storage.STStorage, hash.BytesReverse()), saveToMap)
if err != nil {
return nil, err
}
@ -578,12 +565,9 @@ func (bc *Blockchain) GetStorageItems(hash util.Uint160) (map[string]*StorageIte
// GetBlock returns a Block by the given hash.
func (bc *Blockchain) GetBlock(hash util.Uint256) (*Block, error) {
block, err := getBlockFromStore(bc.memStore, hash)
block, err := getBlockFromStore(bc.store, hash)
if err != nil {
block, err = getBlockFromStore(bc.Store, hash)
if err != nil {
return nil, err
}
return nil, err
}
if len(block.Transactions) == 0 {
return nil, fmt.Errorf("only header is available")
@ -614,14 +598,7 @@ func getBlockFromStore(s storage.Store, hash util.Uint256) (*Block, error) {
// GetHeader returns data block header identified with the given hash value.
func (bc *Blockchain) GetHeader(hash util.Uint256) (*Header, error) {
header, err := getHeaderFromStore(bc.memStore, hash)
if err != nil {
header, err = getHeaderFromStore(bc.Store, hash)
if err != nil {
return nil, err
}
}
return header, err
return getHeaderFromStore(bc.store, hash)
}
// getHeaderFromStore returns Header by the given hash from the store.
@ -637,8 +614,7 @@ func getHeaderFromStore(s storage.Store, hash util.Uint256) (*Header, error) {
// transaction hash.
func (bc *Blockchain) HasTransaction(hash util.Uint256) bool {
return bc.memPool.ContainsKey(hash) ||
checkTransactionInStore(bc.memStore, hash) ||
checkTransactionInStore(bc.Store, hash)
checkTransactionInStore(bc.store, hash)
}
// checkTransactionInStore returns true if the given store contains the given
@ -700,11 +676,7 @@ func (bc *Blockchain) HeaderHeight() uint32 {
// GetAssetState returns asset state from its assetID
func (bc *Blockchain) GetAssetState(assetID util.Uint256) *AssetState {
as := getAssetStateFromStore(bc.memStore, assetID)
if as == nil {
as = getAssetStateFromStore(bc.Store, assetID)
}
return as
return getAssetStateFromStore(bc.store, assetID)
}
// getAssetStateFromStore returns given asset state as recorded in the given
@ -727,11 +699,7 @@ func getAssetStateFromStore(s storage.Store, assetID util.Uint256) *AssetState {
// GetContractState returns contract by its script hash.
func (bc *Blockchain) GetContractState(hash util.Uint160) *ContractState {
cs := getContractStateFromStore(bc.memStore, hash)
if cs == nil {
cs = getContractStateFromStore(bc.Store, hash)
}
return cs
return getContractStateFromStore(bc.store, hash)
}
// getContractStateFromStore returns contract state as recorded in the given
@ -754,24 +722,18 @@ func getContractStateFromStore(s storage.Store, hash util.Uint160) *ContractStat
// GetAccountState returns the account state from its script hash
func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *AccountState {
as, err := getAccountStateFromStore(bc.memStore, scriptHash)
if as == nil {
if err != storage.ErrKeyNotFound {
log.Warnf("failed to get account state: %s", err)
}
as, err = getAccountStateFromStore(bc.Store, scriptHash)
if as == nil && err != storage.ErrKeyNotFound {
log.Warnf("failed to get account state: %s", err)
}
as, err := getAccountStateFromStore(bc.store, scriptHash)
if as == nil && err != storage.ErrKeyNotFound {
log.Warnf("failed to get account state: %s", err)
}
return as
}
// GetUnspentCoinState returns unspent coin state for given tx hash.
func (bc *Blockchain) GetUnspentCoinState(hash util.Uint256) *UnspentCoinState {
ucs, err := getUnspentCoinStateFromStore(bc.memStore, hash)
if err != nil {
ucs, _ = getUnspentCoinStateFromStore(bc.Store, hash)
ucs, err := getUnspentCoinStateFromStore(bc.store, hash)
if ucs == nil && err != storage.ErrKeyNotFound {
log.Warnf("failed to get unspent coin state: %s", err)
}
return ucs
}
@ -872,7 +834,7 @@ func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *Block) error {
if ok := bc.memPool.Verify(t); !ok {
return errors.New("invalid transaction due to conflicts with the memory pool")
}
if IsDoubleSpend(bc.Store, t) {
if IsDoubleSpend(bc.store, t) {
return errors.New("invalid transaction caused by double spending")
}
if err := bc.verifyOutputs(t); err != nil {
@ -1180,7 +1142,7 @@ func (bc *Blockchain) verifyTxWitnesses(t *transaction.Transaction, block *Block
}
sort.Slice(hashes, func(i, j int) bool { return hashes[i].Less(hashes[j]) })
sort.Slice(witnesses, func(i, j int) bool { return witnesses[i].ScriptHash().Less(witnesses[j].ScriptHash()) })
interopCtx := newInteropContext(0, bc, block, t)
interopCtx := newInteropContext(0, bc, bc.store, block, t)
for i := 0; i < len(hashes); i++ {
err := bc.verifyHashAgainstScript(hashes[i], witnesses[i], t.VerificationHash(), interopCtx)
if err != nil {
@ -1200,7 +1162,7 @@ func (bc *Blockchain) verifyBlockWitnesses(block *Block, prevHeader *Header) err
} else {
hash = prevHeader.NextConsensus
}
interopCtx := newInteropContext(0, bc, nil, nil)
interopCtx := newInteropContext(0, bc, bc.store, nil, nil)
return bc.verifyHashAgainstScript(hash, block.Script, block.VerificationHash(), interopCtx)
}

View file

@ -57,7 +57,7 @@ func TestAddBlock(t *testing.T) {
for _, block := range blocks {
key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesReverse())
if _, err := bc.Get(key); err != nil {
if _, err := bc.store.Get(key); err != nil {
t.Fatalf("block %s not persisted", block.Hash())
}
}

View file

@ -27,16 +27,11 @@ type ContractState struct {
}
// commit flushes all contracts to the given storage.Batch.
func (a Contracts) commit(b storage.Batch) error {
buf := io.NewBufBinWriter()
for hash, contract := range a {
contract.EncodeBinary(buf.BinWriter)
if buf.Err != nil {
return buf.Err
func (a Contracts) commit(store storage.Store) error {
for _, contract := range a {
if err := putContractStateIntoStore(store, contract); err != nil {
return err
}
key := storage.AppendPrefix(storage.STContract, hash.Bytes())
b.Put(key, buf.Bytes())
buf.Reset()
}
return nil
}

View file

@ -18,11 +18,11 @@ type interopContext struct {
trigger byte
block *Block
tx *transaction.Transaction
mem *storage.MemoryStore
mem *storage.MemCachedStore
}
func newInteropContext(trigger byte, bc Blockchainer, block *Block, tx *transaction.Transaction) *interopContext {
mem := storage.NewMemoryStore()
func newInteropContext(trigger byte, bc Blockchainer, s storage.Store, block *Block, tx *transaction.Transaction) *interopContext {
mem := storage.NewMemCachedStore(s)
return &interopContext{bc, trigger, block, tx, mem}
}

View file

@ -35,16 +35,22 @@ func (s SpentCoins) getAndUpdate(store storage.Store, hash util.Uint256) (*Spent
return spent, nil
}
func (s SpentCoins) commit(b storage.Batch) error {
// putSpentCoinStateIntoStore puts given SpentCoinState into the given store.
func putSpentCoinStateIntoStore(store storage.Store, hash util.Uint256, scs *SpentCoinState) error {
buf := io.NewBufBinWriter()
scs.EncodeBinary(buf.BinWriter)
if buf.Err != nil {
return buf.Err
}
key := storage.AppendPrefix(storage.STSpentCoin, hash.BytesReverse())
return store.Put(key, buf.Bytes())
}
func (s SpentCoins) commit(store storage.Store) error {
for hash, state := range s {
state.EncodeBinary(buf.BinWriter)
if buf.Err != nil {
return buf.Err
if err := putSpentCoinStateIntoStore(store, hash, state); err != nil {
return err
}
key := storage.AppendPrefix(storage.STSpentCoin, hash.BytesReverse())
b.Put(key, buf.Bytes())
buf.Reset()
}
return nil
}

View file

@ -33,7 +33,6 @@ func TestEncodeDecodeSpentCoinState(t *testing.T) {
func TestCommitSpentCoins(t *testing.T) {
var (
store = storage.NewMemoryStore()
batch = store.Batch()
spentCoins = make(SpentCoins)
)
@ -49,6 +48,5 @@ func TestCommitSpentCoins(t *testing.T) {
txHeight: 1,
}
}
assert.Nil(t, spentCoins.commit(batch))
assert.Nil(t, store.PutBatch(batch))
assert.Nil(t, spentCoins.commit(store))
}

View file

@ -0,0 +1,85 @@
package storage
// MemCachedStore is a wrapper around persistent store that caches all changes
// being made for them to be later flushed in one batch.
type MemCachedStore struct {
MemoryStore
// Persistent Store.
ps Store
}
// NewMemCachedStore creates a new MemCachedStore object.
func NewMemCachedStore(lower Store) *MemCachedStore {
return &MemCachedStore{
MemoryStore: *NewMemoryStore(),
ps: lower,
}
}
// Get implements the Store interface.
func (s *MemCachedStore) Get(key []byte) ([]byte, error) {
s.mut.RLock()
defer s.mut.RUnlock()
k := string(key)
if val, ok := s.mem[k]; ok {
return val, nil
}
if _, ok := s.del[k]; ok {
return nil, ErrKeyNotFound
}
return s.ps.Get(key)
}
// Seek implements the Store interface.
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
s.mut.RLock()
defer s.mut.RUnlock()
s.MemoryStore.Seek(key, f)
s.ps.Seek(key, func(k, v []byte) {
elem := string(k)
// If it's in mem, we already called f() for it in MemoryStore.Seek().
_, present := s.mem[elem]
if !present {
// If it's in del, we shouldn't be calling f() anyway.
_, present = s.del[elem]
}
if !present {
f(k, v)
}
})
}
// Persist flushes all the MemoryStore contents into the (supposedly) persistent
// store ps.
func (s *MemCachedStore) Persist() (int, error) {
s.mut.Lock()
defer s.mut.Unlock()
batch := s.ps.Batch()
keys, dkeys := 0, 0
for k, v := range s.mem {
batch.Put([]byte(k), v)
keys++
}
for k := range s.del {
batch.Delete([]byte(k))
dkeys++
}
var err error
if keys != 0 || dkeys != 0 {
err = s.ps.PutBatch(batch)
}
if err == nil {
s.mem = make(map[string][]byte)
s.del = make(map[string]bool)
}
return keys, err
}
// Close implements Store interface, clears up memory and closes the lower layer
// Store.
func (s *MemCachedStore) Close() error {
// It's always successful.
_ = s.MemoryStore.Close()
return s.ps.Close()
}

View file

@ -0,0 +1,141 @@
package storage
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMemCachedStorePersist(t *testing.T) {
// persistent Store
ps := NewMemoryStore()
// cached Store
ts := NewMemCachedStore(ps)
// persisting nothing should do nothing
c, err := ts.Persist()
assert.Equal(t, nil, err)
assert.Equal(t, 0, c)
// persisting one key should result in one key in ps and nothing in ts
assert.NoError(t, ts.Put([]byte("key"), []byte("value")))
c, err = ts.Persist()
assert.Equal(t, nil, err)
assert.Equal(t, 1, c)
v, err := ps.Get([]byte("key"))
assert.Equal(t, nil, err)
assert.Equal(t, []byte("value"), v)
v, err = ts.MemoryStore.Get([]byte("key"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
// now we overwrite the previous `key` contents and also add `key2`,
assert.NoError(t, ts.Put([]byte("key"), []byte("newvalue")))
assert.NoError(t, ts.Put([]byte("key2"), []byte("value2")))
// this is to check that now key is written into the ps before we do
// persist
v, err = ps.Get([]byte("key2"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
// two keys should be persisted (one overwritten and one new) and
// available in the ps
c, err = ts.Persist()
assert.Equal(t, nil, err)
assert.Equal(t, 2, c)
v, err = ts.MemoryStore.Get([]byte("key"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
v, err = ts.MemoryStore.Get([]byte("key2"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
v, err = ps.Get([]byte("key"))
assert.Equal(t, nil, err)
assert.Equal(t, []byte("newvalue"), v)
v, err = ps.Get([]byte("key2"))
assert.Equal(t, nil, err)
assert.Equal(t, []byte("value2"), v)
// we've persisted some values, make sure successive persist is a no-op
c, err = ts.Persist()
assert.Equal(t, nil, err)
assert.Equal(t, 0, c)
// test persisting deletions
err = ts.Delete([]byte("key"))
assert.Equal(t, nil, err)
c, err = ts.Persist()
assert.Equal(t, nil, err)
assert.Equal(t, 0, c)
v, err = ps.Get([]byte("key"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
v, err = ps.Get([]byte("key2"))
assert.Equal(t, nil, err)
assert.Equal(t, []byte("value2"), v)
}
func TestCachedGetFromPersistent(t *testing.T) {
key := []byte("key")
value := []byte("value")
ps := NewMemoryStore()
ts := NewMemCachedStore(ps)
assert.NoError(t, ps.Put(key, value))
val, err := ts.Get(key)
assert.Nil(t, err)
assert.Equal(t, value, val)
assert.NoError(t, ts.Delete(key))
val, err = ts.Get(key)
assert.Equal(t, err, ErrKeyNotFound)
assert.Nil(t, val)
}
func TestCachedSeek(t *testing.T) {
var (
// Given this prefix...
goodPrefix = []byte{'f'}
// these pairs should be found...
lowerKVs = []kvSeen{
{[]byte("foo"), []byte("bar"), false},
{[]byte("faa"), []byte("bra"), false},
}
// and these should be not.
deletedKVs = []kvSeen{
{[]byte("fee"), []byte("pow"), false},
{[]byte("fii"), []byte("qaz"), false},
}
// and these should be not.
updatedKVs = []kvSeen{
{[]byte("fuu"), []byte("wop"), false},
{[]byte("fyy"), []byte("zaq"), false},
}
ps = NewMemoryStore()
ts = NewMemCachedStore(ps)
)
for _, v := range lowerKVs {
require.NoError(t, ps.Put(v.key, v.val))
}
for _, v := range deletedKVs {
require.NoError(t, ps.Put(v.key, v.val))
require.NoError(t, ts.Delete(v.key))
}
for _, v := range updatedKVs {
require.NoError(t, ps.Put(v.key, []byte("stub")))
require.NoError(t, ts.Put(v.key, v.val))
}
foundKVs := make(map[string][]byte)
ts.Seek(goodPrefix, func(k, v []byte) {
foundKVs[string(k)] = v
})
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
for _, kv := range lowerKVs {
assert.Equal(t, kv.val, foundKVs[string(kv.key)])
}
for _, kv := range deletedKVs {
_, ok := foundKVs[string(kv.key)]
assert.Equal(t, false, ok)
}
for _, kv := range updatedKVs {
assert.Equal(t, kv.val, foundKVs[string(kv.key)])
}
}
func newMemCachedStoreForTesting(t *testing.T) Store {
return NewMemCachedStore(NewMemoryStore())
}

View file

@ -114,32 +114,6 @@ func newMemoryBatch() *MemoryBatch {
return &MemoryBatch{MemoryStore: *NewMemoryStore()}
}
// Persist flushes all the MemoryStore contents into the (supposedly) persistent
// store provided via parameter.
func (s *MemoryStore) Persist(ps Store) (int, error) {
s.mut.Lock()
defer s.mut.Unlock()
batch := ps.Batch()
keys, dkeys := 0, 0
for k, v := range s.mem {
batch.Put([]byte(k), v)
keys++
}
for k := range s.del {
batch.Delete([]byte(k))
dkeys++
}
var err error
if keys != 0 || dkeys != 0 {
err = ps.PutBatch(batch)
}
if err == nil {
s.mem = make(map[string][]byte)
s.del = make(map[string]bool)
}
return keys, err
}
// Close implements Store interface and clears up memory. Never returns an
// error.
func (s *MemoryStore) Close() error {

View file

@ -2,73 +2,8 @@ package storage
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMemoryStorePersist(t *testing.T) {
// temporary Store
ts := NewMemoryStore()
// persistent Store
ps := NewMemoryStore()
// persisting nothing should do nothing
c, err := ts.Persist(ps)
assert.Equal(t, nil, err)
assert.Equal(t, 0, c)
// persisting one key should result in one key in ps and nothing in ts
assert.NoError(t, ts.Put([]byte("key"), []byte("value")))
c, err = ts.Persist(ps)
assert.Equal(t, nil, err)
assert.Equal(t, 1, c)
v, err := ps.Get([]byte("key"))
assert.Equal(t, nil, err)
assert.Equal(t, []byte("value"), v)
v, err = ts.Get([]byte("key"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
// now we overwrite the previous `key` contents and also add `key2`,
assert.NoError(t, ts.Put([]byte("key"), []byte("newvalue")))
assert.NoError(t, ts.Put([]byte("key2"), []byte("value2")))
// this is to check that now key is written into the ps before we do
// persist
v, err = ps.Get([]byte("key2"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
// two keys should be persisted (one overwritten and one new) and
// available in the ps
c, err = ts.Persist(ps)
assert.Equal(t, nil, err)
assert.Equal(t, 2, c)
v, err = ts.Get([]byte("key"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
v, err = ts.Get([]byte("key2"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
v, err = ps.Get([]byte("key"))
assert.Equal(t, nil, err)
assert.Equal(t, []byte("newvalue"), v)
v, err = ps.Get([]byte("key2"))
assert.Equal(t, nil, err)
assert.Equal(t, []byte("value2"), v)
// we've persisted some values, make sure successive persist is a no-op
c, err = ts.Persist(ps)
assert.Equal(t, nil, err)
assert.Equal(t, 0, c)
// test persisting deletions
err = ts.Delete([]byte("key"))
assert.Equal(t, nil, err)
c, err = ts.Persist(ps)
assert.Equal(t, nil, err)
assert.Equal(t, 0, c)
v, err = ps.Get([]byte("key"))
assert.Equal(t, ErrKeyNotFound, err)
assert.Equal(t, []byte(nil), v)
v, err = ps.Get([]byte("key2"))
assert.Equal(t, nil, err)
assert.Equal(t, []byte("value2"), v)
}
func newMemoryStoreForTesting(t *testing.T) Store {
return NewMemoryStore()
}

View file

@ -9,6 +9,13 @@ import (
"github.com/stretchr/testify/require"
)
// kvSeen is used to test Seek implementations.
type kvSeen struct {
key []byte
val []byte
seen bool
}
type dbSetup struct {
name string
create func(*testing.T) Store
@ -66,11 +73,6 @@ func testStorePutBatch(t *testing.T, s Store) {
}
func testStoreSeek(t *testing.T, s Store) {
type kvSeen struct {
key []byte
val []byte
seen bool
}
var (
// Given this prefix...
goodprefix = []byte{'f'}
@ -219,6 +221,7 @@ func TestAllDBs(t *testing.T) {
var DBs = []dbSetup{
{"BoltDB", newBoltStoreForTesting},
{"LevelDB", newLevelDBForTesting},
{"MemCached", newMemCachedStoreForTesting},
{"Memory", newMemoryStoreForTesting},
{"RedisDB", newRedisStoreForTesting},
}

View file

@ -16,24 +16,18 @@ type UnspentCoins map[util.Uint256]*UnspentCoinState
// getAndUpdate retreives UnspentCoinState from temporary or persistent Store
// and return it. If it's not present in both stores, returns a new
// UnspentCoinState.
func (u UnspentCoins) getAndUpdate(ts storage.Store, ps storage.Store, hash util.Uint256) (*UnspentCoinState, error) {
func (u UnspentCoins) getAndUpdate(s storage.Store, hash util.Uint256) (*UnspentCoinState, error) {
if unspent, ok := u[hash]; ok {
return unspent, nil
}
unspent, err := getUnspentCoinStateFromStore(ts, hash)
unspent, err := getUnspentCoinStateFromStore(s, hash)
if err != nil {
if err != storage.ErrKeyNotFound {
return nil, err
}
unspent, err = getUnspentCoinStateFromStore(ps, hash)
if err != nil {
if err != storage.ErrKeyNotFound {
return nil, err
}
unspent = &UnspentCoinState{
states: []CoinState{},
}
unspent = &UnspentCoinState{
states: []CoinState{},
}
}
@ -57,6 +51,17 @@ func getUnspentCoinStateFromStore(s storage.Store, hash util.Uint256) (*UnspentC
return unspent, nil
}
// putUnspentCoinStateIntoStore puts given UnspentCoinState into the given store.
func putUnspentCoinStateIntoStore(store storage.Store, hash util.Uint256, ucs *UnspentCoinState) error {
buf := io.NewBufBinWriter()
ucs.EncodeBinary(buf.BinWriter)
if buf.Err != nil {
return buf.Err
}
key := storage.AppendPrefix(storage.STCoin, hash.BytesReverse())
return store.Put(key, buf.Bytes())
}
// UnspentCoinState hold the state of a unspent coin.
type UnspentCoinState struct {
states []CoinState
@ -74,16 +79,11 @@ func NewUnspentCoinState(n int) *UnspentCoinState {
}
// commit writes all unspent coin states to the given Batch.
func (u UnspentCoins) commit(b storage.Batch) error {
buf := io.NewBufBinWriter()
func (u UnspentCoins) commit(store storage.Store) error {
for hash, state := range u {
state.EncodeBinary(buf.BinWriter)
if buf.Err != nil {
return buf.Err
if err := putUnspentCoinStateIntoStore(store, hash, state); err != nil {
return err
}
key := storage.AppendPrefix(storage.STCoin, hash.BytesReverse())
b.Put(key, buf.Bytes())
buf.Reset()
}
return nil
}

View file

@ -31,7 +31,6 @@ func TestDecodeEncodeUnspentCoinState(t *testing.T) {
func TestCommitUnspentCoins(t *testing.T) {
var (
store = storage.NewMemoryStore()
batch = store.Batch()
unspentCoins = make(UnspentCoins)
)
@ -56,6 +55,5 @@ func TestCommitUnspentCoins(t *testing.T) {
},
}
assert.Nil(t, unspentCoins.commit(batch))
assert.Nil(t, store.PutBatch(batch))
assert.Nil(t, unspentCoins.commit(store))
}

View file

@ -179,15 +179,15 @@ func headerSliceReverse(dest []*Header) {
// storeAsCurrentBlock stores the given block witch prefix
// SYSCurrentBlock.
func storeAsCurrentBlock(batch storage.Batch, block *Block) {
func storeAsCurrentBlock(store storage.Store, block *Block) error {
buf := io.NewBufBinWriter()
buf.WriteLE(block.Hash().BytesReverse())
buf.WriteLE(block.Index)
batch.Put(storage.SYSCurrentBlock.Bytes(), buf.Bytes())
return store.Put(storage.SYSCurrentBlock.Bytes(), buf.Bytes())
}
// storeAsBlock stores the given block as DataBlock.
func storeAsBlock(batch storage.Batch, block *Block, sysFee uint32) error {
func storeAsBlock(store storage.Store, block *Block, sysFee uint32) error {
var (
key = storage.AppendPrefix(storage.DataBlock, block.Hash().BytesReverse())
buf = io.NewBufBinWriter()
@ -202,12 +202,11 @@ func storeAsBlock(batch storage.Batch, block *Block, sysFee uint32) error {
if buf.Err != nil {
return buf.Err
}
batch.Put(key, buf.Bytes())
return nil
return store.Put(key, buf.Bytes())
}
// storeAsTransaction stores the given TX as DataTransaction.
func storeAsTransaction(batch storage.Batch, tx *transaction.Transaction, index uint32) error {
func storeAsTransaction(store storage.Store, tx *transaction.Transaction, index uint32) error {
key := storage.AppendPrefix(storage.DataTransaction, tx.Hash().BytesReverse())
buf := io.NewBufBinWriter()
buf.WriteLE(index)
@ -215,6 +214,5 @@ func storeAsTransaction(batch storage.Batch, tx *transaction.Transaction, index
if buf.Err != nil {
return buf.Err
}
batch.Put(key, buf.Bytes())
return nil
return store.Put(key, buf.Bytes())
}