forked from TrueCloudLab/neoneo-go
Merge pull request #2316 from nspcc-dev/seek-optimisation
storage: allow to Seek from some point and backwards
This commit is contained in:
commit
ffe5df066e
19 changed files with 549 additions and 161 deletions
|
@ -486,7 +486,7 @@ func (bc *Blockchain) removeOldStorageItems() {
|
||||||
|
|
||||||
b := bc.dao.Store.Batch()
|
b := bc.dao.Store.Batch()
|
||||||
prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
|
prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
|
||||||
bc.dao.Store.Seek([]byte{byte(prefix)}, func(k, _ []byte) {
|
bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) {
|
||||||
// #1468, but don't need to copy here, because it is done by Store.
|
// #1468, but don't need to copy here, because it is done by Store.
|
||||||
b.Delete(k)
|
b.Delete(k)
|
||||||
})
|
})
|
||||||
|
|
|
@ -1769,7 +1769,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
|
||||||
if bcSpout.dao.Version.StoragePrefix == tempPrefix {
|
if bcSpout.dao.Version.StoragePrefix == tempPrefix {
|
||||||
tempPrefix = storage.STStorage
|
tempPrefix = storage.STStorage
|
||||||
}
|
}
|
||||||
bcSpout.dao.Store.Seek(bcSpout.dao.Version.StoragePrefix.Bytes(), func(k, v []byte) {
|
bcSpout.dao.Store.Seek(storage.SeekRange{Prefix: bcSpout.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
key[0] = byte(tempPrefix)
|
key[0] = byte(tempPrefix)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
|
|
|
@ -62,8 +62,8 @@ type DAO interface {
|
||||||
PutStateSyncCurrentBlockHeight(h uint32) error
|
PutStateSyncCurrentBlockHeight(h uint32) error
|
||||||
PutStorageItem(id int32, key []byte, si state.StorageItem) error
|
PutStorageItem(id int32, key []byte, si state.StorageItem) error
|
||||||
PutVersion(v Version) error
|
PutVersion(v Version) error
|
||||||
Seek(id int32, prefix []byte, f func(k, v []byte))
|
Seek(id int32, rng storage.SeekRange, f func(k, v []byte))
|
||||||
SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue
|
SeekAsync(ctx context.Context, id int32, rng storage.SeekRange) chan storage.KeyValue
|
||||||
StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error
|
StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error
|
||||||
StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
|
StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
|
||||||
StoreAsTransaction(tx *transaction.Transaction, index uint32, aer *state.AppExecResult, buf *io.BufBinWriter) error
|
StoreAsTransaction(tx *transaction.Transaction, index uint32, aer *state.AppExecResult, buf *io.BufBinWriter) error
|
||||||
|
@ -300,30 +300,26 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S
|
||||||
Item: state.StorageItem(v),
|
Item: state.StorageItem(v),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
dao.Seek(id, prefix, saveToArr)
|
dao.Seek(id, storage.SeekRange{Prefix: prefix}, saveToArr)
|
||||||
return siArr, nil
|
return siArr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek executes f for all items with a given prefix.
|
// Seek executes f for all storage items matching a given `rng` (matching given prefix and
|
||||||
// If key is to be used outside of f, they may not be copied.
|
// starting from the point specified). If key or value is to be used outside of f, they
|
||||||
func (dao *Simple) Seek(id int32, prefix []byte, f func(k, v []byte)) {
|
// may not be copied.
|
||||||
lookupKey := makeStorageItemKey(dao.Version.StoragePrefix, id, nil)
|
func (dao *Simple) Seek(id int32, rng storage.SeekRange, f func(k, v []byte)) {
|
||||||
if prefix != nil {
|
rng.Prefix = makeStorageItemKey(dao.Version.StoragePrefix, id, rng.Prefix)
|
||||||
lookupKey = append(lookupKey, prefix...)
|
dao.Store.Seek(rng, func(k, v []byte) {
|
||||||
}
|
f(k[len(rng.Prefix):], v)
|
||||||
dao.Store.Seek(lookupKey, func(k, v []byte) {
|
|
||||||
f(k[len(lookupKey):], v)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeekAsync sends all storage items matching given prefix to a channel and returns
|
// SeekAsync sends all storage items matching a given `rng` (matching given prefix and
|
||||||
// the channel. Resulting keys and values may not be copied.
|
// starting from the point specified) to a channel and returns the channel.
|
||||||
func (dao *Simple) SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue {
|
// Resulting keys and values may not be copied.
|
||||||
lookupKey := makeStorageItemKey(dao.Version.StoragePrefix, id, nil)
|
func (dao *Simple) SeekAsync(ctx context.Context, id int32, rng storage.SeekRange) chan storage.KeyValue {
|
||||||
if prefix != nil {
|
rng.Prefix = makeStorageItemKey(dao.Version.StoragePrefix, id, rng.Prefix)
|
||||||
lookupKey = append(lookupKey, prefix...)
|
return dao.Store.SeekAsync(ctx, rng, true)
|
||||||
}
|
|
||||||
return dao.Store.SeekAsync(ctx, lookupKey, true)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeStorageItemKey returns a key used to store StorageItem in the DB.
|
// makeStorageItemKey returns a key used to store StorageItem in the DB.
|
||||||
|
@ -479,7 +475,9 @@ func (dao *Simple) GetStateSyncCurrentBlockHeight() (uint32, error) {
|
||||||
// the given underlying store.
|
// the given underlying store.
|
||||||
func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
|
func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
|
||||||
hashMap := make(map[uint32][]util.Uint256)
|
hashMap := make(map[uint32][]util.Uint256)
|
||||||
dao.Store.Seek(storage.IXHeaderHashList.Bytes(), func(k, v []byte) {
|
dao.Store.Seek(storage.SeekRange{
|
||||||
|
Prefix: storage.IXHeaderHashList.Bytes(),
|
||||||
|
}, func(k, v []byte) {
|
||||||
storedCount := binary.LittleEndian.Uint32(k[1:])
|
storedCount := binary.LittleEndian.Uint32(k[1:])
|
||||||
hashes, err := read2000Uint256Hashes(v)
|
hashes, err := read2000Uint256Hashes(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -68,6 +69,27 @@ func newTestChainWithCustomCfgAndStore(t testing.TB, st storage.Store, f func(*c
|
||||||
return chain
|
return chain
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newLevelDBForTesting(t testing.TB) storage.Store {
|
||||||
|
ldbDir := t.TempDir()
|
||||||
|
dbConfig := storage.DBConfiguration{
|
||||||
|
Type: "leveldb",
|
||||||
|
LevelDBOptions: storage.LevelDBOptions{
|
||||||
|
DataDirectoryPath: ldbDir,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
newLevelStore, err := storage.NewLevelDBStore(dbConfig.LevelDBOptions)
|
||||||
|
require.Nil(t, err, "NewLevelDBStore error")
|
||||||
|
return newLevelStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBoltStoreForTesting(t testing.TB) storage.Store {
|
||||||
|
d := t.TempDir()
|
||||||
|
testFileName := filepath.Join(d, "test_bolt_db")
|
||||||
|
boltDBStore, err := storage.NewBoltDBStore(storage.BoltDBOptions{FilePath: testFileName})
|
||||||
|
require.NoError(t, err)
|
||||||
|
return boltDBStore
|
||||||
|
}
|
||||||
|
|
||||||
func initTestChain(t testing.TB, st storage.Store, f func(*config.Config)) *Blockchain {
|
func initTestChain(t testing.TB, st storage.Store, f func(*config.Config)) *Blockchain {
|
||||||
unitTestNetCfg, err := config.Load("../../config", testchain.Network())
|
unitTestNetCfg, err := config.Load("../../config", testchain.Network())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -190,7 +190,7 @@ func storageFind(ic *interop.Context) error {
|
||||||
// Items in seekres should be sorted by key, but GetStorageItemsWithPrefix returns
|
// Items in seekres should be sorted by key, but GetStorageItemsWithPrefix returns
|
||||||
// sorted items, so no need to sort them one more time.
|
// sorted items, so no need to sort them one more time.
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
seekres := ic.DAO.SeekAsync(ctx, stc.ID, prefix)
|
seekres := ic.DAO.SeekAsync(ctx, stc.ID, storage.SeekRange{Prefix: prefix})
|
||||||
item := istorage.NewIterator(seekres, prefix, opts)
|
item := istorage.NewIterator(seekres, prefix, opts)
|
||||||
ic.VM.Estack().PushItem(stackitem.NewInterop(item))
|
ic.VM.Estack().PushItem(stackitem.NewInterop(item))
|
||||||
ic.RegisterCancelFunc(cancel)
|
ic.RegisterCancelFunc(cancel)
|
||||||
|
|
|
@ -503,7 +503,7 @@ func (m *Management) InitializeCache(d dao.DAO) error {
|
||||||
defer m.mtx.Unlock()
|
defer m.mtx.Unlock()
|
||||||
|
|
||||||
var initErr error
|
var initErr error
|
||||||
d.Seek(m.ID, []byte{prefixContract}, func(_, v []byte) {
|
d.Seek(m.ID, storage.SeekRange{Prefix: []byte{prefixContract}}, func(_, v []byte) {
|
||||||
var cs = new(state.Contract)
|
var cs = new(state.Contract)
|
||||||
initErr = stackitem.DeserializeConvertible(v, cs)
|
initErr = stackitem.DeserializeConvertible(v, cs)
|
||||||
if initErr != nil {
|
if initErr != nil {
|
||||||
|
|
|
@ -352,7 +352,7 @@ func (n *NEO) PostPersist(ic *interop.Context) error {
|
||||||
if g, ok := n.gasPerVoteCache[cs[i].Key]; ok {
|
if g, ok := n.gasPerVoteCache[cs[i].Key]; ok {
|
||||||
r = &g
|
r = &g
|
||||||
} else {
|
} else {
|
||||||
reward := n.getGASPerVote(ic.DAO, key[:34], ic.Block.Index+1)
|
reward := n.getGASPerVote(ic.DAO, key[:34], []uint32{ic.Block.Index + 1})
|
||||||
r = &reward[0]
|
r = &reward[0]
|
||||||
}
|
}
|
||||||
tmp.Add(tmp, r)
|
tmp.Add(tmp, r)
|
||||||
|
@ -383,16 +383,27 @@ func (n *NEO) PostPersist(ic *interop.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NEO) getGASPerVote(d dao.DAO, key []byte, index ...uint32) []big.Int {
|
func (n *NEO) getGASPerVote(d dao.DAO, key []byte, indexes []uint32) []big.Int {
|
||||||
var max = make([]uint32, len(index))
|
sort.Slice(indexes, func(i, j int) bool {
|
||||||
var reward = make([]big.Int, len(index))
|
return indexes[i] < indexes[j]
|
||||||
d.Seek(n.ID, key, func(k, v []byte) {
|
})
|
||||||
if len(k) == 4 {
|
start := make([]byte, 4)
|
||||||
|
binary.BigEndian.PutUint32(start, indexes[len(indexes)-1])
|
||||||
|
|
||||||
|
need := len(indexes)
|
||||||
|
var reward = make([]big.Int, need)
|
||||||
|
collected := 0
|
||||||
|
d.Seek(n.ID, storage.SeekRange{
|
||||||
|
Prefix: key,
|
||||||
|
Start: start,
|
||||||
|
Backwards: true,
|
||||||
|
}, func(k, v []byte) {
|
||||||
|
if collected < need && len(k) == 4 {
|
||||||
num := binary.BigEndian.Uint32(k)
|
num := binary.BigEndian.Uint32(k)
|
||||||
for i, ind := range index {
|
for i, ind := range indexes {
|
||||||
if max[i] < num && num <= ind {
|
if reward[i].Sign() == 0 && num <= ind {
|
||||||
max[i] = num
|
|
||||||
reward[i] = *bigint.FromBytes(v)
|
reward[i] = *bigint.FromBytes(v)
|
||||||
|
collected++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -591,7 +602,7 @@ func (n *NEO) dropCandidateIfZero(d dao.DAO, pub *keys.PublicKey, c *candidate)
|
||||||
|
|
||||||
var toRemove []string
|
var toRemove []string
|
||||||
voterKey := makeVoterKey(pub.Bytes())
|
voterKey := makeVoterKey(pub.Bytes())
|
||||||
d.Seek(n.ID, voterKey, func(k, v []byte) {
|
d.Seek(n.ID, storage.SeekRange{Prefix: voterKey}, func(k, v []byte) {
|
||||||
toRemove = append(toRemove, string(k))
|
toRemove = append(toRemove, string(k))
|
||||||
})
|
})
|
||||||
for i := range toRemove {
|
for i := range toRemove {
|
||||||
|
@ -638,8 +649,8 @@ func (n *NEO) calculateBonus(d dao.DAO, vote *keys.PublicKey, value *big.Int, st
|
||||||
}
|
}
|
||||||
|
|
||||||
var key = makeVoterKey(vote.Bytes())
|
var key = makeVoterKey(vote.Bytes())
|
||||||
var reward = n.getGASPerVote(d, key, start, end)
|
var reward = n.getGASPerVote(d, key, []uint32{start, end})
|
||||||
var tmp = new(big.Int).Sub(&reward[1], &reward[0])
|
var tmp = (&reward[1]).Sub(&reward[1], &reward[0])
|
||||||
tmp.Mul(tmp, value)
|
tmp.Mul(tmp, value)
|
||||||
tmp.Div(tmp, bigVoterRewardFactor)
|
tmp.Div(tmp, bigVoterRewardFactor)
|
||||||
tmp.Add(tmp, r)
|
tmp.Add(tmp, r)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package core
|
package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/big"
|
"math/big"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -10,6 +11,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/internal/testchain"
|
"github.com/nspcc-dev/neo-go/internal/testchain"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
|
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/native"
|
"github.com/nspcc-dev/neo-go/pkg/core/native"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/storage"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||||
|
@ -31,7 +33,7 @@ func setSigner(tx *transaction.Transaction, h util.Uint160) {
|
||||||
}}
|
}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkTxHalt(t *testing.T, bc *Blockchain, h util.Uint256) {
|
func checkTxHalt(t testing.TB, bc *Blockchain, h util.Uint256) {
|
||||||
aer, err := bc.GetAppExecResults(h, trigger.Application)
|
aer, err := bc.GetAppExecResults(h, trigger.Application)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 1, len(aer))
|
require.Equal(t, 1, len(aer))
|
||||||
|
@ -476,3 +478,119 @@ func newAccountWithGAS(t *testing.T, bc *Blockchain) *wallet.Account {
|
||||||
transferTokenFromMultisigAccount(t, bc, acc.PrivateKey().GetScriptHash(), bc.contracts.GAS.Hash, 1000_00000000)
|
transferTokenFromMultisigAccount(t, bc, acc.PrivateKey().GetScriptHash(), bc.contracts.GAS.Hash, 1000_00000000)
|
||||||
return acc
|
return acc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkNEO_GetGASPerVote(t *testing.B) {
|
||||||
|
var stores = map[string]func(testing.TB) storage.Store{
|
||||||
|
"MemPS": func(t testing.TB) storage.Store {
|
||||||
|
return storage.NewMemoryStore()
|
||||||
|
},
|
||||||
|
"BoltPS": newBoltStoreForTesting,
|
||||||
|
"LevelPS": newLevelDBForTesting,
|
||||||
|
}
|
||||||
|
for psName, newPS := range stores {
|
||||||
|
for nRewardRecords := 10; nRewardRecords <= 1000; nRewardRecords *= 10 {
|
||||||
|
for rewardDistance := 1; rewardDistance <= 1000; rewardDistance *= 10 {
|
||||||
|
t.Run(fmt.Sprintf("%s_%dRewardRecords_%dRewardDistance", psName, nRewardRecords, rewardDistance), func(t *testing.B) {
|
||||||
|
ps := newPS(t)
|
||||||
|
t.Cleanup(func() { ps.Close() })
|
||||||
|
benchmarkGasPerVote(t, ps, nRewardRecords, rewardDistance)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func benchmarkGasPerVote(t *testing.B, ps storage.Store, nRewardRecords int, rewardDistance int) {
|
||||||
|
bc := newTestChainWithCustomCfgAndStore(t, ps, nil)
|
||||||
|
|
||||||
|
neo := bc.contracts.NEO
|
||||||
|
tx := transaction.New([]byte{byte(opcode.PUSH1)}, 0)
|
||||||
|
ic := bc.newInteropContext(trigger.Application, bc.dao, nil, tx)
|
||||||
|
ic.SpawnVM()
|
||||||
|
ic.Block = bc.newBlock(tx)
|
||||||
|
|
||||||
|
advanceChain := func(t *testing.B, count int) {
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
require.NoError(t, bc.AddBlock(bc.newBlock()))
|
||||||
|
ic.Block.Index++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Vote for new committee.
|
||||||
|
sz := testchain.CommitteeSize()
|
||||||
|
accs := make([]*wallet.Account, sz)
|
||||||
|
candidates := make(keys.PublicKeys, sz)
|
||||||
|
txs := make([]*transaction.Transaction, 0, len(accs))
|
||||||
|
for i := 0; i < sz; i++ {
|
||||||
|
priv, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
candidates[i] = priv.PublicKey()
|
||||||
|
accs[i], err = wallet.NewAccount()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, neo.RegisterCandidateInternal(ic, candidates[i]))
|
||||||
|
|
||||||
|
to := accs[i].Contract.ScriptHash()
|
||||||
|
w := io.NewBufBinWriter()
|
||||||
|
emit.AppCall(w.BinWriter, bc.contracts.NEO.Hash, "transfer", callflag.All,
|
||||||
|
neoOwner.BytesBE(), to.BytesBE(),
|
||||||
|
big.NewInt(int64(sz-i)*1000000).Int64(), nil)
|
||||||
|
emit.Opcodes(w.BinWriter, opcode.ASSERT)
|
||||||
|
emit.AppCall(w.BinWriter, bc.contracts.GAS.Hash, "transfer", callflag.All,
|
||||||
|
neoOwner.BytesBE(), to.BytesBE(),
|
||||||
|
int64(1_000_000_000), nil)
|
||||||
|
emit.Opcodes(w.BinWriter, opcode.ASSERT)
|
||||||
|
require.NoError(t, w.Err)
|
||||||
|
tx := transaction.New(w.Bytes(), 1000_000_000)
|
||||||
|
tx.ValidUntilBlock = bc.BlockHeight() + 1
|
||||||
|
setSigner(tx, testchain.MultisigScriptHash())
|
||||||
|
require.NoError(t, testchain.SignTx(bc, tx))
|
||||||
|
txs = append(txs, tx)
|
||||||
|
}
|
||||||
|
require.NoError(t, bc.AddBlock(bc.newBlock(txs...)))
|
||||||
|
for _, tx := range txs {
|
||||||
|
checkTxHalt(t, bc, tx.Hash())
|
||||||
|
}
|
||||||
|
for i := 0; i < sz; i++ {
|
||||||
|
priv := accs[i].PrivateKey()
|
||||||
|
h := priv.GetScriptHash()
|
||||||
|
setSigner(tx, h)
|
||||||
|
ic.VM.Load(priv.PublicKey().GetVerificationScript())
|
||||||
|
require.NoError(t, neo.VoteInternal(ic, h, candidates[i]))
|
||||||
|
}
|
||||||
|
_, err := ic.DAO.Persist()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Collect set of nRewardRecords reward records for each voter.
|
||||||
|
advanceChain(t, nRewardRecords*testchain.CommitteeSize())
|
||||||
|
|
||||||
|
// Transfer some more NEO to first voter to update his balance height.
|
||||||
|
to := accs[0].Contract.ScriptHash()
|
||||||
|
w := io.NewBufBinWriter()
|
||||||
|
emit.AppCall(w.BinWriter, bc.contracts.NEO.Hash, "transfer", callflag.All,
|
||||||
|
neoOwner.BytesBE(), to.BytesBE(), int64(1), nil)
|
||||||
|
emit.Opcodes(w.BinWriter, opcode.ASSERT)
|
||||||
|
require.NoError(t, w.Err)
|
||||||
|
tx = transaction.New(w.Bytes(), 1000_000_000)
|
||||||
|
tx.ValidUntilBlock = bc.BlockHeight() + 1
|
||||||
|
setSigner(tx, testchain.MultisigScriptHash())
|
||||||
|
require.NoError(t, testchain.SignTx(bc, tx))
|
||||||
|
require.NoError(t, bc.AddBlock(bc.newBlock(tx)))
|
||||||
|
|
||||||
|
aer, err := bc.GetAppExecResults(tx.Hash(), trigger.Application)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(aer))
|
||||||
|
require.Equal(t, vm.HaltState, aer[0].VMState, aer[0].FaultException)
|
||||||
|
|
||||||
|
// Advance chain one more time to avoid same start/end rewarding bounds.
|
||||||
|
advanceChain(t, rewardDistance)
|
||||||
|
end := bc.BlockHeight()
|
||||||
|
|
||||||
|
t.ResetTimer()
|
||||||
|
t.ReportAllocs()
|
||||||
|
t.StartTimer()
|
||||||
|
for i := 0; i < t.N; i++ {
|
||||||
|
_, err := neo.CalculateBonus(ic.DAO, to, end)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
t.StopTimer()
|
||||||
|
}
|
||||||
|
|
|
@ -127,7 +127,7 @@ func (s *Module) CleanStorage() error {
|
||||||
return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load())
|
return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load())
|
||||||
}
|
}
|
||||||
b := s.Store.Batch()
|
b := s.Store.Batch()
|
||||||
s.Store.Seek([]byte{byte(storage.DataMPT)}, func(k, _ []byte) {
|
s.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.DataMPT)}}, func(k, _ []byte) {
|
||||||
// #1468, but don't need to copy here, because it is done by Store.
|
// #1468, but don't need to copy here, because it is done by Store.
|
||||||
b.Delete(k)
|
b.Delete(k)
|
||||||
})
|
})
|
||||||
|
|
|
@ -32,7 +32,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
||||||
nodes = make(map[util.Uint256][]byte)
|
nodes = make(map[util.Uint256][]byte)
|
||||||
expectedItems []storage.KeyValue
|
expectedItems []storage.KeyValue
|
||||||
)
|
)
|
||||||
expectedStorage.Seek(storage.DataMPT.Bytes(), func(k, v []byte) {
|
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
expectedItems = append(expectedItems, storage.KeyValue{
|
expectedItems = append(expectedItems, storage.KeyValue{
|
||||||
|
@ -95,7 +95,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
||||||
|
|
||||||
// Compare resulting storage items and refcounts.
|
// Compare resulting storage items and refcounts.
|
||||||
var actualItems []storage.KeyValue
|
var actualItems []storage.KeyValue
|
||||||
expectedStorage.Seek(storage.DataMPT.Bytes(), func(k, v []byte) {
|
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
actualItems = append(actualItems, storage.KeyValue{
|
actualItems = append(actualItems, storage.KeyValue{
|
||||||
|
|
|
@ -424,7 +424,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
||||||
// compare storage states
|
// compare storage states
|
||||||
fetchStorage := func(bc *Blockchain) []storage.KeyValue {
|
fetchStorage := func(bc *Blockchain) []storage.KeyValue {
|
||||||
var kv []storage.KeyValue
|
var kv []storage.KeyValue
|
||||||
bc.dao.Store.Seek(bc.dao.Version.StoragePrefix.Bytes(), func(k, v []byte) {
|
bc.dao.Store.Seek(storage.SeekRange{Prefix: bc.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
if key[0] == byte(storage.STTempStorage) {
|
if key[0] == byte(storage.STTempStorage) {
|
||||||
|
@ -444,7 +444,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
||||||
// no temp items should be left
|
// no temp items should be left
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
var haveItems bool
|
var haveItems bool
|
||||||
bcBolt.dao.Store.Seek(storage.STStorage.Bytes(), func(_, _ []byte) {
|
bcBolt.dao.Store.Seek(storage.SeekRange{Prefix: storage.STStorage.Bytes()}, func(_, _ []byte) {
|
||||||
haveItems = true
|
haveItems = true
|
||||||
})
|
})
|
||||||
return !haveItems
|
return !haveItems
|
||||||
|
|
|
@ -109,11 +109,43 @@ func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *BoltDBStore) Seek(key []byte, f func(k, v []byte)) {
|
func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
|
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
||||||
|
copy(start, rng.Prefix)
|
||||||
|
copy(start[len(rng.Prefix):], rng.Start)
|
||||||
|
if rng.Backwards {
|
||||||
|
s.seekBackwards(rng.Prefix, start, f)
|
||||||
|
} else {
|
||||||
|
s.seek(rng.Prefix, start, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BoltDBStore) seek(key []byte, start []byte, f func(k, v []byte)) {
|
||||||
|
prefix := util.BytesPrefix(key)
|
||||||
|
prefix.Start = start
|
||||||
err := s.db.View(func(tx *bbolt.Tx) error {
|
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||||
c := tx.Bucket(Bucket).Cursor()
|
c := tx.Bucket(Bucket).Cursor()
|
||||||
prefix := util.BytesPrefix(key)
|
for k, v := c.Seek(prefix.Start); k != nil && (len(prefix.Limit) == 0 || bytes.Compare(k, prefix.Limit) <= 0); k, v = c.Next() {
|
||||||
for k, v := c.Seek(prefix.Start); k != nil && bytes.Compare(k, prefix.Limit) <= 0; k, v = c.Next() {
|
f(k, v)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BoltDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte)) {
|
||||||
|
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
c := tx.Bucket(Bucket).Cursor()
|
||||||
|
// Move cursor to the first kv pair which is followed by the pair matching the specified prefix.
|
||||||
|
if len(start) == 0 {
|
||||||
|
lastKey, _ := c.Last()
|
||||||
|
start = lastKey
|
||||||
|
}
|
||||||
|
rng := util.BytesPrefix(start) // in fact, we only need limit based on start slice to iterate backwards starting from this limit
|
||||||
|
c.Seek(rng.Limit)
|
||||||
|
for k, v := c.Prev(); k != nil && bytes.HasPrefix(k, key); k, v = c.Prev() {
|
||||||
f(k, v)
|
f(k, v)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -85,14 +85,38 @@ func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *LevelDBStore) Seek(key []byte, f func(k, v []byte)) {
|
func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
iter := s.db.NewIterator(util.BytesPrefix(key), nil)
|
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
||||||
|
copy(start, rng.Prefix)
|
||||||
|
copy(start[len(rng.Prefix):], rng.Start)
|
||||||
|
if rng.Backwards {
|
||||||
|
s.seekBackwards(rng.Prefix, start, f)
|
||||||
|
} else {
|
||||||
|
s.seek(rng.Prefix, start, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *LevelDBStore) seek(key []byte, start []byte, f func(k, v []byte)) {
|
||||||
|
prefix := util.BytesPrefix(key)
|
||||||
|
prefix.Start = start
|
||||||
|
iter := s.db.NewIterator(prefix, nil)
|
||||||
for iter.Next() {
|
for iter.Next() {
|
||||||
f(iter.Key(), iter.Value())
|
f(iter.Key(), iter.Value())
|
||||||
}
|
}
|
||||||
iter.Release()
|
iter.Release()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *LevelDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte)) {
|
||||||
|
iRange := util.BytesPrefix(start)
|
||||||
|
iRange.Start = key
|
||||||
|
|
||||||
|
iter := s.db.NewIterator(iRange, nil)
|
||||||
|
for ok := iter.Last(); ok; ok = iter.Prev() {
|
||||||
|
f(iter.Key(), iter.Value())
|
||||||
|
}
|
||||||
|
iter.Release()
|
||||||
|
}
|
||||||
|
|
||||||
// Batch implements the Batch interface and returns a leveldb
|
// Batch implements the Batch interface and returns a leveldb
|
||||||
// compatible Batch.
|
// compatible Batch.
|
||||||
func (s *LevelDBStore) Batch() Batch {
|
func (s *LevelDBStore) Batch() Batch {
|
||||||
|
|
|
@ -90,17 +90,17 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) {
|
func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
s.seek(context.Background(), key, false, f)
|
s.seek(context.Background(), rng, false, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and
|
// SeekAsync returns non-buffered channel with matching KeyValue pairs. Key and
|
||||||
// value slices may not be copied and may be modified. SeekAsync can guarantee
|
// value slices may not be copied and may be modified. SeekAsync can guarantee
|
||||||
// that key-value items are sorted by key in ascending way.
|
// that key-value items are sorted by key in ascending way.
|
||||||
func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bool) chan KeyValue {
|
func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix bool) chan KeyValue {
|
||||||
res := make(chan KeyValue)
|
res := make(chan KeyValue)
|
||||||
go func() {
|
go func() {
|
||||||
s.seek(ctx, key, cutPrefix, func(k, v []byte) {
|
s.seek(ctx, rng, cutPrefix, func(k, v []byte) {
|
||||||
res <- KeyValue{
|
res <- KeyValue{
|
||||||
Key: k,
|
Key: k,
|
||||||
Value: v,
|
Value: v,
|
||||||
|
@ -112,13 +112,29 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, key []byte, cutPrefix bo
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f func(k, v []byte)) {
|
// seek is internal representations of Seek* capable of seeking for the given key
|
||||||
|
// and supporting early stop using provided context. `cutPrefix` denotes whether provided
|
||||||
|
// key needs to be cut off the resulting keys. `rng` specifies prefix items must match
|
||||||
|
// and point to start seeking from. Backwards seeking from some point is supported
|
||||||
|
// with corresponding `rng` field set.
|
||||||
|
func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte)) {
|
||||||
// Create memory store `mem` and `del` snapshot not to hold the lock.
|
// Create memory store `mem` and `del` snapshot not to hold the lock.
|
||||||
var memRes []KeyValueExists
|
var memRes []KeyValueExists
|
||||||
sk := string(key)
|
sPrefix := string(rng.Prefix)
|
||||||
|
lPrefix := len(sPrefix)
|
||||||
|
sStart := string(rng.Start)
|
||||||
|
lStart := len(sStart)
|
||||||
|
isKeyOK := func(key string) bool {
|
||||||
|
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0)
|
||||||
|
}
|
||||||
|
if rng.Backwards {
|
||||||
|
isKeyOK = func(key string) bool {
|
||||||
|
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
s.mut.RLock()
|
s.mut.RLock()
|
||||||
for k, v := range s.MemoryStore.mem {
|
for k, v := range s.MemoryStore.mem {
|
||||||
if strings.HasPrefix(k, sk) {
|
if isKeyOK(k) {
|
||||||
memRes = append(memRes, KeyValueExists{
|
memRes = append(memRes, KeyValueExists{
|
||||||
KeyValue: KeyValue{
|
KeyValue: KeyValue{
|
||||||
Key: []byte(k),
|
Key: []byte(k),
|
||||||
|
@ -129,7 +145,7 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for k := range s.MemoryStore.del {
|
for k := range s.MemoryStore.del {
|
||||||
if strings.HasPrefix(k, sk) {
|
if isKeyOK(k) {
|
||||||
memRes = append(memRes, KeyValueExists{
|
memRes = append(memRes, KeyValueExists{
|
||||||
KeyValue: KeyValue{
|
KeyValue: KeyValue{
|
||||||
Key: []byte(k),
|
Key: []byte(k),
|
||||||
|
@ -139,9 +155,14 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
||||||
}
|
}
|
||||||
ps := s.ps
|
ps := s.ps
|
||||||
s.mut.RUnlock()
|
s.mut.RUnlock()
|
||||||
|
|
||||||
|
less := func(k1, k2 []byte) bool {
|
||||||
|
res := bytes.Compare(k1, k2)
|
||||||
|
return res != 0 && rng.Backwards == (res > 0)
|
||||||
|
}
|
||||||
// Sort memRes items for further comparison with ps items.
|
// Sort memRes items for further comparison with ps items.
|
||||||
sort.Slice(memRes, func(i, j int) bool {
|
sort.Slice(memRes, func(i, j int) bool {
|
||||||
return bytes.Compare(memRes[i].Key, memRes[j].Key) < 0
|
return less(memRes[i].Key, memRes[j].Key)
|
||||||
})
|
})
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -156,7 +177,7 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
||||||
iMem++
|
iMem++
|
||||||
}
|
}
|
||||||
// Merge results of seek operations in ascending order.
|
// Merge results of seek operations in ascending order.
|
||||||
ps.Seek(key, func(k, v []byte) {
|
mergeFunc := func(k, v []byte) {
|
||||||
if done {
|
if done {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -171,11 +192,11 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
||||||
done = true
|
done = true
|
||||||
break loop
|
break loop
|
||||||
default:
|
default:
|
||||||
var isMem = haveMem && (bytes.Compare(kvMem.Key, kvPs.Key) < 0)
|
var isMem = haveMem && less(kvMem.Key, kvPs.Key)
|
||||||
if isMem {
|
if isMem {
|
||||||
if kvMem.Exists {
|
if kvMem.Exists {
|
||||||
if cutPrefix {
|
if cutPrefix {
|
||||||
kvMem.Key = kvMem.Key[len(key):]
|
kvMem.Key = kvMem.Key[lPrefix:]
|
||||||
}
|
}
|
||||||
f(kvMem.Key, kvMem.Value)
|
f(kvMem.Key, kvMem.Value)
|
||||||
}
|
}
|
||||||
|
@ -189,7 +210,7 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
||||||
} else {
|
} else {
|
||||||
if !bytes.Equal(kvMem.Key, kvPs.Key) {
|
if !bytes.Equal(kvMem.Key, kvPs.Key) {
|
||||||
if cutPrefix {
|
if cutPrefix {
|
||||||
kvPs.Key = kvPs.Key[len(key):]
|
kvPs.Key = kvPs.Key[lPrefix:]
|
||||||
}
|
}
|
||||||
f(kvPs.Key, kvPs.Value)
|
f(kvPs.Key, kvPs.Value)
|
||||||
}
|
}
|
||||||
|
@ -197,7 +218,9 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
|
ps.Seek(rng, mergeFunc)
|
||||||
|
|
||||||
if !done && haveMem {
|
if !done && haveMem {
|
||||||
loop:
|
loop:
|
||||||
for i := iMem - 1; i < len(memRes); i++ {
|
for i := iMem - 1; i < len(memRes); i++ {
|
||||||
|
@ -208,7 +231,7 @@ func (s *MemCachedStore) seek(ctx context.Context, key []byte, cutPrefix bool, f
|
||||||
kvMem = memRes[i]
|
kvMem = memRes[i]
|
||||||
if kvMem.Exists {
|
if kvMem.Exists {
|
||||||
if cutPrefix {
|
if cutPrefix {
|
||||||
kvMem.Key = kvMem.Key[len(key):]
|
kvMem.Key = kvMem.Key[lPrefix:]
|
||||||
}
|
}
|
||||||
f(kvMem.Key, kvMem.Value)
|
f(kvMem.Key, kvMem.Value)
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,48 +138,48 @@ func TestCachedSeek(t *testing.T) {
|
||||||
// Given this prefix...
|
// Given this prefix...
|
||||||
goodPrefix = []byte{'f'}
|
goodPrefix = []byte{'f'}
|
||||||
// these pairs should be found...
|
// these pairs should be found...
|
||||||
lowerKVs = []kvSeen{
|
lowerKVs = []KeyValue{
|
||||||
{[]byte("foo"), []byte("bar"), false},
|
{[]byte("foo"), []byte("bar")},
|
||||||
{[]byte("faa"), []byte("bra"), false},
|
{[]byte("faa"), []byte("bra")},
|
||||||
}
|
}
|
||||||
// and these should be not.
|
// and these should be not.
|
||||||
deletedKVs = []kvSeen{
|
deletedKVs = []KeyValue{
|
||||||
{[]byte("fee"), []byte("pow"), false},
|
{[]byte("fee"), []byte("pow")},
|
||||||
{[]byte("fii"), []byte("qaz"), false},
|
{[]byte("fii"), []byte("qaz")},
|
||||||
}
|
}
|
||||||
// and these should be not.
|
// and these should be not.
|
||||||
updatedKVs = []kvSeen{
|
updatedKVs = []KeyValue{
|
||||||
{[]byte("fuu"), []byte("wop"), false},
|
{[]byte("fuu"), []byte("wop")},
|
||||||
{[]byte("fyy"), []byte("zaq"), false},
|
{[]byte("fyy"), []byte("zaq")},
|
||||||
}
|
}
|
||||||
ps = NewMemoryStore()
|
ps = NewMemoryStore()
|
||||||
ts = NewMemCachedStore(ps)
|
ts = NewMemCachedStore(ps)
|
||||||
)
|
)
|
||||||
for _, v := range lowerKVs {
|
for _, v := range lowerKVs {
|
||||||
require.NoError(t, ps.Put(v.key, v.val))
|
require.NoError(t, ps.Put(v.Key, v.Value))
|
||||||
}
|
}
|
||||||
for _, v := range deletedKVs {
|
for _, v := range deletedKVs {
|
||||||
require.NoError(t, ps.Put(v.key, v.val))
|
require.NoError(t, ps.Put(v.Key, v.Value))
|
||||||
require.NoError(t, ts.Delete(v.key))
|
require.NoError(t, ts.Delete(v.Key))
|
||||||
}
|
}
|
||||||
for _, v := range updatedKVs {
|
for _, v := range updatedKVs {
|
||||||
require.NoError(t, ps.Put(v.key, []byte("stub")))
|
require.NoError(t, ps.Put(v.Key, []byte("stub")))
|
||||||
require.NoError(t, ts.Put(v.key, v.val))
|
require.NoError(t, ts.Put(v.Key, v.Value))
|
||||||
}
|
}
|
||||||
foundKVs := make(map[string][]byte)
|
foundKVs := make(map[string][]byte)
|
||||||
ts.Seek(goodPrefix, func(k, v []byte) {
|
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) {
|
||||||
foundKVs[string(k)] = v
|
foundKVs[string(k)] = v
|
||||||
})
|
})
|
||||||
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
||||||
for _, kv := range lowerKVs {
|
for _, kv := range lowerKVs {
|
||||||
assert.Equal(t, kv.val, foundKVs[string(kv.key)])
|
assert.Equal(t, kv.Value, foundKVs[string(kv.Key)])
|
||||||
}
|
}
|
||||||
for _, kv := range deletedKVs {
|
for _, kv := range deletedKVs {
|
||||||
_, ok := foundKVs[string(kv.key)]
|
_, ok := foundKVs[string(kv.Key)]
|
||||||
assert.Equal(t, false, ok)
|
assert.Equal(t, false, ok)
|
||||||
}
|
}
|
||||||
for _, kv := range updatedKVs {
|
for _, kv := range updatedKVs {
|
||||||
assert.Equal(t, kv.val, foundKVs[string(kv.key)])
|
assert.Equal(t, kv.Value, foundKVs[string(kv.Key)])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,7 +232,7 @@ func benchmarkCachedSeek(t *testing.B, ps Store, psElementsCount, tsElementsCoun
|
||||||
t.ReportAllocs()
|
t.ReportAllocs()
|
||||||
t.ResetTimer()
|
t.ResetTimer()
|
||||||
for n := 0; n < t.N; n++ {
|
for n := 0; n < t.N; n++ {
|
||||||
ts.Seek(searchPrefix, func(k, v []byte) {})
|
ts.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) {})
|
||||||
}
|
}
|
||||||
t.StopTimer()
|
t.StopTimer()
|
||||||
}
|
}
|
||||||
|
@ -290,7 +290,7 @@ func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string]bool) error {
|
||||||
b.onPutBatch()
|
b.onPutBatch()
|
||||||
return ErrKeyNotFound
|
return ErrKeyNotFound
|
||||||
}
|
}
|
||||||
func (b *BadStore) Seek(k []byte, f func(k, v []byte)) {
|
func (b *BadStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
}
|
}
|
||||||
func (b *BadStore) Close() error {
|
func (b *BadStore) Close() error {
|
||||||
return nil
|
return nil
|
||||||
|
@ -332,46 +332,46 @@ func TestCachedSeekSorting(t *testing.T) {
|
||||||
// Given this prefix...
|
// Given this prefix...
|
||||||
goodPrefix = []byte{1}
|
goodPrefix = []byte{1}
|
||||||
// these pairs should be found...
|
// these pairs should be found...
|
||||||
lowerKVs = []kvSeen{
|
lowerKVs = []KeyValue{
|
||||||
{[]byte{1, 2, 3}, []byte("bra"), false},
|
{[]byte{1, 2, 3}, []byte("bra")},
|
||||||
{[]byte{1, 2, 5}, []byte("bar"), false},
|
{[]byte{1, 2, 5}, []byte("bar")},
|
||||||
{[]byte{1, 3, 3}, []byte("bra"), false},
|
{[]byte{1, 3, 3}, []byte("bra")},
|
||||||
{[]byte{1, 3, 5}, []byte("bra"), false},
|
{[]byte{1, 3, 5}, []byte("bra")},
|
||||||
}
|
}
|
||||||
// and these should be not.
|
// and these should be not.
|
||||||
deletedKVs = []kvSeen{
|
deletedKVs = []KeyValue{
|
||||||
{[]byte{1, 7, 3}, []byte("pow"), false},
|
{[]byte{1, 7, 3}, []byte("pow")},
|
||||||
{[]byte{1, 7, 4}, []byte("qaz"), false},
|
{[]byte{1, 7, 4}, []byte("qaz")},
|
||||||
}
|
}
|
||||||
// and these should be not.
|
// and these should be not.
|
||||||
updatedKVs = []kvSeen{
|
updatedKVs = []KeyValue{
|
||||||
{[]byte{1, 2, 4}, []byte("zaq"), false},
|
{[]byte{1, 2, 4}, []byte("zaq")},
|
||||||
{[]byte{1, 2, 6}, []byte("zaq"), false},
|
{[]byte{1, 2, 6}, []byte("zaq")},
|
||||||
{[]byte{1, 3, 2}, []byte("wop"), false},
|
{[]byte{1, 3, 2}, []byte("wop")},
|
||||||
{[]byte{1, 3, 4}, []byte("zaq"), false},
|
{[]byte{1, 3, 4}, []byte("zaq")},
|
||||||
}
|
}
|
||||||
ps = NewMemoryStore()
|
ps = NewMemoryStore()
|
||||||
ts = NewMemCachedStore(ps)
|
ts = NewMemCachedStore(ps)
|
||||||
)
|
)
|
||||||
for _, v := range lowerKVs {
|
for _, v := range lowerKVs {
|
||||||
require.NoError(t, ps.Put(v.key, v.val))
|
require.NoError(t, ps.Put(v.Key, v.Value))
|
||||||
}
|
}
|
||||||
for _, v := range deletedKVs {
|
for _, v := range deletedKVs {
|
||||||
require.NoError(t, ps.Put(v.key, v.val))
|
require.NoError(t, ps.Put(v.Key, v.Value))
|
||||||
require.NoError(t, ts.Delete(v.key))
|
require.NoError(t, ts.Delete(v.Key))
|
||||||
}
|
}
|
||||||
for _, v := range updatedKVs {
|
for _, v := range updatedKVs {
|
||||||
require.NoError(t, ps.Put(v.key, []byte("stub")))
|
require.NoError(t, ps.Put(v.Key, []byte("stub")))
|
||||||
require.NoError(t, ts.Put(v.key, v.val))
|
require.NoError(t, ts.Put(v.Key, v.Value))
|
||||||
}
|
}
|
||||||
var foundKVs []kvSeen
|
var foundKVs []KeyValue
|
||||||
ts.Seek(goodPrefix, func(k, v []byte) {
|
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) {
|
||||||
foundKVs = append(foundKVs, kvSeen{key: slice.Copy(k), val: slice.Copy(v)})
|
foundKVs = append(foundKVs, KeyValue{Key: slice.Copy(k), Value: slice.Copy(v)})
|
||||||
})
|
})
|
||||||
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
||||||
expected := append(lowerKVs, updatedKVs...)
|
expected := append(lowerKVs, updatedKVs...)
|
||||||
sort.Slice(expected, func(i, j int) bool {
|
sort.Slice(expected, func(i, j int) bool {
|
||||||
return bytes.Compare(expected[i].key, expected[j].key) < 0
|
return bytes.Compare(expected[i].Key, expected[j].Key) < 0
|
||||||
})
|
})
|
||||||
require.Equal(t, expected, foundKVs)
|
require.Equal(t, expected, foundKVs)
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,9 +104,9 @@ func (s *MemoryStore) PutChangeSet(puts map[string][]byte, dels map[string]bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *MemoryStore) Seek(key []byte, f func(k, v []byte)) {
|
func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
s.mut.RLock()
|
s.mut.RLock()
|
||||||
s.seek(key, f)
|
s.seek(rng, f)
|
||||||
s.mut.RUnlock()
|
s.mut.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,12 +127,31 @@ func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// seek is an internal unlocked implementation of Seek.
|
// seek is an internal unlocked implementation of Seek. `start` denotes whether
|
||||||
func (s *MemoryStore) seek(key []byte, f func(k, v []byte)) {
|
// seeking starting from the provided prefix should be performed. Backwards
|
||||||
sk := string(key)
|
// seeking from some point is supported with corresponding SeekRange field set.
|
||||||
|
func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
|
sPrefix := string(rng.Prefix)
|
||||||
|
lPrefix := len(sPrefix)
|
||||||
|
sStart := string(rng.Start)
|
||||||
|
lStart := len(sStart)
|
||||||
var memList []KeyValue
|
var memList []KeyValue
|
||||||
|
|
||||||
|
isKeyOK := func(key string) bool {
|
||||||
|
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) >= 0)
|
||||||
|
}
|
||||||
|
if rng.Backwards {
|
||||||
|
isKeyOK = func(key string) bool {
|
||||||
|
return strings.HasPrefix(key, sPrefix) && (lStart == 0 || strings.Compare(key[lPrefix:], sStart) <= 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
less := func(k1, k2 []byte) bool {
|
||||||
|
res := bytes.Compare(k1, k2)
|
||||||
|
return res != 0 && rng.Backwards == (res > 0)
|
||||||
|
}
|
||||||
|
|
||||||
for k, v := range s.mem {
|
for k, v := range s.mem {
|
||||||
if strings.HasPrefix(k, sk) {
|
if isKeyOK(k) {
|
||||||
memList = append(memList, KeyValue{
|
memList = append(memList, KeyValue{
|
||||||
Key: []byte(k),
|
Key: []byte(k),
|
||||||
Value: v,
|
Value: v,
|
||||||
|
@ -140,7 +159,7 @@ func (s *MemoryStore) seek(key []byte, f func(k, v []byte)) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sort.Slice(memList, func(i, j int) bool {
|
sort.Slice(memList, func(i, j int) bool {
|
||||||
return bytes.Compare(memList[i].Key, memList[j].Key) < 0
|
return less(memList[i].Key, memList[j].Key)
|
||||||
})
|
})
|
||||||
for _, kv := range memList {
|
for _, kv := range memList {
|
||||||
f(kv.Key, kv.Value)
|
f(kv.Key, kv.Value)
|
||||||
|
|
|
@ -28,7 +28,7 @@ func BenchmarkMemorySeek(t *testing.B) {
|
||||||
t.ReportAllocs()
|
t.ReportAllocs()
|
||||||
t.ResetTimer()
|
t.ResetTimer()
|
||||||
for n := 0; n < t.N; n++ {
|
for n := 0; n < t.N; n++ {
|
||||||
ms.Seek(searchPrefix, func(k, v []byte) {})
|
ms.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) {})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,26 @@ const (
|
||||||
MaxStorageValueLen = 65535
|
MaxStorageValueLen = 65535
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// SeekRange represents options for Store.Seek operation.
|
||||||
|
type SeekRange struct {
|
||||||
|
// Prefix denotes the Seek's lookup key.
|
||||||
|
// Empty Prefix means seeking through all keys in the DB starting from
|
||||||
|
// the Start if specified.
|
||||||
|
Prefix []byte
|
||||||
|
// Start denotes value upended to the Prefix to start Seek from.
|
||||||
|
// Seeking starting from some key includes this key to the result;
|
||||||
|
// if no matching key was found then next suitable key is picked up.
|
||||||
|
// Start may be empty. Empty Start means seeking through all keys in
|
||||||
|
// the DB with matching Prefix.
|
||||||
|
// Empty Prefix and empty Start can be combined, which means seeking
|
||||||
|
// through all keys in the DB.
|
||||||
|
Start []byte
|
||||||
|
// Backwards denotes whether Seek direction should be reversed, i.e.
|
||||||
|
// whether seeking should be performed in a descending way.
|
||||||
|
// Backwards can be safely combined with Prefix and Start.
|
||||||
|
Backwards bool
|
||||||
|
}
|
||||||
|
|
||||||
// ErrKeyNotFound is an error returned by Store implementations
|
// ErrKeyNotFound is an error returned by Store implementations
|
||||||
// when a certain key is not found.
|
// when a certain key is not found.
|
||||||
var ErrKeyNotFound = errors.New("key not found")
|
var ErrKeyNotFound = errors.New("key not found")
|
||||||
|
@ -63,7 +83,7 @@ type (
|
||||||
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
|
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
|
||||||
// Key and value slices should not be modified. Seek can guarantee that key-value items are sorted by
|
// Key and value slices should not be modified. Seek can guarantee that key-value items are sorted by
|
||||||
// key in ascending way.
|
// key in ascending way.
|
||||||
Seek(k []byte, f func(k, v []byte))
|
Seek(rng SeekRange, f func(k, v []byte))
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"reflect"
|
"reflect"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||||
|
@ -10,13 +12,6 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
// kvSeen is used to test Seek implementations.
|
|
||||||
type kvSeen struct {
|
|
||||||
key []byte
|
|
||||||
val []byte
|
|
||||||
seen bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type dbSetup struct {
|
type dbSetup struct {
|
||||||
name string
|
name string
|
||||||
create func(testing.TB) Store
|
create func(testing.TB) Store
|
||||||
|
@ -72,51 +67,177 @@ func testStorePutBatch(t *testing.T, s Store) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func testStoreSeek(t *testing.T, s Store) {
|
func testStoreSeek(t *testing.T, s Store) {
|
||||||
var (
|
// Use the same set of kvs to test Seek with different prefix/start values.
|
||||||
// Given this prefix...
|
kvs := []KeyValue{
|
||||||
goodprefix = []byte{'f'}
|
{[]byte("10"), []byte("bar")},
|
||||||
// these pairs should be found...
|
{[]byte("11"), []byte("bara")},
|
||||||
goodkvs = []kvSeen{
|
{[]byte("20"), []byte("barb")},
|
||||||
{[]byte("foo"), []byte("bar"), false},
|
{[]byte("21"), []byte("barc")},
|
||||||
{[]byte("faa"), []byte("bra"), false},
|
{[]byte("22"), []byte("bard")},
|
||||||
{[]byte("foox"), []byte("barx"), false},
|
{[]byte("30"), []byte("bare")},
|
||||||
|
{[]byte("31"), []byte("barf")},
|
||||||
}
|
}
|
||||||
// and these should be not.
|
for _, v := range kvs {
|
||||||
badkvs = []kvSeen{
|
require.NoError(t, s.Put(v.Key, v.Value))
|
||||||
{[]byte("doo"), []byte("pow"), false},
|
|
||||||
{[]byte("mew"), []byte("qaz"), false},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
for _, v := range goodkvs {
|
|
||||||
require.NoError(t, s.Put(v.key, v.val))
|
|
||||||
}
|
|
||||||
for _, v := range badkvs {
|
|
||||||
require.NoError(t, s.Put(v.key, v.val))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
numFound := 0
|
check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue, backwards bool) {
|
||||||
s.Seek(goodprefix, func(k, v []byte) {
|
// Seek result expected to be sorted in an ascending (for forwards seeking) or descending (for backwards seeking) way.
|
||||||
for i := 0; i < len(goodkvs); i++ {
|
cmpFunc := func(i, j int) bool {
|
||||||
if string(k) == string(goodkvs[i].key) {
|
return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) < 0
|
||||||
assert.Equal(t, string(goodkvs[i].val), string(v))
|
}
|
||||||
goodkvs[i].seen = true
|
if backwards {
|
||||||
|
cmpFunc = func(i, j int) bool {
|
||||||
|
return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) > 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := 0; i < len(badkvs); i++ {
|
sort.Slice(goodkvs, cmpFunc)
|
||||||
if string(k) == string(badkvs[i].key) {
|
|
||||||
badkvs[i].seen = true
|
rng := SeekRange{
|
||||||
|
Prefix: goodprefix,
|
||||||
|
Start: start,
|
||||||
}
|
}
|
||||||
|
if backwards {
|
||||||
|
rng.Backwards = true
|
||||||
}
|
}
|
||||||
numFound++
|
actual := make([]KeyValue, 0, len(goodkvs))
|
||||||
|
s.Seek(rng, func(k, v []byte) {
|
||||||
|
actual = append(actual, KeyValue{
|
||||||
|
Key: slice.Copy(k),
|
||||||
|
Value: slice.Copy(v),
|
||||||
})
|
})
|
||||||
assert.Equal(t, len(goodkvs), numFound)
|
})
|
||||||
for i := 0; i < len(goodkvs); i++ {
|
assert.Equal(t, goodkvs, actual)
|
||||||
assert.Equal(t, true, goodkvs[i].seen)
|
|
||||||
}
|
}
|
||||||
for i := 0; i < len(badkvs); i++ {
|
|
||||||
assert.Equal(t, false, badkvs[i].seen)
|
t.Run("non-empty prefix, empty start", func(t *testing.T) {
|
||||||
|
t.Run("forwards", func(t *testing.T) {
|
||||||
|
t.Run("good", func(t *testing.T) {
|
||||||
|
// Given this prefix...
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
// and empty start range...
|
||||||
|
start := []byte{}
|
||||||
|
// these pairs should be found.
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
kvs[4], // key = "22"
|
||||||
}
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, false)
|
||||||
|
})
|
||||||
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("0")
|
||||||
|
start := []byte{}
|
||||||
|
check(t, goodprefix, start, []KeyValue{}, false)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("backwards", func(t *testing.T) {
|
||||||
|
t.Run("good", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
start := []byte{}
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[4], // key = "22"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, true)
|
||||||
|
})
|
||||||
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("0")
|
||||||
|
start := []byte{}
|
||||||
|
check(t, goodprefix, start, []KeyValue{}, true)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("non-empty prefix, non-empty start", func(t *testing.T) {
|
||||||
|
t.Run("forwards", func(t *testing.T) {
|
||||||
|
t.Run("good", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
start := []byte("1") // start will be upended to goodprefix to start seek from
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
kvs[4], // key = "22"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, false)
|
||||||
|
})
|
||||||
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
start := []byte("3") // start is more than all keys prefixed by '2'.
|
||||||
|
check(t, goodprefix, start, []KeyValue{}, false)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
t.Run("backwards", func(t *testing.T) {
|
||||||
|
t.Run("good", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
start := []byte("1") // start will be upended to goodprefix to start seek from
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, true)
|
||||||
|
})
|
||||||
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
start := []byte(".") // start is less than all keys prefixed by '2'.
|
||||||
|
check(t, goodprefix, start, []KeyValue{}, true)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("empty prefix, non-empty start", func(t *testing.T) {
|
||||||
|
t.Run("forwards", func(t *testing.T) {
|
||||||
|
t.Run("good", func(t *testing.T) {
|
||||||
|
goodprefix := []byte{}
|
||||||
|
start := []byte("21")
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
kvs[4], // key = "22"
|
||||||
|
kvs[5], // key = "30"
|
||||||
|
kvs[6], // key = "31"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, false)
|
||||||
|
})
|
||||||
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
|
goodprefix := []byte{}
|
||||||
|
start := []byte("32") // start is more than all keys.
|
||||||
|
check(t, goodprefix, start, []KeyValue{}, false)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
t.Run("backwards", func(t *testing.T) {
|
||||||
|
t.Run("good", func(t *testing.T) {
|
||||||
|
goodprefix := []byte{}
|
||||||
|
start := []byte("21")
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
kvs[1], // key = "11"
|
||||||
|
kvs[0], // key = "10"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, true)
|
||||||
|
})
|
||||||
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
|
goodprefix := []byte{}
|
||||||
|
start := []byte("0") // start is less than all keys.
|
||||||
|
check(t, goodprefix, start, []KeyValue{}, true)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("empty prefix, empty start", func(t *testing.T) {
|
||||||
|
goodprefix := []byte{}
|
||||||
|
start := []byte{}
|
||||||
|
goodkvs := make([]KeyValue, len(kvs))
|
||||||
|
copy(goodkvs, kvs)
|
||||||
|
t.Run("forwards", func(t *testing.T) {
|
||||||
|
check(t, goodprefix, start, goodkvs, false)
|
||||||
|
})
|
||||||
|
t.Run("backwards", func(t *testing.T) {
|
||||||
|
check(t, goodprefix, start, goodkvs, true)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue