forked from TrueCloudLab/neoneo-go
Merge pull request #2330 from nspcc-dev/transfer-logs-opt
core: improve Seek and optimise seek time for transfer logs
This commit is contained in:
commit
42769d11ff
21 changed files with 391 additions and 139 deletions
|
@ -286,12 +286,12 @@ func (chain *FakeChain) GetTokenLastUpdated(acc util.Uint160) (map[int32]uint32,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForEachNEP17Transfer implements Blockchainer interface.
|
// ForEachNEP17Transfer implements Blockchainer interface.
|
||||||
func (chain *FakeChain) ForEachNEP11Transfer(util.Uint160, func(*state.NEP11Transfer) (bool, error)) error {
|
func (chain *FakeChain) ForEachNEP11Transfer(util.Uint160, uint64, func(*state.NEP11Transfer) (bool, error)) error {
|
||||||
panic("TODO")
|
panic("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForEachNEP17Transfer implements Blockchainer interface.
|
// ForEachNEP17Transfer implements Blockchainer interface.
|
||||||
func (chain *FakeChain) ForEachNEP17Transfer(util.Uint160, func(*state.NEP17Transfer) (bool, error)) error {
|
func (chain *FakeChain) ForEachNEP17Transfer(util.Uint160, uint64, func(*state.NEP17Transfer) (bool, error)) error {
|
||||||
panic("TODO")
|
panic("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,18 @@
|
||||||
package core
|
package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neo-go/internal/random"
|
||||||
|
"github.com/nspcc-dev/neo-go/internal/testchain"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
|
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/storage"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
|
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -22,3 +31,84 @@ func BenchmarkVerifyWitness(t *testing.B) {
|
||||||
_, _ = bc.VerifyWitness(tx.Signers[0].Account, tx, &tx.Scripts[0], 100000000)
|
_, _ = bc.VerifyWitness(tx.Signers[0].Account, tx, &tx.Scripts[0], 100000000)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkBlockchain_ForEachNEP17Transfer(t *testing.B) {
|
||||||
|
var stores = map[string]func(testing.TB) storage.Store{
|
||||||
|
"MemPS": func(t testing.TB) storage.Store {
|
||||||
|
return storage.NewMemoryStore()
|
||||||
|
},
|
||||||
|
"BoltPS": newBoltStoreForTesting,
|
||||||
|
"LevelPS": newLevelDBForTesting,
|
||||||
|
}
|
||||||
|
startFrom := []int{1, 100, 1000}
|
||||||
|
blocksToTake := []int{100, 1000}
|
||||||
|
for psName, newPS := range stores {
|
||||||
|
for _, startFromBlock := range startFrom {
|
||||||
|
for _, nBlocksToTake := range blocksToTake {
|
||||||
|
t.Run(fmt.Sprintf("%s_StartFromBlockN-%d_Take%dBlocks", psName, startFromBlock, nBlocksToTake), func(t *testing.B) {
|
||||||
|
ps := newPS(t)
|
||||||
|
t.Cleanup(func() { ps.Close() })
|
||||||
|
benchmarkForEachNEP17Transfer(t, ps, startFromBlock, nBlocksToTake)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func benchmarkForEachNEP17Transfer(t *testing.B, ps storage.Store, startFromBlock, nBlocksToTake int) {
|
||||||
|
var (
|
||||||
|
nonce uint32 = 1
|
||||||
|
chainHeight = 2_100 // constant chain height to be able to compare paging results
|
||||||
|
transfersPerBlock = state.TokenTransferBatchSize/4 + // 4 blocks per batch
|
||||||
|
state.TokenTransferBatchSize/32 // shift
|
||||||
|
)
|
||||||
|
|
||||||
|
bc := newTestChainWithCustomCfgAndStore(t, ps, nil)
|
||||||
|
gasHash := bc.contracts.GAS.Hash
|
||||||
|
acc := random.Uint160()
|
||||||
|
|
||||||
|
for j := 0; j < chainHeight; j++ {
|
||||||
|
w := io.NewBufBinWriter()
|
||||||
|
for i := 0; i < transfersPerBlock; i++ {
|
||||||
|
emit.AppCall(w.BinWriter, gasHash, "transfer", callflag.All, testchain.MultisigScriptHash(), acc, 1, nil)
|
||||||
|
emit.Opcodes(w.BinWriter, opcode.ASSERT)
|
||||||
|
require.NoError(t, w.Err)
|
||||||
|
}
|
||||||
|
script := w.Bytes()
|
||||||
|
tx := transaction.New(script, int64(1100_0000*transfersPerBlock))
|
||||||
|
tx.ValidUntilBlock = bc.BlockHeight() + 1
|
||||||
|
tx.Nonce = nonce
|
||||||
|
nonce++
|
||||||
|
tx.Signers = []transaction.Signer{{
|
||||||
|
Account: testchain.MultisigScriptHash(),
|
||||||
|
Scopes: transaction.CalledByEntry,
|
||||||
|
AllowedContracts: nil,
|
||||||
|
AllowedGroups: nil,
|
||||||
|
}}
|
||||||
|
require.NoError(t, testchain.SignTx(bc, tx))
|
||||||
|
b := bc.newBlock(tx)
|
||||||
|
require.NoError(t, bc.AddBlock(b))
|
||||||
|
checkTxHalt(t, bc, tx.Hash())
|
||||||
|
}
|
||||||
|
|
||||||
|
newestB, err := bc.GetBlock(bc.GetHeaderHash(int(bc.BlockHeight()) - startFromBlock + 1))
|
||||||
|
require.NoError(t, err)
|
||||||
|
newestTimestamp := newestB.Timestamp
|
||||||
|
oldestB, err := bc.GetBlock(bc.GetHeaderHash(int(newestB.Index) - nBlocksToTake))
|
||||||
|
require.NoError(t, err)
|
||||||
|
oldestTimestamp := oldestB.Timestamp
|
||||||
|
|
||||||
|
t.ResetTimer()
|
||||||
|
t.ReportAllocs()
|
||||||
|
t.StartTimer()
|
||||||
|
for i := 0; i < t.N; i++ {
|
||||||
|
require.NoError(t, bc.ForEachNEP17Transfer(acc, newestTimestamp, func(t *state.NEP17Transfer) (bool, error) {
|
||||||
|
if t.Timestamp < oldestTimestamp {
|
||||||
|
// iterating from newest to oldest, already have reached the needed height
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
t.StopTimer()
|
||||||
|
}
|
||||||
|
|
|
@ -486,9 +486,10 @@ func (bc *Blockchain) removeOldStorageItems() {
|
||||||
|
|
||||||
b := bc.dao.Store.Batch()
|
b := bc.dao.Store.Batch()
|
||||||
prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
|
prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
|
||||||
bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) {
|
bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) bool {
|
||||||
// #1468, but don't need to copy here, because it is done by Store.
|
// #1468, but don't need to copy here, because it is done by Store.
|
||||||
b.Delete(k)
|
b.Delete(k)
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
b.Delete(storage.SYSCleanStorage.Bytes())
|
b.Delete(storage.SYSCleanStorage.Bytes())
|
||||||
|
|
||||||
|
@ -1022,14 +1023,14 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !trData.Info.NewNEP11Batch {
|
if !trData.Info.NewNEP11Batch {
|
||||||
err = kvcache.PutTokenTransferLog(acc, trData.Info.NextNEP11Batch, true, &trData.Log11)
|
err = kvcache.PutTokenTransferLog(acc, trData.Info.NextNEP11NewestTimestamp, trData.Info.NextNEP11Batch, true, &trData.Log11)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
aerdone <- err
|
aerdone <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !trData.Info.NewNEP17Batch {
|
if !trData.Info.NewNEP17Batch {
|
||||||
err = kvcache.PutTokenTransferLog(acc, trData.Info.NextNEP17Batch, false, &trData.Log17)
|
err = kvcache.PutTokenTransferLog(acc, trData.Info.NextNEP17NewestTimestamp, trData.Info.NextNEP17Batch, false, &trData.Log17)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
aerdone <- err
|
aerdone <- err
|
||||||
return
|
return
|
||||||
|
@ -1333,18 +1334,18 @@ func (bc *Blockchain) processTokenTransfer(cache dao.DAO, transCache map[util.Ui
|
||||||
}
|
}
|
||||||
if !from.Equals(util.Uint160{}) {
|
if !from.Equals(util.Uint160{}) {
|
||||||
_ = nep17xfer.Amount.Neg(amount) // We already have the Int.
|
_ = nep17xfer.Amount.Neg(amount) // We already have the Int.
|
||||||
if appendTokenTransfer(cache, transCache, from, transfer, id, b.Index, isNEP11) != nil {
|
if appendTokenTransfer(cache, transCache, from, transfer, id, b.Index, b.Timestamp, isNEP11) != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !to.Equals(util.Uint160{}) {
|
if !to.Equals(util.Uint160{}) {
|
||||||
_ = nep17xfer.Amount.Set(amount) // We already have the Int.
|
_ = nep17xfer.Amount.Set(amount) // We already have the Int.
|
||||||
_ = appendTokenTransfer(cache, transCache, to, transfer, id, b.Index, isNEP11) // Nothing useful we can do.
|
_ = appendTokenTransfer(cache, transCache, to, transfer, id, b.Index, b.Timestamp, isNEP11) // Nothing useful we can do.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendTokenTransfer(cache dao.DAO, transCache map[util.Uint160]transferData, addr util.Uint160, transfer io.Serializable,
|
func appendTokenTransfer(cache dao.DAO, transCache map[util.Uint160]transferData, addr util.Uint160, transfer io.Serializable,
|
||||||
token int32, bIndex uint32, isNEP11 bool) error {
|
token int32, bIndex uint32, bTimestamp uint64, isNEP11 bool) error {
|
||||||
transferData, ok := transCache[addr]
|
transferData, ok := transCache[addr]
|
||||||
if !ok {
|
if !ok {
|
||||||
balances, err := cache.GetTokenTransferInfo(addr)
|
balances, err := cache.GetTokenTransferInfo(addr)
|
||||||
|
@ -1352,14 +1353,14 @@ func appendTokenTransfer(cache dao.DAO, transCache map[util.Uint160]transferData
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !balances.NewNEP11Batch {
|
if !balances.NewNEP11Batch {
|
||||||
trLog, err := cache.GetTokenTransferLog(addr, balances.NextNEP11Batch, true)
|
trLog, err := cache.GetTokenTransferLog(addr, balances.NextNEP11NewestTimestamp, balances.NextNEP11Batch, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
transferData.Log11 = *trLog
|
transferData.Log11 = *trLog
|
||||||
}
|
}
|
||||||
if !balances.NewNEP17Batch {
|
if !balances.NewNEP17Batch {
|
||||||
trLog, err := cache.GetTokenTransferLog(addr, balances.NextNEP17Batch, false)
|
trLog, err := cache.GetTokenTransferLog(addr, balances.NextNEP17NewestTimestamp, balances.NextNEP17Batch, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1368,18 +1369,21 @@ func appendTokenTransfer(cache dao.DAO, transCache map[util.Uint160]transferData
|
||||||
transferData.Info = *balances
|
transferData.Info = *balances
|
||||||
}
|
}
|
||||||
var (
|
var (
|
||||||
log *state.TokenTransferLog
|
log *state.TokenTransferLog
|
||||||
newBatch *bool
|
newBatch *bool
|
||||||
nextBatch *uint32
|
nextBatch *uint32
|
||||||
|
currTimestamp *uint64
|
||||||
)
|
)
|
||||||
if !isNEP11 {
|
if !isNEP11 {
|
||||||
log = &transferData.Log17
|
log = &transferData.Log17
|
||||||
newBatch = &transferData.Info.NewNEP17Batch
|
newBatch = &transferData.Info.NewNEP17Batch
|
||||||
nextBatch = &transferData.Info.NextNEP17Batch
|
nextBatch = &transferData.Info.NextNEP17Batch
|
||||||
|
currTimestamp = &transferData.Info.NextNEP17NewestTimestamp
|
||||||
} else {
|
} else {
|
||||||
log = &transferData.Log11
|
log = &transferData.Log11
|
||||||
newBatch = &transferData.Info.NewNEP11Batch
|
newBatch = &transferData.Info.NewNEP11Batch
|
||||||
nextBatch = &transferData.Info.NextNEP11Batch
|
nextBatch = &transferData.Info.NextNEP11Batch
|
||||||
|
currTimestamp = &transferData.Info.NextNEP11NewestTimestamp
|
||||||
}
|
}
|
||||||
err := log.Append(transfer)
|
err := log.Append(transfer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1388,11 +1392,12 @@ func appendTokenTransfer(cache dao.DAO, transCache map[util.Uint160]transferData
|
||||||
transferData.Info.LastUpdated[token] = bIndex
|
transferData.Info.LastUpdated[token] = bIndex
|
||||||
*newBatch = log.Size() >= state.TokenTransferBatchSize
|
*newBatch = log.Size() >= state.TokenTransferBatchSize
|
||||||
if *newBatch {
|
if *newBatch {
|
||||||
err = cache.PutTokenTransferLog(addr, *nextBatch, isNEP11, log)
|
err = cache.PutTokenTransferLog(addr, *currTimestamp, *nextBatch, isNEP11, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
*nextBatch++
|
*nextBatch++
|
||||||
|
*currTimestamp = bTimestamp
|
||||||
// Put makes a copy of it anyway.
|
// Put makes a copy of it anyway.
|
||||||
log.Raw = log.Raw[:0]
|
log.Raw = log.Raw[:0]
|
||||||
}
|
}
|
||||||
|
@ -1400,48 +1405,18 @@ func appendTokenTransfer(cache dao.DAO, transCache map[util.Uint160]transferData
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForEachNEP17Transfer executes f for each NEP-17 transfer in log.
|
// ForEachNEP17Transfer executes f for each NEP-17 transfer in log starting from
|
||||||
func (bc *Blockchain) ForEachNEP17Transfer(acc util.Uint160, f func(*state.NEP17Transfer) (bool, error)) error {
|
// the transfer with the newest timestamp up to the oldest transfer. It continues
|
||||||
balances, err := bc.dao.GetTokenTransferInfo(acc)
|
// iteration until false is returned from f. The last non-nil error is returned.
|
||||||
if err != nil {
|
func (bc *Blockchain) ForEachNEP17Transfer(acc util.Uint160, newestTimestamp uint64, f func(*state.NEP17Transfer) (bool, error)) error {
|
||||||
return nil
|
return bc.dao.SeekNEP17TransferLog(acc, newestTimestamp, f)
|
||||||
}
|
|
||||||
for i := int(balances.NextNEP17Batch); i >= 0; i-- {
|
|
||||||
lg, err := bc.dao.GetTokenTransferLog(acc, uint32(i), false)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
cont, err := lg.ForEachNEP17(f)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if !cont {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForEachNEP11Transfer executes f for each NEP-11 transfer in log.
|
// ForEachNEP11Transfer executes f for each NEP-11 transfer in log starting from
|
||||||
func (bc *Blockchain) ForEachNEP11Transfer(acc util.Uint160, f func(*state.NEP11Transfer) (bool, error)) error {
|
// the transfer with the newest timestamp up to the oldest transfer. It continues
|
||||||
balances, err := bc.dao.GetTokenTransferInfo(acc)
|
// iteration until false is returned from f. The last non-nil error is returned.
|
||||||
if err != nil {
|
func (bc *Blockchain) ForEachNEP11Transfer(acc util.Uint160, newestTimestamp uint64, f func(*state.NEP11Transfer) (bool, error)) error {
|
||||||
return nil
|
return bc.dao.SeekNEP11TransferLog(acc, newestTimestamp, f)
|
||||||
}
|
|
||||||
for i := int(balances.NextNEP11Batch); i >= 0; i-- {
|
|
||||||
lg, err := bc.dao.GetTokenTransferLog(acc, uint32(i), true)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
cont, err := lg.ForEachNEP11(f)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if !cont {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNEP17Contracts returns the list of deployed NEP-17 contracts.
|
// GetNEP17Contracts returns the list of deployed NEP-17 contracts.
|
||||||
|
|
|
@ -1768,11 +1768,12 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
|
||||||
if bcSpout.dao.Version.StoragePrefix == tempPrefix {
|
if bcSpout.dao.Version.StoragePrefix == tempPrefix {
|
||||||
tempPrefix = storage.STStorage
|
tempPrefix = storage.STStorage
|
||||||
}
|
}
|
||||||
bcSpout.dao.Store.Seek(storage.SeekRange{Prefix: bcSpout.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) {
|
bcSpout.dao.Store.Seek(storage.SeekRange{Prefix: bcSpout.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
key[0] = byte(tempPrefix)
|
key[0] = byte(tempPrefix)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
batch.Put(key, value)
|
batch.Put(key, value)
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
require.NoError(t, bcSpout.dao.Store.PutBatch(batch))
|
require.NoError(t, bcSpout.dao.Store.PutBatch(batch))
|
||||||
|
|
||||||
|
|
|
@ -36,8 +36,8 @@ type Blockchainer interface {
|
||||||
GetContractScriptHash(id int32) (util.Uint160, error)
|
GetContractScriptHash(id int32) (util.Uint160, error)
|
||||||
GetEnrollments() ([]state.Validator, error)
|
GetEnrollments() ([]state.Validator, error)
|
||||||
GetGoverningTokenBalance(acc util.Uint160) (*big.Int, uint32)
|
GetGoverningTokenBalance(acc util.Uint160) (*big.Int, uint32)
|
||||||
ForEachNEP11Transfer(util.Uint160, func(*state.NEP11Transfer) (bool, error)) error
|
ForEachNEP11Transfer(acc util.Uint160, newestTimestamp uint64, f func(*state.NEP11Transfer) (bool, error)) error
|
||||||
ForEachNEP17Transfer(util.Uint160, func(*state.NEP17Transfer) (bool, error)) error
|
ForEachNEP17Transfer(acc util.Uint160, newestTimestamp uint64, f func(*state.NEP17Transfer) (bool, error)) error
|
||||||
GetHeaderHash(int) util.Uint256
|
GetHeaderHash(int) util.Uint256
|
||||||
GetHeader(hash util.Uint256) (*block.Header, error)
|
GetHeader(hash util.Uint256) (*block.Header, error)
|
||||||
CurrentHeaderHash() util.Uint256
|
CurrentHeaderHash() util.Uint256
|
||||||
|
|
|
@ -42,7 +42,7 @@ type DAO interface {
|
||||||
GetCurrentHeaderHeight() (i uint32, h util.Uint256, err error)
|
GetCurrentHeaderHeight() (i uint32, h util.Uint256, err error)
|
||||||
GetHeaderHashes() ([]util.Uint256, error)
|
GetHeaderHashes() ([]util.Uint256, error)
|
||||||
GetTokenTransferInfo(acc util.Uint160) (*state.TokenTransferInfo, error)
|
GetTokenTransferInfo(acc util.Uint160) (*state.TokenTransferInfo, error)
|
||||||
GetTokenTransferLog(acc util.Uint160, index uint32, isNEP11 bool) (*state.TokenTransferLog, error)
|
GetTokenTransferLog(acc util.Uint160, start uint64, index uint32, isNEP11 bool) (*state.TokenTransferLog, error)
|
||||||
GetStateSyncPoint() (uint32, error)
|
GetStateSyncPoint() (uint32, error)
|
||||||
GetStateSyncCurrentBlockHeight() (uint32, error)
|
GetStateSyncCurrentBlockHeight() (uint32, error)
|
||||||
GetStorageItem(id int32, key []byte) state.StorageItem
|
GetStorageItem(id int32, key []byte) state.StorageItem
|
||||||
|
@ -57,12 +57,12 @@ type DAO interface {
|
||||||
PutContractID(id int32, hash util.Uint160) error
|
PutContractID(id int32, hash util.Uint160) error
|
||||||
PutCurrentHeader(hashAndIndex []byte) error
|
PutCurrentHeader(hashAndIndex []byte) error
|
||||||
PutTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo) error
|
PutTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo) error
|
||||||
PutTokenTransferLog(acc util.Uint160, index uint32, isNEP11 bool, lg *state.TokenTransferLog) error
|
PutTokenTransferLog(acc util.Uint160, start uint64, index uint32, isNEP11 bool, lg *state.TokenTransferLog) error
|
||||||
PutStateSyncPoint(p uint32) error
|
PutStateSyncPoint(p uint32) error
|
||||||
PutStateSyncCurrentBlockHeight(h uint32) error
|
PutStateSyncCurrentBlockHeight(h uint32) error
|
||||||
PutStorageItem(id int32, key []byte, si state.StorageItem) error
|
PutStorageItem(id int32, key []byte, si state.StorageItem) error
|
||||||
PutVersion(v Version) error
|
PutVersion(v Version) error
|
||||||
Seek(id int32, rng storage.SeekRange, f func(k, v []byte))
|
Seek(id int32, rng storage.SeekRange, f func(k, v []byte) bool)
|
||||||
SeekAsync(ctx context.Context, id int32, rng storage.SeekRange) chan storage.KeyValue
|
SeekAsync(ctx context.Context, id int32, rng storage.SeekRange) chan storage.KeyValue
|
||||||
StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error
|
StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error
|
||||||
StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
|
StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
|
||||||
|
@ -180,21 +180,66 @@ func (dao *Simple) putTokenTransferInfo(acc util.Uint160, bs *state.TokenTransfe
|
||||||
|
|
||||||
// -- start transfer log.
|
// -- start transfer log.
|
||||||
|
|
||||||
func getTokenTransferLogKey(acc util.Uint160, index uint32, isNEP11 bool) []byte {
|
func getTokenTransferLogKey(acc util.Uint160, newestTimestamp uint64, index uint32, isNEP11 bool) []byte {
|
||||||
key := make([]byte, 1+util.Uint160Size+4)
|
key := make([]byte, 1+util.Uint160Size+8+4)
|
||||||
if isNEP11 {
|
if isNEP11 {
|
||||||
key[0] = byte(storage.STNEP11Transfers)
|
key[0] = byte(storage.STNEP11Transfers)
|
||||||
} else {
|
} else {
|
||||||
key[0] = byte(storage.STNEP17Transfers)
|
key[0] = byte(storage.STNEP17Transfers)
|
||||||
}
|
}
|
||||||
copy(key[1:], acc.BytesBE())
|
copy(key[1:], acc.BytesBE())
|
||||||
binary.LittleEndian.PutUint32(key[util.Uint160Size:], index)
|
binary.BigEndian.PutUint64(key[1+util.Uint160Size:], newestTimestamp)
|
||||||
|
binary.BigEndian.PutUint32(key[1+util.Uint160Size+8:], index)
|
||||||
return key
|
return key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SeekNEP17TransferLog executes f for each NEP-17 transfer in log starting from
|
||||||
|
// the transfer with the newest timestamp up to the oldest transfer. It continues
|
||||||
|
// iteration until false is returned from f. The last non-nil error is returned.
|
||||||
|
func (dao *Simple) SeekNEP17TransferLog(acc util.Uint160, newestTimestamp uint64, f func(*state.NEP17Transfer) (bool, error)) error {
|
||||||
|
key := getTokenTransferLogKey(acc, newestTimestamp, 0, false)
|
||||||
|
prefixLen := 1 + util.Uint160Size
|
||||||
|
var seekErr error
|
||||||
|
dao.Store.Seek(storage.SeekRange{
|
||||||
|
Prefix: key[:prefixLen],
|
||||||
|
Start: key[prefixLen : prefixLen+8],
|
||||||
|
Backwards: true,
|
||||||
|
}, func(k, v []byte) bool {
|
||||||
|
lg := &state.TokenTransferLog{Raw: v}
|
||||||
|
cont, err := lg.ForEachNEP17(f)
|
||||||
|
if err != nil {
|
||||||
|
seekErr = err
|
||||||
|
}
|
||||||
|
return cont
|
||||||
|
})
|
||||||
|
return seekErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// SeekNEP11TransferLog executes f for each NEP-11 transfer in log starting from
|
||||||
|
// the transfer with the newest timestamp up to the oldest transfer. It continues
|
||||||
|
// iteration until false is returned from f. The last non-nil error is returned.
|
||||||
|
func (dao *Simple) SeekNEP11TransferLog(acc util.Uint160, newestTimestamp uint64, f func(*state.NEP11Transfer) (bool, error)) error {
|
||||||
|
key := getTokenTransferLogKey(acc, newestTimestamp, 0, true)
|
||||||
|
prefixLen := 1 + util.Uint160Size
|
||||||
|
var seekErr error
|
||||||
|
dao.Store.Seek(storage.SeekRange{
|
||||||
|
Prefix: key[:prefixLen],
|
||||||
|
Start: key[prefixLen : prefixLen+8],
|
||||||
|
Backwards: true,
|
||||||
|
}, func(k, v []byte) bool {
|
||||||
|
lg := &state.TokenTransferLog{Raw: v}
|
||||||
|
cont, err := lg.ForEachNEP11(f)
|
||||||
|
if err != nil {
|
||||||
|
seekErr = err
|
||||||
|
}
|
||||||
|
return cont
|
||||||
|
})
|
||||||
|
return seekErr
|
||||||
|
}
|
||||||
|
|
||||||
// GetTokenTransferLog retrieves transfer log from the cache.
|
// GetTokenTransferLog retrieves transfer log from the cache.
|
||||||
func (dao *Simple) GetTokenTransferLog(acc util.Uint160, index uint32, isNEP11 bool) (*state.TokenTransferLog, error) {
|
func (dao *Simple) GetTokenTransferLog(acc util.Uint160, newestTimestamp uint64, index uint32, isNEP11 bool) (*state.TokenTransferLog, error) {
|
||||||
key := getTokenTransferLogKey(acc, index, isNEP11)
|
key := getTokenTransferLogKey(acc, newestTimestamp, index, isNEP11)
|
||||||
value, err := dao.Store.Get(key)
|
value, err := dao.Store.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == storage.ErrKeyNotFound {
|
if err == storage.ErrKeyNotFound {
|
||||||
|
@ -206,8 +251,8 @@ func (dao *Simple) GetTokenTransferLog(acc util.Uint160, index uint32, isNEP11 b
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutTokenTransferLog saves given transfer log in the cache.
|
// PutTokenTransferLog saves given transfer log in the cache.
|
||||||
func (dao *Simple) PutTokenTransferLog(acc util.Uint160, index uint32, isNEP11 bool, lg *state.TokenTransferLog) error {
|
func (dao *Simple) PutTokenTransferLog(acc util.Uint160, start uint64, index uint32, isNEP11 bool, lg *state.TokenTransferLog) error {
|
||||||
key := getTokenTransferLogKey(acc, index, isNEP11)
|
key := getTokenTransferLogKey(acc, start, index, isNEP11)
|
||||||
return dao.Store.Put(key, lg.Raw)
|
return dao.Store.Put(key, lg.Raw)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,13 +337,14 @@ func (dao *Simple) GetStorageItems(id int32) ([]state.StorageItemWithKey, error)
|
||||||
func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.StorageItemWithKey, error) {
|
func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.StorageItemWithKey, error) {
|
||||||
var siArr []state.StorageItemWithKey
|
var siArr []state.StorageItemWithKey
|
||||||
|
|
||||||
saveToArr := func(k, v []byte) {
|
saveToArr := func(k, v []byte) bool {
|
||||||
// Cut prefix and hash.
|
// Cut prefix and hash.
|
||||||
// #1468, but don't need to copy here, because it is done by Store.
|
// #1468, but don't need to copy here, because it is done by Store.
|
||||||
siArr = append(siArr, state.StorageItemWithKey{
|
siArr = append(siArr, state.StorageItemWithKey{
|
||||||
Key: k,
|
Key: k,
|
||||||
Item: state.StorageItem(v),
|
Item: state.StorageItem(v),
|
||||||
})
|
})
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
dao.Seek(id, storage.SeekRange{Prefix: prefix}, saveToArr)
|
dao.Seek(id, storage.SeekRange{Prefix: prefix}, saveToArr)
|
||||||
return siArr, nil
|
return siArr, nil
|
||||||
|
@ -306,11 +352,11 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S
|
||||||
|
|
||||||
// Seek executes f for all storage items matching a given `rng` (matching given prefix and
|
// Seek executes f for all storage items matching a given `rng` (matching given prefix and
|
||||||
// starting from the point specified). If key or value is to be used outside of f, they
|
// starting from the point specified). If key or value is to be used outside of f, they
|
||||||
// may not be copied.
|
// may not be copied. Seek continues iterating until false is returned from f.
|
||||||
func (dao *Simple) Seek(id int32, rng storage.SeekRange, f func(k, v []byte)) {
|
func (dao *Simple) Seek(id int32, rng storage.SeekRange, f func(k, v []byte) bool) {
|
||||||
rng.Prefix = makeStorageItemKey(dao.Version.StoragePrefix, id, rng.Prefix)
|
rng.Prefix = makeStorageItemKey(dao.Version.StoragePrefix, id, rng.Prefix)
|
||||||
dao.Store.Seek(rng, func(k, v []byte) {
|
dao.Store.Seek(rng, func(k, v []byte) bool {
|
||||||
f(k[len(rng.Prefix):], v)
|
return f(k[len(rng.Prefix):], v)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -477,13 +523,14 @@ func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
|
||||||
hashMap := make(map[uint32][]util.Uint256)
|
hashMap := make(map[uint32][]util.Uint256)
|
||||||
dao.Store.Seek(storage.SeekRange{
|
dao.Store.Seek(storage.SeekRange{
|
||||||
Prefix: storage.IXHeaderHashList.Bytes(),
|
Prefix: storage.IXHeaderHashList.Bytes(),
|
||||||
}, func(k, v []byte) {
|
}, func(k, v []byte) bool {
|
||||||
storedCount := binary.LittleEndian.Uint32(k[1:])
|
storedCount := binary.LittleEndian.Uint32(k[1:])
|
||||||
hashes, err := read2000Uint256Hashes(v)
|
hashes, err := read2000Uint256Hashes(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
hashMap[storedCount] = hashes
|
hashMap[storedCount] = hashes
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -503,13 +503,14 @@ func (m *Management) InitializeCache(d dao.DAO) error {
|
||||||
defer m.mtx.Unlock()
|
defer m.mtx.Unlock()
|
||||||
|
|
||||||
var initErr error
|
var initErr error
|
||||||
d.Seek(m.ID, storage.SeekRange{Prefix: []byte{prefixContract}}, func(_, v []byte) {
|
d.Seek(m.ID, storage.SeekRange{Prefix: []byte{prefixContract}}, func(_, v []byte) bool {
|
||||||
var cs = new(state.Contract)
|
var cs = new(state.Contract)
|
||||||
initErr = stackitem.DeserializeConvertible(v, cs)
|
initErr = stackitem.DeserializeConvertible(v, cs)
|
||||||
if initErr != nil {
|
if initErr != nil {
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
m.updateContractCache(cs)
|
m.updateContractCache(cs)
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
return initErr
|
return initErr
|
||||||
}
|
}
|
||||||
|
|
|
@ -396,8 +396,8 @@ func (n *NEO) getGASPerVote(d dao.DAO, key []byte, indexes []uint32) []big.Int {
|
||||||
Prefix: key,
|
Prefix: key,
|
||||||
Start: start,
|
Start: start,
|
||||||
Backwards: true,
|
Backwards: true,
|
||||||
}, func(k, v []byte) {
|
}, func(k, v []byte) bool {
|
||||||
if collected < need && len(k) == 4 {
|
if len(k) == 4 {
|
||||||
num := binary.BigEndian.Uint32(k)
|
num := binary.BigEndian.Uint32(k)
|
||||||
for i, ind := range indexes {
|
for i, ind := range indexes {
|
||||||
if reward[i].Sign() == 0 && num <= ind {
|
if reward[i].Sign() == 0 && num <= ind {
|
||||||
|
@ -406,6 +406,7 @@ func (n *NEO) getGASPerVote(d dao.DAO, key []byte, indexes []uint32) []big.Int {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return collected < need
|
||||||
})
|
})
|
||||||
return reward
|
return reward
|
||||||
}
|
}
|
||||||
|
@ -601,8 +602,9 @@ func (n *NEO) dropCandidateIfZero(d dao.DAO, pub *keys.PublicKey, c *candidate)
|
||||||
|
|
||||||
var toRemove []string
|
var toRemove []string
|
||||||
voterKey := makeVoterKey(pub.Bytes())
|
voterKey := makeVoterKey(pub.Bytes())
|
||||||
d.Seek(n.ID, storage.SeekRange{Prefix: voterKey}, func(k, v []byte) {
|
d.Seek(n.ID, storage.SeekRange{Prefix: voterKey}, func(k, v []byte) bool {
|
||||||
toRemove = append(toRemove, string(k))
|
toRemove = append(toRemove, string(k))
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
for i := range toRemove {
|
for i := range toRemove {
|
||||||
if err := d.DeleteStorageItem(n.ID, []byte(toRemove[i])); err != nil {
|
if err := d.DeleteStorageItem(n.ID, []byte(toRemove[i])); err != nil {
|
||||||
|
|
|
@ -53,6 +53,10 @@ type TokenTransferInfo struct {
|
||||||
NextNEP11Batch uint32
|
NextNEP11Batch uint32
|
||||||
// NextNEP17Batch stores the index of the next NEP-17 transfer batch.
|
// NextNEP17Batch stores the index of the next NEP-17 transfer batch.
|
||||||
NextNEP17Batch uint32
|
NextNEP17Batch uint32
|
||||||
|
// NextNEP11NewestTimestamp stores the block timestamp of the first NEP-11 transfer in raw.
|
||||||
|
NextNEP11NewestTimestamp uint64
|
||||||
|
// NextNEP17NewestTimestamp stores the block timestamp of the first NEP-17 transfer in raw.
|
||||||
|
NextNEP17NewestTimestamp uint64
|
||||||
// NewNEP11Batch is true if batch with the `NextNEP11Batch` index should be created.
|
// NewNEP11Batch is true if batch with the `NextNEP11Batch` index should be created.
|
||||||
NewNEP11Batch bool
|
NewNEP11Batch bool
|
||||||
// NewNEP17Batch is true if batch with the `NextNEP17Batch` index should be created.
|
// NewNEP17Batch is true if batch with the `NextNEP17Batch` index should be created.
|
||||||
|
@ -72,6 +76,8 @@ func NewTokenTransferInfo() *TokenTransferInfo {
|
||||||
func (bs *TokenTransferInfo) DecodeBinary(r *io.BinReader) {
|
func (bs *TokenTransferInfo) DecodeBinary(r *io.BinReader) {
|
||||||
bs.NextNEP11Batch = r.ReadU32LE()
|
bs.NextNEP11Batch = r.ReadU32LE()
|
||||||
bs.NextNEP17Batch = r.ReadU32LE()
|
bs.NextNEP17Batch = r.ReadU32LE()
|
||||||
|
bs.NextNEP11NewestTimestamp = r.ReadU64LE()
|
||||||
|
bs.NextNEP17NewestTimestamp = r.ReadU64LE()
|
||||||
bs.NewNEP11Batch = r.ReadBool()
|
bs.NewNEP11Batch = r.ReadBool()
|
||||||
bs.NewNEP17Batch = r.ReadBool()
|
bs.NewNEP17Batch = r.ReadBool()
|
||||||
lenBalances := r.ReadVarUint()
|
lenBalances := r.ReadVarUint()
|
||||||
|
@ -87,6 +93,8 @@ func (bs *TokenTransferInfo) DecodeBinary(r *io.BinReader) {
|
||||||
func (bs *TokenTransferInfo) EncodeBinary(w *io.BinWriter) {
|
func (bs *TokenTransferInfo) EncodeBinary(w *io.BinWriter) {
|
||||||
w.WriteU32LE(bs.NextNEP11Batch)
|
w.WriteU32LE(bs.NextNEP11Batch)
|
||||||
w.WriteU32LE(bs.NextNEP17Batch)
|
w.WriteU32LE(bs.NextNEP17Batch)
|
||||||
|
w.WriteU64LE(bs.NextNEP11NewestTimestamp)
|
||||||
|
w.WriteU64LE(bs.NextNEP17NewestTimestamp)
|
||||||
w.WriteBool(bs.NewNEP11Batch)
|
w.WriteBool(bs.NewNEP11Batch)
|
||||||
w.WriteBool(bs.NewNEP17Batch)
|
w.WriteBool(bs.NewNEP17Batch)
|
||||||
w.WriteVarUint(uint64(len(bs.LastUpdated)))
|
w.WriteVarUint(uint64(len(bs.LastUpdated)))
|
||||||
|
|
|
@ -134,9 +134,10 @@ func (s *Module) CleanStorage() error {
|
||||||
return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load())
|
return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load())
|
||||||
}
|
}
|
||||||
b := s.Store.Batch()
|
b := s.Store.Batch()
|
||||||
s.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.DataMPT)}}, func(k, _ []byte) {
|
s.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.DataMPT)}}, func(k, _ []byte) bool {
|
||||||
// #1468, but don't need to copy here, because it is done by Store.
|
// #1468, but don't need to copy here, because it is done by Store.
|
||||||
b.Delete(k)
|
b.Delete(k)
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
err := s.Store.PutBatch(b)
|
err := s.Store.PutBatch(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -32,7 +32,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
||||||
nodes = make(map[util.Uint256][]byte)
|
nodes = make(map[util.Uint256][]byte)
|
||||||
expectedItems []storage.KeyValue
|
expectedItems []storage.KeyValue
|
||||||
)
|
)
|
||||||
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) {
|
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
expectedItems = append(expectedItems, storage.KeyValue{
|
expectedItems = append(expectedItems, storage.KeyValue{
|
||||||
|
@ -43,6 +43,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
nodeBytes := value[:len(value)-4]
|
nodeBytes := value[:len(value)-4]
|
||||||
nodes[hash] = nodeBytes
|
nodes[hash] = nodeBytes
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
actualStorage := storage.NewMemCachedStore(storage.NewMemoryStore())
|
actualStorage := storage.NewMemCachedStore(storage.NewMemoryStore())
|
||||||
|
@ -95,13 +96,14 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
||||||
|
|
||||||
// Compare resulting storage items and refcounts.
|
// Compare resulting storage items and refcounts.
|
||||||
var actualItems []storage.KeyValue
|
var actualItems []storage.KeyValue
|
||||||
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) {
|
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
actualItems = append(actualItems, storage.KeyValue{
|
actualItems = append(actualItems, storage.KeyValue{
|
||||||
Key: key,
|
Key: key,
|
||||||
Value: value,
|
Value: value,
|
||||||
})
|
})
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
require.ElementsMatch(t, expectedItems, actualItems)
|
require.ElementsMatch(t, expectedItems, actualItems)
|
||||||
}
|
}
|
||||||
|
|
|
@ -424,7 +424,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
||||||
// compare storage states
|
// compare storage states
|
||||||
fetchStorage := func(bc *Blockchain) []storage.KeyValue {
|
fetchStorage := func(bc *Blockchain) []storage.KeyValue {
|
||||||
var kv []storage.KeyValue
|
var kv []storage.KeyValue
|
||||||
bc.dao.Store.Seek(storage.SeekRange{Prefix: bc.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) {
|
bc.dao.Store.Seek(storage.SeekRange{Prefix: bc.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
if key[0] == byte(storage.STTempStorage) {
|
if key[0] == byte(storage.STTempStorage) {
|
||||||
|
@ -434,6 +434,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
||||||
Key: key,
|
Key: key,
|
||||||
Value: value,
|
Value: value,
|
||||||
})
|
})
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
return kv
|
return kv
|
||||||
}
|
}
|
||||||
|
@ -444,8 +445,9 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
||||||
// no temp items should be left
|
// no temp items should be left
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
var haveItems bool
|
var haveItems bool
|
||||||
bcBolt.dao.Store.Seek(storage.SeekRange{Prefix: storage.STStorage.Bytes()}, func(_, _ []byte) {
|
bcBolt.dao.Store.Seek(storage.SeekRange{Prefix: storage.STStorage.Bytes()}, func(_, _ []byte) bool {
|
||||||
haveItems = true
|
haveItems = true
|
||||||
|
return false
|
||||||
})
|
})
|
||||||
return !haveItems
|
return !haveItems
|
||||||
}, time.Second*5, time.Millisecond*100)
|
}, time.Second*5, time.Millisecond*100)
|
||||||
|
|
|
@ -109,7 +109,7 @@ func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
||||||
copy(start, rng.Prefix)
|
copy(start, rng.Prefix)
|
||||||
copy(start[len(rng.Prefix):], rng.Start)
|
copy(start[len(rng.Prefix):], rng.Start)
|
||||||
|
@ -120,13 +120,15 @@ func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BoltDBStore) seek(key []byte, start []byte, f func(k, v []byte)) {
|
func (s *BoltDBStore) seek(key []byte, start []byte, f func(k, v []byte) bool) {
|
||||||
prefix := util.BytesPrefix(key)
|
prefix := util.BytesPrefix(key)
|
||||||
prefix.Start = start
|
prefix.Start = start
|
||||||
err := s.db.View(func(tx *bbolt.Tx) error {
|
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||||
c := tx.Bucket(Bucket).Cursor()
|
c := tx.Bucket(Bucket).Cursor()
|
||||||
for k, v := c.Seek(prefix.Start); k != nil && (len(prefix.Limit) == 0 || bytes.Compare(k, prefix.Limit) <= 0); k, v = c.Next() {
|
for k, v := c.Seek(prefix.Start); k != nil && (len(prefix.Limit) == 0 || bytes.Compare(k, prefix.Limit) <= 0); k, v = c.Next() {
|
||||||
f(k, v)
|
if !f(k, v) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -135,7 +137,7 @@ func (s *BoltDBStore) seek(key []byte, start []byte, f func(k, v []byte)) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BoltDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte)) {
|
func (s *BoltDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte) bool) {
|
||||||
err := s.db.View(func(tx *bbolt.Tx) error {
|
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||||
c := tx.Bucket(Bucket).Cursor()
|
c := tx.Bucket(Bucket).Cursor()
|
||||||
// Move cursor to the first kv pair which is followed by the pair matching the specified prefix.
|
// Move cursor to the first kv pair which is followed by the pair matching the specified prefix.
|
||||||
|
@ -146,7 +148,9 @@ func (s *BoltDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte
|
||||||
rng := util.BytesPrefix(start) // in fact, we only need limit based on start slice to iterate backwards starting from this limit
|
rng := util.BytesPrefix(start) // in fact, we only need limit based on start slice to iterate backwards starting from this limit
|
||||||
c.Seek(rng.Limit)
|
c.Seek(rng.Limit)
|
||||||
for k, v := c.Prev(); k != nil && bytes.HasPrefix(k, key); k, v = c.Prev() {
|
for k, v := c.Prev(); k != nil && bytes.HasPrefix(k, key); k, v = c.Prev() {
|
||||||
f(k, v)
|
if !f(k, v) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -85,7 +85,7 @@ func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
||||||
copy(start, rng.Prefix)
|
copy(start, rng.Prefix)
|
||||||
copy(start[len(rng.Prefix):], rng.Start)
|
copy(start[len(rng.Prefix):], rng.Start)
|
||||||
|
@ -96,23 +96,27 @@ func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *LevelDBStore) seek(key []byte, start []byte, f func(k, v []byte)) {
|
func (s *LevelDBStore) seek(key []byte, start []byte, f func(k, v []byte) bool) {
|
||||||
prefix := util.BytesPrefix(key)
|
prefix := util.BytesPrefix(key)
|
||||||
prefix.Start = start
|
prefix.Start = start
|
||||||
iter := s.db.NewIterator(prefix, nil)
|
iter := s.db.NewIterator(prefix, nil)
|
||||||
for iter.Next() {
|
for iter.Next() {
|
||||||
f(iter.Key(), iter.Value())
|
if !f(iter.Key(), iter.Value()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
iter.Release()
|
iter.Release()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *LevelDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte)) {
|
func (s *LevelDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte) bool) {
|
||||||
iRange := util.BytesPrefix(start)
|
iRange := util.BytesPrefix(start)
|
||||||
iRange.Start = key
|
iRange.Start = key
|
||||||
|
|
||||||
iter := s.db.NewIterator(iRange, nil)
|
iter := s.db.NewIterator(iRange, nil)
|
||||||
for ok := iter.Last(); ok; ok = iter.Prev() {
|
for ok := iter.Last(); ok; ok = iter.Prev() {
|
||||||
f(iter.Key(), iter.Value())
|
if !f(iter.Key(), iter.Value()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
iter.Release()
|
iter.Release()
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,7 +90,7 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
s.seek(context.Background(), rng, false, f)
|
s.seek(context.Background(), rng, false, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,11 +100,12 @@ func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix bool) chan KeyValue {
|
func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix bool) chan KeyValue {
|
||||||
res := make(chan KeyValue)
|
res := make(chan KeyValue)
|
||||||
go func() {
|
go func() {
|
||||||
s.seek(ctx, rng, cutPrefix, func(k, v []byte) {
|
s.seek(ctx, rng, cutPrefix, func(k, v []byte) bool {
|
||||||
res <- KeyValue{
|
res <- KeyValue{
|
||||||
Key: k,
|
Key: k,
|
||||||
Value: v,
|
Value: v,
|
||||||
}
|
}
|
||||||
|
return true // always continue, we have context for early stop.
|
||||||
})
|
})
|
||||||
close(res)
|
close(res)
|
||||||
}()
|
}()
|
||||||
|
@ -117,7 +118,7 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix
|
||||||
// key needs to be cut off the resulting keys. `rng` specifies prefix items must match
|
// key needs to be cut off the resulting keys. `rng` specifies prefix items must match
|
||||||
// and point to start seeking from. Backwards seeking from some point is supported
|
// and point to start seeking from. Backwards seeking from some point is supported
|
||||||
// with corresponding `rng` field set.
|
// with corresponding `rng` field set.
|
||||||
func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte)) {
|
func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) {
|
||||||
// Create memory store `mem` and `del` snapshot not to hold the lock.
|
// Create memory store `mem` and `del` snapshot not to hold the lock.
|
||||||
var memRes []KeyValueExists
|
var memRes []KeyValueExists
|
||||||
sPrefix := string(rng.Prefix)
|
sPrefix := string(rng.Prefix)
|
||||||
|
@ -176,21 +177,21 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
|
||||||
haveMem = true
|
haveMem = true
|
||||||
iMem++
|
iMem++
|
||||||
}
|
}
|
||||||
// Merge results of seek operations in ascending order.
|
// Merge results of seek operations in ascending order. It returns whether iterating
|
||||||
mergeFunc := func(k, v []byte) {
|
// should be continued.
|
||||||
|
mergeFunc := func(k, v []byte) bool {
|
||||||
if done {
|
if done {
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
kvPs := KeyValue{
|
kvPs := KeyValue{
|
||||||
Key: slice.Copy(k),
|
Key: slice.Copy(k),
|
||||||
Value: slice.Copy(v),
|
Value: slice.Copy(v),
|
||||||
}
|
}
|
||||||
loop:
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
done = true
|
done = true
|
||||||
break loop
|
return false
|
||||||
default:
|
default:
|
||||||
var isMem = haveMem && less(kvMem.Key, kvPs.Key)
|
var isMem = haveMem && less(kvMem.Key, kvPs.Key)
|
||||||
if isMem {
|
if isMem {
|
||||||
|
@ -198,7 +199,10 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
|
||||||
if cutPrefix {
|
if cutPrefix {
|
||||||
kvMem.Key = kvMem.Key[lPrefix:]
|
kvMem.Key = kvMem.Key[lPrefix:]
|
||||||
}
|
}
|
||||||
f(kvMem.Key, kvMem.Value)
|
if !f(kvMem.Key, kvMem.Value) {
|
||||||
|
done = true
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if iMem < len(memRes) {
|
if iMem < len(memRes) {
|
||||||
kvMem = memRes[iMem]
|
kvMem = memRes[iMem]
|
||||||
|
@ -212,9 +216,12 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
|
||||||
if cutPrefix {
|
if cutPrefix {
|
||||||
kvPs.Key = kvPs.Key[lPrefix:]
|
kvPs.Key = kvPs.Key[lPrefix:]
|
||||||
}
|
}
|
||||||
f(kvPs.Key, kvPs.Value)
|
if !f(kvPs.Key, kvPs.Value) {
|
||||||
|
done = true
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break loop
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -233,7 +240,9 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
|
||||||
if cutPrefix {
|
if cutPrefix {
|
||||||
kvMem.Key = kvMem.Key[lPrefix:]
|
kvMem.Key = kvMem.Key[lPrefix:]
|
||||||
}
|
}
|
||||||
f(kvMem.Key, kvMem.Value)
|
if !f(kvMem.Key, kvMem.Value) {
|
||||||
|
break loop
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -167,8 +167,9 @@ func TestCachedSeek(t *testing.T) {
|
||||||
require.NoError(t, ts.Put(v.Key, v.Value))
|
require.NoError(t, ts.Put(v.Key, v.Value))
|
||||||
}
|
}
|
||||||
foundKVs := make(map[string][]byte)
|
foundKVs := make(map[string][]byte)
|
||||||
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) {
|
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) bool {
|
||||||
foundKVs[string(k)] = v
|
foundKVs[string(k)] = v
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
||||||
for _, kv := range lowerKVs {
|
for _, kv := range lowerKVs {
|
||||||
|
@ -232,7 +233,7 @@ func benchmarkCachedSeek(t *testing.B, ps Store, psElementsCount, tsElementsCoun
|
||||||
t.ReportAllocs()
|
t.ReportAllocs()
|
||||||
t.ResetTimer()
|
t.ResetTimer()
|
||||||
for n := 0; n < t.N; n++ {
|
for n := 0; n < t.N; n++ {
|
||||||
ts.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) {})
|
ts.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) bool { return true })
|
||||||
}
|
}
|
||||||
t.StopTimer()
|
t.StopTimer()
|
||||||
}
|
}
|
||||||
|
@ -290,7 +291,7 @@ func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string]bool) error {
|
||||||
b.onPutBatch()
|
b.onPutBatch()
|
||||||
return ErrKeyNotFound
|
return ErrKeyNotFound
|
||||||
}
|
}
|
||||||
func (b *BadStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
func (b *BadStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
}
|
}
|
||||||
func (b *BadStore) Close() error {
|
func (b *BadStore) Close() error {
|
||||||
return nil
|
return nil
|
||||||
|
@ -365,8 +366,9 @@ func TestCachedSeekSorting(t *testing.T) {
|
||||||
require.NoError(t, ts.Put(v.Key, v.Value))
|
require.NoError(t, ts.Put(v.Key, v.Value))
|
||||||
}
|
}
|
||||||
var foundKVs []KeyValue
|
var foundKVs []KeyValue
|
||||||
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) {
|
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) bool {
|
||||||
foundKVs = append(foundKVs, KeyValue{Key: slice.Copy(k), Value: slice.Copy(v)})
|
foundKVs = append(foundKVs, KeyValue{Key: slice.Copy(k), Value: slice.Copy(v)})
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
||||||
expected := append(lowerKVs, updatedKVs...)
|
expected := append(lowerKVs, updatedKVs...)
|
||||||
|
|
|
@ -104,7 +104,7 @@ func (s *MemoryStore) PutChangeSet(puts map[string][]byte, dels map[string]bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
s.mut.RLock()
|
s.mut.RLock()
|
||||||
s.seek(rng, f)
|
s.seek(rng, f)
|
||||||
s.mut.RUnlock()
|
s.mut.RUnlock()
|
||||||
|
@ -130,7 +130,7 @@ func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
|
||||||
// seek is an internal unlocked implementation of Seek. `start` denotes whether
|
// seek is an internal unlocked implementation of Seek. `start` denotes whether
|
||||||
// seeking starting from the provided prefix should be performed. Backwards
|
// seeking starting from the provided prefix should be performed. Backwards
|
||||||
// seeking from some point is supported with corresponding SeekRange field set.
|
// seeking from some point is supported with corresponding SeekRange field set.
|
||||||
func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) {
|
func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
sPrefix := string(rng.Prefix)
|
sPrefix := string(rng.Prefix)
|
||||||
lPrefix := len(sPrefix)
|
lPrefix := len(sPrefix)
|
||||||
sStart := string(rng.Start)
|
sStart := string(rng.Start)
|
||||||
|
@ -162,7 +162,9 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
return less(memList[i].Key, memList[j].Key)
|
return less(memList[i].Key, memList[j].Key)
|
||||||
})
|
})
|
||||||
for _, kv := range memList {
|
for _, kv := range memList {
|
||||||
f(kv.Key, kv.Value)
|
if !f(kv.Key, kv.Value) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ func BenchmarkMemorySeek(t *testing.B) {
|
||||||
t.ReportAllocs()
|
t.ReportAllocs()
|
||||||
t.ResetTimer()
|
t.ResetTimer()
|
||||||
for n := 0; n < t.N; n++ {
|
for n := 0; n < t.N; n++ {
|
||||||
ms.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) {})
|
ms.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) bool { return false })
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,7 @@ type SeekRange struct {
|
||||||
// Empty Prefix means seeking through all keys in the DB starting from
|
// Empty Prefix means seeking through all keys in the DB starting from
|
||||||
// the Start if specified.
|
// the Start if specified.
|
||||||
Prefix []byte
|
Prefix []byte
|
||||||
// Start denotes value upended to the Prefix to start Seek from.
|
// Start denotes value appended to the Prefix to start Seek from.
|
||||||
// Seeking starting from some key includes this key to the result;
|
// Seeking starting from some key includes this key to the result;
|
||||||
// if no matching key was found then next suitable key is picked up.
|
// if no matching key was found then next suitable key is picked up.
|
||||||
// Start may be empty. Empty Start means seeking through all keys in
|
// Start may be empty. Empty Start means seeking through all keys in
|
||||||
|
@ -90,9 +90,10 @@ type (
|
||||||
// PutChangeSet allows to push prepared changeset to the Store.
|
// PutChangeSet allows to push prepared changeset to the Store.
|
||||||
PutChangeSet(puts map[string][]byte, dels map[string]bool) error
|
PutChangeSet(puts map[string][]byte, dels map[string]bool) error
|
||||||
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
|
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
|
||||||
// Key and value slices should not be modified. Seek can guarantee that key-value items are sorted by
|
// Seek continues iteration until false is returned from f.
|
||||||
// key in ascending way.
|
// Key and value slices should not be modified.
|
||||||
Seek(rng SeekRange, f func(k, v []byte))
|
// Seek can guarantee that key-value items are sorted by key in ascending way.
|
||||||
|
Seek(rng SeekRange, f func(k, v []byte) bool)
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,7 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
require.NoError(t, s.Put(v.Key, v.Value))
|
require.NoError(t, s.Put(v.Key, v.Value))
|
||||||
}
|
}
|
||||||
|
|
||||||
check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue, backwards bool) {
|
check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue, backwards bool, cont func(k, v []byte) bool) {
|
||||||
// Seek result expected to be sorted in an ascending (for forwards seeking) or descending (for backwards seeking) way.
|
// Seek result expected to be sorted in an ascending (for forwards seeking) or descending (for backwards seeking) way.
|
||||||
cmpFunc := func(i, j int) bool {
|
cmpFunc := func(i, j int) bool {
|
||||||
return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) < 0
|
return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) < 0
|
||||||
|
@ -101,11 +101,15 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
rng.Backwards = true
|
rng.Backwards = true
|
||||||
}
|
}
|
||||||
actual := make([]KeyValue, 0, len(goodkvs))
|
actual := make([]KeyValue, 0, len(goodkvs))
|
||||||
s.Seek(rng, func(k, v []byte) {
|
s.Seek(rng, func(k, v []byte) bool {
|
||||||
actual = append(actual, KeyValue{
|
actual = append(actual, KeyValue{
|
||||||
Key: slice.Copy(k),
|
Key: slice.Copy(k),
|
||||||
Value: slice.Copy(v),
|
Value: slice.Copy(v),
|
||||||
})
|
})
|
||||||
|
if cont == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return cont(k, v)
|
||||||
})
|
})
|
||||||
assert.Equal(t, goodkvs, actual)
|
assert.Equal(t, goodkvs, actual)
|
||||||
}
|
}
|
||||||
|
@ -123,12 +127,26 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
kvs[3], // key = "21"
|
kvs[3], // key = "21"
|
||||||
kvs[4], // key = "22"
|
kvs[4], // key = "22"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, false)
|
check(t, goodprefix, start, goodkvs, false, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte("0")
|
goodprefix := []byte("0")
|
||||||
start := []byte{}
|
start := []byte{}
|
||||||
check(t, goodprefix, start, []KeyValue{}, false)
|
check(t, goodprefix, start, []KeyValue{}, false, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
// Given this prefix...
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
// and empty start range...
|
||||||
|
start := []byte{}
|
||||||
|
// these pairs should be found.
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, false, func(k, v []byte) bool {
|
||||||
|
return string(k) < "21"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -141,12 +159,23 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
kvs[3], // key = "21"
|
kvs[3], // key = "21"
|
||||||
kvs[2], // key = "20"
|
kvs[2], // key = "20"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, true)
|
check(t, goodprefix, start, goodkvs, true, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte("0")
|
goodprefix := []byte("0")
|
||||||
start := []byte{}
|
start := []byte{}
|
||||||
check(t, goodprefix, start, []KeyValue{}, true)
|
check(t, goodprefix, start, []KeyValue{}, true, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
start := []byte{}
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[4], // key = "22"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, true, func(k, v []byte) bool {
|
||||||
|
return string(k) > "21"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -155,33 +184,55 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
t.Run("forwards", func(t *testing.T) {
|
t.Run("forwards", func(t *testing.T) {
|
||||||
t.Run("good", func(t *testing.T) {
|
t.Run("good", func(t *testing.T) {
|
||||||
goodprefix := []byte("2")
|
goodprefix := []byte("2")
|
||||||
start := []byte("1") // start will be upended to goodprefix to start seek from
|
start := []byte("1") // start will be appended to goodprefix to start seek from
|
||||||
goodkvs := []KeyValue{
|
goodkvs := []KeyValue{
|
||||||
kvs[3], // key = "21"
|
kvs[3], // key = "21"
|
||||||
kvs[4], // key = "22"
|
kvs[4], // key = "22"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, false)
|
check(t, goodprefix, start, goodkvs, false, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte("2")
|
goodprefix := []byte("2")
|
||||||
start := []byte("3") // start is more than all keys prefixed by '2'.
|
start := []byte("3") // start is more than all keys prefixed by '2'.
|
||||||
check(t, goodprefix, start, []KeyValue{}, false)
|
check(t, goodprefix, start, []KeyValue{}, false, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
start := []byte("0") // start will be appended to goodprefix to start seek from
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, false, func(k, v []byte) bool {
|
||||||
|
return string(k) < "21"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
t.Run("backwards", func(t *testing.T) {
|
t.Run("backwards", func(t *testing.T) {
|
||||||
t.Run("good", func(t *testing.T) {
|
t.Run("good", func(t *testing.T) {
|
||||||
goodprefix := []byte("2")
|
goodprefix := []byte("2")
|
||||||
start := []byte("1") // start will be upended to goodprefix to start seek from
|
start := []byte("1") // start will be appended to goodprefix to start seek from
|
||||||
goodkvs := []KeyValue{
|
goodkvs := []KeyValue{
|
||||||
kvs[3], // key = "21"
|
kvs[3], // key = "21"
|
||||||
kvs[2], // key = "20"
|
kvs[2], // key = "20"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, true)
|
check(t, goodprefix, start, goodkvs, true, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte("2")
|
goodprefix := []byte("2")
|
||||||
start := []byte(".") // start is less than all keys prefixed by '2'.
|
start := []byte(".") // start is less than all keys prefixed by '2'.
|
||||||
check(t, goodprefix, start, []KeyValue{}, true)
|
check(t, goodprefix, start, []KeyValue{}, true, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
start := []byte("2") // start will be appended to goodprefix to start seek from
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[4], // key = "24"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, true, func(k, v []byte) bool {
|
||||||
|
return string(k) > "21"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -197,12 +248,24 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
kvs[5], // key = "30"
|
kvs[5], // key = "30"
|
||||||
kvs[6], // key = "31"
|
kvs[6], // key = "31"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, false)
|
check(t, goodprefix, start, goodkvs, false, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte{}
|
goodprefix := []byte{}
|
||||||
start := []byte("32") // start is more than all keys.
|
start := []byte("32") // start is more than all keys.
|
||||||
check(t, goodprefix, start, []KeyValue{}, false)
|
check(t, goodprefix, start, []KeyValue{}, false, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodprefix := []byte{}
|
||||||
|
start := []byte("21")
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
kvs[4], // key = "22"
|
||||||
|
kvs[5], // key = "30"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, false, func(k, v []byte) bool {
|
||||||
|
return string(k) < "30"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
t.Run("backwards", func(t *testing.T) {
|
t.Run("backwards", func(t *testing.T) {
|
||||||
|
@ -215,12 +278,24 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
kvs[1], // key = "11"
|
kvs[1], // key = "11"
|
||||||
kvs[0], // key = "10"
|
kvs[0], // key = "10"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, true)
|
check(t, goodprefix, start, goodkvs, true, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte{}
|
goodprefix := []byte{}
|
||||||
start := []byte("0") // start is less than all keys.
|
start := []byte("0") // start is less than all keys.
|
||||||
check(t, goodprefix, start, []KeyValue{}, true)
|
check(t, goodprefix, start, []KeyValue{}, true, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodprefix := []byte{}
|
||||||
|
start := []byte("21")
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
kvs[1], // key = "11"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, true, func(k, v []byte) bool {
|
||||||
|
return string(k) > "11"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -231,10 +306,36 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
goodkvs := make([]KeyValue, len(kvs))
|
goodkvs := make([]KeyValue, len(kvs))
|
||||||
copy(goodkvs, kvs)
|
copy(goodkvs, kvs)
|
||||||
t.Run("forwards", func(t *testing.T) {
|
t.Run("forwards", func(t *testing.T) {
|
||||||
check(t, goodprefix, start, goodkvs, false)
|
t.Run("good", func(t *testing.T) {
|
||||||
|
check(t, goodprefix, start, goodkvs, false, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[0], // key = "10"
|
||||||
|
kvs[1], // key = "11"
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, false, func(k, v []byte) bool {
|
||||||
|
return string(k) < "21"
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
t.Run("backwards", func(t *testing.T) {
|
t.Run("backwards", func(t *testing.T) {
|
||||||
check(t, goodprefix, start, goodkvs, true)
|
t.Run("good", func(t *testing.T) {
|
||||||
|
check(t, goodprefix, start, goodkvs, true, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[6], // key = "31"
|
||||||
|
kvs[5], // key = "30"
|
||||||
|
kvs[4], // key = "22"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, true, func(k, v []byte) bool {
|
||||||
|
return string(k) > "21"
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -1019,7 +1019,7 @@ func (s *Server) getTokenTransfers(ps request.Params, isNEP11 bool) (interface{}
|
||||||
return received, sent, !(limit != 0 && resCount >= limit), nil
|
return received, sent, !(limit != 0 && resCount >= limit), nil
|
||||||
}
|
}
|
||||||
if !isNEP11 {
|
if !isNEP11 {
|
||||||
err = s.chain.ForEachNEP17Transfer(u, func(tr *state.NEP17Transfer) (bool, error) {
|
err = s.chain.ForEachNEP17Transfer(u, end, func(tr *state.NEP17Transfer) (bool, error) {
|
||||||
r, s, res, err := handleTransfer(tr)
|
r, s, res, err := handleTransfer(tr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if r != nil {
|
if r != nil {
|
||||||
|
@ -1032,7 +1032,7 @@ func (s *Server) getTokenTransfers(ps request.Params, isNEP11 bool) (interface{}
|
||||||
return res, err
|
return res, err
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
err = s.chain.ForEachNEP11Transfer(u, func(tr *state.NEP11Transfer) (bool, error) {
|
err = s.chain.ForEachNEP11Transfer(u, end, func(tr *state.NEP11Transfer) (bool, error) {
|
||||||
r, s, res, err := handleTransfer(&tr.NEP17Transfer)
|
r, s, res, err := handleTransfer(&tr.NEP17Transfer)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
id := hex.EncodeToString(tr.ID)
|
id := hex.EncodeToString(tr.ID)
|
||||||
|
@ -1047,7 +1047,7 @@ func (s *Server) getTokenTransfers(ps request.Params, isNEP11 bool) (interface{}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, response.NewInternalServerError("invalid transfer log", err)
|
return nil, response.NewInternalServerError(fmt.Sprintf("invalid transfer log: %v", err), err)
|
||||||
}
|
}
|
||||||
return bs, nil
|
return bs, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue