core: allow early Seek stop
This simple approach allows to improve the performance of BoltDB and LevelDB in both terms of speed and allocations for retrieving GasPerVote value from the storage. MemoryPS's speed suffers a bit, but we don't use it for production environment. Part of #2322. Benchmark results: name old time/op new time/op delta NEO_GetGASPerVote/MemPS_10RewardRecords_1RewardDistance-8 25.3µs ± 1% 26.4µs ± 9% +4.41% (p=0.043 n=10+9) NEO_GetGASPerVote/MemPS_10RewardRecords_10RewardDistance-8 27.9µs ± 1% 30.1µs ±15% +7.97% (p=0.000 n=10+9) NEO_GetGASPerVote/MemPS_10RewardRecords_100RewardDistance-8 55.1µs ± 1% 60.2µs ± 7% +9.27% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_10RewardRecords_1000RewardDistance-8 353µs ± 2% 416µs ±13% +17.88% (p=0.000 n=8+8) NEO_GetGASPerVote/MemPS_100RewardRecords_1RewardDistance-8 195µs ± 1% 216µs ± 7% +10.42% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_100RewardRecords_10RewardDistance-8 200µs ± 4% 214µs ± 9% +6.99% (p=0.002 n=9+8) NEO_GetGASPerVote/MemPS_100RewardRecords_100RewardDistance-8 223µs ± 2% 247µs ± 9% +10.60% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_100RewardRecords_1000RewardDistance-8 612µs ±23% 855µs ±52% +39.60% (p=0.001 n=9+10) NEO_GetGASPerVote/MemPS_1000RewardRecords_1RewardDistance-8 11.3ms ±53% 10.7ms ±50% ~ (p=0.739 n=10+10) NEO_GetGASPerVote/MemPS_1000RewardRecords_10RewardDistance-8 12.0ms ±37% 10.4ms ±65% ~ (p=0.853 n=10+10) NEO_GetGASPerVote/MemPS_1000RewardRecords_100RewardDistance-8 11.3ms ±40% 10.4ms ±49% ~ (p=0.631 n=10+10) NEO_GetGASPerVote/MemPS_1000RewardRecords_1000RewardDistance-8 3.80ms ±45% 3.69ms ±27% ~ (p=0.931 n=6+5) NEO_GetGASPerVote/BoltPS_10RewardRecords_1RewardDistance-8 23.0µs ± 9% 22.6µs ± 4% ~ (p=0.059 n=8+9) NEO_GetGASPerVote/BoltPS_10RewardRecords_10RewardDistance-8 25.9µs ± 5% 24.8µs ± 4% -4.17% (p=0.006 n=10+8) NEO_GetGASPerVote/BoltPS_10RewardRecords_100RewardDistance-8 42.7µs ±13% 38.9µs ± 1% -8.85% (p=0.000 n=9+8) NEO_GetGASPerVote/BoltPS_10RewardRecords_1000RewardDistance-8 80.8µs ±12% 84.9µs ± 9% ~ (p=0.114 n=8+9) NEO_GetGASPerVote/BoltPS_100RewardRecords_1RewardDistance-8 64.3µs ±16% 22.1µs ±23% -65.64% (p=0.000 n=10+10) NEO_GetGASPerVote/BoltPS_100RewardRecords_10RewardDistance-8 61.0µs ±34% 23.2µs ± 8% -62.04% (p=0.000 n=10+9) NEO_GetGASPerVote/BoltPS_100RewardRecords_100RewardDistance-8 62.2µs ±14% 25.7µs ±13% -58.66% (p=0.000 n=9+10) NEO_GetGASPerVote/BoltPS_100RewardRecords_1000RewardDistance-8 359µs ±60% 325µs ±60% ~ (p=0.739 n=10+10) NEO_GetGASPerVote/BoltPS_1000RewardRecords_1RewardDistance-8 242µs ±21% 13µs ±28% -94.49% (p=0.000 n=10+8) NEO_GetGASPerVote/BoltPS_1000RewardRecords_10RewardDistance-8 229µs ±23% 18µs ±70% -92.02% (p=0.000 n=10+9) NEO_GetGASPerVote/BoltPS_1000RewardRecords_100RewardDistance-8 238µs ±28% 20µs ±109% -91.38% (p=0.000 n=10+9) NEO_GetGASPerVote/BoltPS_1000RewardRecords_1000RewardDistance-8 265µs ±20% 77µs ±62% -71.04% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_10RewardRecords_1RewardDistance-8 25.5µs ± 3% 24.7µs ± 7% ~ (p=0.143 n=10+10) NEO_GetGASPerVote/LevelPS_10RewardRecords_10RewardDistance-8 27.4µs ± 2% 27.9µs ± 6% ~ (p=0.280 n=10+10) NEO_GetGASPerVote/LevelPS_10RewardRecords_100RewardDistance-8 50.2µs ± 7% 47.4µs ±10% ~ (p=0.156 n=9+10) NEO_GetGASPerVote/LevelPS_10RewardRecords_1000RewardDistance-8 98.2µs ± 9% 94.6µs ±10% ~ (p=0.218 n=10+10) NEO_GetGASPerVote/LevelPS_100RewardRecords_1RewardDistance-8 82.9µs ±13% 32.1µs ±22% -61.30% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_100RewardRecords_10RewardDistance-8 92.2µs ±11% 33.7µs ±12% -63.42% (p=0.000 n=10+9) NEO_GetGASPerVote/LevelPS_100RewardRecords_100RewardDistance-8 88.3µs ±22% 39.4µs ±14% -55.36% (p=0.000 n=10+9) NEO_GetGASPerVote/LevelPS_100RewardRecords_1000RewardDistance-8 106µs ±18% 78µs ±24% -26.20% (p=0.000 n=9+10) NEO_GetGASPerVote/LevelPS_1000RewardRecords_1RewardDistance-8 360µs ±24% 29µs ±53% -91.91% (p=0.000 n=10+9) NEO_GetGASPerVote/LevelPS_1000RewardRecords_10RewardDistance-8 353µs ±16% 50µs ±70% -85.72% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_1000RewardRecords_100RewardDistance-8 381µs ±20% 47µs ±111% -87.64% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_1000RewardRecords_1000RewardDistance-8 434µs ±19% 113µs ±41% -74.04% (p=0.000 n=10+10) name old alloc/op new alloc/op delta NEO_GetGASPerVote/MemPS_10RewardRecords_1RewardDistance-8 4.82kB ± 0% 4.26kB ± 1% -11.62% (p=0.000 n=10+9) NEO_GetGASPerVote/MemPS_10RewardRecords_10RewardDistance-8 4.99kB ± 0% 4.41kB ± 1% -11.56% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_10RewardRecords_100RewardDistance-8 8.45kB ± 0% 7.87kB ± 0% -6.88% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_10RewardRecords_1000RewardDistance-8 55.0kB ± 0% 54.5kB ± 0% -0.81% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_100RewardRecords_1RewardDistance-8 29.1kB ± 0% 21.7kB ± 2% -25.56% (p=0.000 n=9+9) NEO_GetGASPerVote/MemPS_100RewardRecords_10RewardDistance-8 29.3kB ± 1% 21.8kB ± 2% -25.74% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_100RewardRecords_100RewardDistance-8 31.3kB ± 1% 23.6kB ± 1% -24.50% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_100RewardRecords_1000RewardDistance-8 92.5kB ± 5% 84.7kB ± 3% -8.50% (p=0.000 n=10+9) NEO_GetGASPerVote/MemPS_1000RewardRecords_1RewardDistance-8 324kB ±29% 222kB ±44% -31.33% (p=0.007 n=10+10) NEO_GetGASPerVote/MemPS_1000RewardRecords_10RewardDistance-8 308kB ±32% 174kB ±14% -43.56% (p=0.000 n=10+8) NEO_GetGASPerVote/MemPS_1000RewardRecords_100RewardDistance-8 298kB ±23% 178kB ±36% -40.26% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_1000RewardRecords_1000RewardDistance-8 362kB ± 6% 248kB ± 6% -31.54% (p=0.004 n=6+5) NEO_GetGASPerVote/BoltPS_10RewardRecords_1RewardDistance-8 5.15kB ± 3% 4.64kB ± 2% -9.92% (p=0.000 n=10+9) NEO_GetGASPerVote/BoltPS_10RewardRecords_10RewardDistance-8 5.36kB ± 1% 4.75kB ± 5% -11.42% (p=0.000 n=10+10) NEO_GetGASPerVote/BoltPS_10RewardRecords_100RewardDistance-8 8.15kB ± 4% 7.53kB ± 1% -7.62% (p=0.000 n=10+9) NEO_GetGASPerVote/BoltPS_10RewardRecords_1000RewardDistance-8 33.2kB ± 5% 33.2kB ± 7% ~ (p=0.829 n=8+10) NEO_GetGASPerVote/BoltPS_100RewardRecords_1RewardDistance-8 20.1kB ± 7% 5.8kB ±13% -70.90% (p=0.000 n=10+10) NEO_GetGASPerVote/BoltPS_100RewardRecords_10RewardDistance-8 19.8kB ±14% 6.2kB ± 5% -68.87% (p=0.000 n=10+9) NEO_GetGASPerVote/BoltPS_100RewardRecords_100RewardDistance-8 21.7kB ± 6% 8.0kB ± 7% -63.20% (p=0.000 n=9+10) NEO_GetGASPerVote/BoltPS_100RewardRecords_1000RewardDistance-8 98.5kB ±44% 81.8kB ±48% ~ (p=0.143 n=10+10) NEO_GetGASPerVote/BoltPS_1000RewardRecords_1RewardDistance-8 130kB ± 4% 4kB ± 9% -96.69% (p=0.000 n=10+10) NEO_GetGASPerVote/BoltPS_1000RewardRecords_10RewardDistance-8 131kB ± 4% 5kB ±21% -96.48% (p=0.000 n=9+9) NEO_GetGASPerVote/BoltPS_1000RewardRecords_100RewardDistance-8 132kB ± 4% 6kB ±10% -95.39% (p=0.000 n=10+8) NEO_GetGASPerVote/BoltPS_1000RewardRecords_1000RewardDistance-8 151kB ± 4% 26kB ±10% -82.46% (p=0.000 n=9+9) NEO_GetGASPerVote/LevelPS_10RewardRecords_1RewardDistance-8 5.92kB ± 3% 5.32kB ± 2% -10.01% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_10RewardRecords_10RewardDistance-8 6.09kB ± 2% 5.48kB ± 2% -10.00% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_10RewardRecords_100RewardDistance-8 9.61kB ± 1% 9.00kB ± 0% -6.29% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_10RewardRecords_1000RewardDistance-8 33.4kB ± 7% 32.2kB ± 5% -3.60% (p=0.037 n=10+10) NEO_GetGASPerVote/LevelPS_100RewardRecords_1RewardDistance-8 22.3kB ±10% 9.0kB ±16% -59.78% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_100RewardRecords_10RewardDistance-8 23.6kB ± 6% 8.5kB ±20% -63.76% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_100RewardRecords_100RewardDistance-8 24.2kB ± 9% 11.5kB ± 4% -52.34% (p=0.000 n=10+8) NEO_GetGASPerVote/LevelPS_100RewardRecords_1000RewardDistance-8 44.2kB ± 6% 30.8kB ± 9% -30.24% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_1000RewardRecords_1RewardDistance-8 144kB ± 4% 10kB ±24% -93.39% (p=0.000 n=9+10) NEO_GetGASPerVote/LevelPS_1000RewardRecords_10RewardDistance-8 146kB ± 1% 11kB ±37% -92.14% (p=0.000 n=7+10) NEO_GetGASPerVote/LevelPS_1000RewardRecords_100RewardDistance-8 149kB ± 3% 11kB ±12% -92.28% (p=0.000 n=10+9) NEO_GetGASPerVote/LevelPS_1000RewardRecords_1000RewardDistance-8 171kB ± 4% 34kB ±12% -80.00% (p=0.000 n=10+10) name old allocs/op new allocs/op delta NEO_GetGASPerVote/MemPS_10RewardRecords_1RewardDistance-8 95.0 ± 0% 74.0 ± 0% -22.11% (p=0.001 n=8+9) NEO_GetGASPerVote/MemPS_10RewardRecords_10RewardDistance-8 100 ± 0% 78 ± 1% -21.70% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_10RewardRecords_100RewardDistance-8 153 ± 0% 131 ± 2% -14.25% (p=0.000 n=6+10) NEO_GetGASPerVote/MemPS_10RewardRecords_1000RewardDistance-8 799 ± 2% 797 ± 4% ~ (p=0.956 n=10+10) NEO_GetGASPerVote/MemPS_100RewardRecords_1RewardDistance-8 438 ± 6% 167 ± 0% -61.86% (p=0.000 n=10+9) NEO_GetGASPerVote/MemPS_100RewardRecords_10RewardDistance-8 446 ± 5% 172 ± 0% -61.38% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_100RewardRecords_100RewardDistance-8 506 ± 4% 232 ± 1% -54.21% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_100RewardRecords_1000RewardDistance-8 1.31k ± 5% 0.97k ± 4% -26.20% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_1000RewardRecords_1RewardDistance-8 5.06k ± 1% 1.09k ± 2% -78.53% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_1000RewardRecords_10RewardDistance-8 5.02k ± 3% 1.08k ± 0% -78.45% (p=0.000 n=10+8) NEO_GetGASPerVote/MemPS_1000RewardRecords_100RewardDistance-8 5.09k ± 3% 1.15k ± 2% -77.48% (p=0.000 n=10+10) NEO_GetGASPerVote/MemPS_1000RewardRecords_1000RewardDistance-8 5.83k ± 1% 1.87k ± 3% -68.02% (p=0.004 n=6+5) NEO_GetGASPerVote/BoltPS_10RewardRecords_1RewardDistance-8 103 ± 2% 82 ± 1% -20.83% (p=0.000 n=10+10) NEO_GetGASPerVote/BoltPS_10RewardRecords_10RewardDistance-8 107 ± 0% 86 ± 0% -19.63% (p=0.000 n=8+8) NEO_GetGASPerVote/BoltPS_10RewardRecords_100RewardDistance-8 164 ± 1% 139 ± 0% -15.45% (p=0.000 n=10+9) NEO_GetGASPerVote/BoltPS_10RewardRecords_1000RewardDistance-8 820 ± 1% 789 ± 1% -3.70% (p=0.000 n=9+10) NEO_GetGASPerVote/BoltPS_100RewardRecords_1RewardDistance-8 475 ± 0% 94 ± 3% -80.15% (p=0.000 n=10+9) NEO_GetGASPerVote/BoltPS_100RewardRecords_10RewardDistance-8 481 ± 0% 100 ± 2% -79.26% (p=0.000 n=9+9) NEO_GetGASPerVote/BoltPS_100RewardRecords_100RewardDistance-8 549 ± 0% 161 ± 2% -70.69% (p=0.000 n=10+10) NEO_GetGASPerVote/BoltPS_100RewardRecords_1000RewardDistance-8 1.61k ±19% 1.19k ±25% -26.05% (p=0.000 n=10+10) NEO_GetGASPerVote/BoltPS_1000RewardRecords_1RewardDistance-8 4.12k ± 0% 0.08k ± 2% -98.02% (p=0.000 n=10+10) NEO_GetGASPerVote/BoltPS_1000RewardRecords_10RewardDistance-8 4.14k ± 0% 0.09k ± 3% -97.90% (p=0.000 n=9+9) NEO_GetGASPerVote/BoltPS_1000RewardRecords_100RewardDistance-8 4.19k ± 0% 0.15k ± 3% -96.52% (p=0.000 n=9+9) NEO_GetGASPerVote/BoltPS_1000RewardRecords_1000RewardDistance-8 4.82k ± 1% 0.74k ± 1% -84.58% (p=0.000 n=10+9) NEO_GetGASPerVote/LevelPS_10RewardRecords_1RewardDistance-8 112 ± 4% 90 ± 3% -19.45% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_10RewardRecords_10RewardDistance-8 116 ± 2% 95 ± 2% -17.90% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_10RewardRecords_100RewardDistance-8 170 ± 3% 148 ± 3% -12.99% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_10RewardRecords_1000RewardDistance-8 800 ± 2% 772 ± 2% -3.50% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_100RewardRecords_1RewardDistance-8 480 ± 3% 118 ± 3% -75.32% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_100RewardRecords_10RewardDistance-8 479 ± 2% 123 ± 3% -74.33% (p=0.000 n=10+9) NEO_GetGASPerVote/LevelPS_100RewardRecords_100RewardDistance-8 542 ± 1% 183 ± 3% -66.34% (p=0.000 n=10+9) NEO_GetGASPerVote/LevelPS_100RewardRecords_1000RewardDistance-8 1.19k ± 1% 0.79k ± 1% -33.41% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_1000RewardRecords_1RewardDistance-8 4.21k ± 1% 0.13k ±21% -96.83% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_1000RewardRecords_10RewardDistance-8 4.23k ± 1% 0.15k ±17% -96.48% (p=0.000 n=10+10) NEO_GetGASPerVote/LevelPS_1000RewardRecords_100RewardDistance-8 4.27k ± 0% 0.19k ± 6% -95.51% (p=0.000 n=10+9) NEO_GetGASPerVote/LevelPS_1000RewardRecords_1000RewardDistance-8 4.89k ± 1% 0.79k ± 2% -83.80% (p=0.000 n=10+10)
This commit is contained in:
parent
0afe8826ba
commit
cd42b8b20c
16 changed files with 206 additions and 71 deletions
|
@ -486,9 +486,10 @@ func (bc *Blockchain) removeOldStorageItems() {
|
||||||
|
|
||||||
b := bc.dao.Store.Batch()
|
b := bc.dao.Store.Batch()
|
||||||
prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
|
prefix := statesync.TemporaryPrefix(bc.dao.Version.StoragePrefix)
|
||||||
bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) {
|
bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(prefix)}}, func(k, _ []byte) bool {
|
||||||
// #1468, but don't need to copy here, because it is done by Store.
|
// #1468, but don't need to copy here, because it is done by Store.
|
||||||
b.Delete(k)
|
b.Delete(k)
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
b.Delete(storage.SYSCleanStorage.Bytes())
|
b.Delete(storage.SYSCleanStorage.Bytes())
|
||||||
|
|
||||||
|
|
|
@ -1768,11 +1768,12 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
|
||||||
if bcSpout.dao.Version.StoragePrefix == tempPrefix {
|
if bcSpout.dao.Version.StoragePrefix == tempPrefix {
|
||||||
tempPrefix = storage.STStorage
|
tempPrefix = storage.STStorage
|
||||||
}
|
}
|
||||||
bcSpout.dao.Store.Seek(storage.SeekRange{Prefix: bcSpout.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) {
|
bcSpout.dao.Store.Seek(storage.SeekRange{Prefix: bcSpout.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
key[0] = byte(tempPrefix)
|
key[0] = byte(tempPrefix)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
batch.Put(key, value)
|
batch.Put(key, value)
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
require.NoError(t, bcSpout.dao.Store.PutBatch(batch))
|
require.NoError(t, bcSpout.dao.Store.PutBatch(batch))
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,7 @@ type DAO interface {
|
||||||
PutStateSyncCurrentBlockHeight(h uint32) error
|
PutStateSyncCurrentBlockHeight(h uint32) error
|
||||||
PutStorageItem(id int32, key []byte, si state.StorageItem) error
|
PutStorageItem(id int32, key []byte, si state.StorageItem) error
|
||||||
PutVersion(v Version) error
|
PutVersion(v Version) error
|
||||||
Seek(id int32, rng storage.SeekRange, f func(k, v []byte))
|
Seek(id int32, rng storage.SeekRange, f func(k, v []byte) bool)
|
||||||
SeekAsync(ctx context.Context, id int32, rng storage.SeekRange) chan storage.KeyValue
|
SeekAsync(ctx context.Context, id int32, rng storage.SeekRange) chan storage.KeyValue
|
||||||
StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error
|
StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error
|
||||||
StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
|
StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
|
||||||
|
@ -292,13 +292,14 @@ func (dao *Simple) GetStorageItems(id int32) ([]state.StorageItemWithKey, error)
|
||||||
func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.StorageItemWithKey, error) {
|
func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.StorageItemWithKey, error) {
|
||||||
var siArr []state.StorageItemWithKey
|
var siArr []state.StorageItemWithKey
|
||||||
|
|
||||||
saveToArr := func(k, v []byte) {
|
saveToArr := func(k, v []byte) bool {
|
||||||
// Cut prefix and hash.
|
// Cut prefix and hash.
|
||||||
// #1468, but don't need to copy here, because it is done by Store.
|
// #1468, but don't need to copy here, because it is done by Store.
|
||||||
siArr = append(siArr, state.StorageItemWithKey{
|
siArr = append(siArr, state.StorageItemWithKey{
|
||||||
Key: k,
|
Key: k,
|
||||||
Item: state.StorageItem(v),
|
Item: state.StorageItem(v),
|
||||||
})
|
})
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
dao.Seek(id, storage.SeekRange{Prefix: prefix}, saveToArr)
|
dao.Seek(id, storage.SeekRange{Prefix: prefix}, saveToArr)
|
||||||
return siArr, nil
|
return siArr, nil
|
||||||
|
@ -306,11 +307,11 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S
|
||||||
|
|
||||||
// Seek executes f for all storage items matching a given `rng` (matching given prefix and
|
// Seek executes f for all storage items matching a given `rng` (matching given prefix and
|
||||||
// starting from the point specified). If key or value is to be used outside of f, they
|
// starting from the point specified). If key or value is to be used outside of f, they
|
||||||
// may not be copied.
|
// may not be copied. Seek continues iterating until false is returned from f.
|
||||||
func (dao *Simple) Seek(id int32, rng storage.SeekRange, f func(k, v []byte)) {
|
func (dao *Simple) Seek(id int32, rng storage.SeekRange, f func(k, v []byte) bool) {
|
||||||
rng.Prefix = makeStorageItemKey(dao.Version.StoragePrefix, id, rng.Prefix)
|
rng.Prefix = makeStorageItemKey(dao.Version.StoragePrefix, id, rng.Prefix)
|
||||||
dao.Store.Seek(rng, func(k, v []byte) {
|
dao.Store.Seek(rng, func(k, v []byte) bool {
|
||||||
f(k[len(rng.Prefix):], v)
|
return f(k[len(rng.Prefix):], v)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -477,13 +478,14 @@ func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
|
||||||
hashMap := make(map[uint32][]util.Uint256)
|
hashMap := make(map[uint32][]util.Uint256)
|
||||||
dao.Store.Seek(storage.SeekRange{
|
dao.Store.Seek(storage.SeekRange{
|
||||||
Prefix: storage.IXHeaderHashList.Bytes(),
|
Prefix: storage.IXHeaderHashList.Bytes(),
|
||||||
}, func(k, v []byte) {
|
}, func(k, v []byte) bool {
|
||||||
storedCount := binary.LittleEndian.Uint32(k[1:])
|
storedCount := binary.LittleEndian.Uint32(k[1:])
|
||||||
hashes, err := read2000Uint256Hashes(v)
|
hashes, err := read2000Uint256Hashes(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
hashMap[storedCount] = hashes
|
hashMap[storedCount] = hashes
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -503,13 +503,14 @@ func (m *Management) InitializeCache(d dao.DAO) error {
|
||||||
defer m.mtx.Unlock()
|
defer m.mtx.Unlock()
|
||||||
|
|
||||||
var initErr error
|
var initErr error
|
||||||
d.Seek(m.ID, storage.SeekRange{Prefix: []byte{prefixContract}}, func(_, v []byte) {
|
d.Seek(m.ID, storage.SeekRange{Prefix: []byte{prefixContract}}, func(_, v []byte) bool {
|
||||||
var cs = new(state.Contract)
|
var cs = new(state.Contract)
|
||||||
initErr = stackitem.DeserializeConvertible(v, cs)
|
initErr = stackitem.DeserializeConvertible(v, cs)
|
||||||
if initErr != nil {
|
if initErr != nil {
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
m.updateContractCache(cs)
|
m.updateContractCache(cs)
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
return initErr
|
return initErr
|
||||||
}
|
}
|
||||||
|
|
|
@ -396,8 +396,8 @@ func (n *NEO) getGASPerVote(d dao.DAO, key []byte, indexes []uint32) []big.Int {
|
||||||
Prefix: key,
|
Prefix: key,
|
||||||
Start: start,
|
Start: start,
|
||||||
Backwards: true,
|
Backwards: true,
|
||||||
}, func(k, v []byte) {
|
}, func(k, v []byte) bool {
|
||||||
if collected < need && len(k) == 4 {
|
if len(k) == 4 {
|
||||||
num := binary.BigEndian.Uint32(k)
|
num := binary.BigEndian.Uint32(k)
|
||||||
for i, ind := range indexes {
|
for i, ind := range indexes {
|
||||||
if reward[i].Sign() == 0 && num <= ind {
|
if reward[i].Sign() == 0 && num <= ind {
|
||||||
|
@ -406,6 +406,7 @@ func (n *NEO) getGASPerVote(d dao.DAO, key []byte, indexes []uint32) []big.Int {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return collected < need
|
||||||
})
|
})
|
||||||
return reward
|
return reward
|
||||||
}
|
}
|
||||||
|
@ -601,8 +602,9 @@ func (n *NEO) dropCandidateIfZero(d dao.DAO, pub *keys.PublicKey, c *candidate)
|
||||||
|
|
||||||
var toRemove []string
|
var toRemove []string
|
||||||
voterKey := makeVoterKey(pub.Bytes())
|
voterKey := makeVoterKey(pub.Bytes())
|
||||||
d.Seek(n.ID, storage.SeekRange{Prefix: voterKey}, func(k, v []byte) {
|
d.Seek(n.ID, storage.SeekRange{Prefix: voterKey}, func(k, v []byte) bool {
|
||||||
toRemove = append(toRemove, string(k))
|
toRemove = append(toRemove, string(k))
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
for i := range toRemove {
|
for i := range toRemove {
|
||||||
if err := d.DeleteStorageItem(n.ID, []byte(toRemove[i])); err != nil {
|
if err := d.DeleteStorageItem(n.ID, []byte(toRemove[i])); err != nil {
|
||||||
|
|
|
@ -134,9 +134,10 @@ func (s *Module) CleanStorage() error {
|
||||||
return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load())
|
return fmt.Errorf("can't clean MPT data for non-genesis block: expected local stateroot height 0, got %d", s.localHeight.Load())
|
||||||
}
|
}
|
||||||
b := s.Store.Batch()
|
b := s.Store.Batch()
|
||||||
s.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.DataMPT)}}, func(k, _ []byte) {
|
s.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.DataMPT)}}, func(k, _ []byte) bool {
|
||||||
// #1468, but don't need to copy here, because it is done by Store.
|
// #1468, but don't need to copy here, because it is done by Store.
|
||||||
b.Delete(k)
|
b.Delete(k)
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
err := s.Store.PutBatch(b)
|
err := s.Store.PutBatch(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -32,7 +32,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
||||||
nodes = make(map[util.Uint256][]byte)
|
nodes = make(map[util.Uint256][]byte)
|
||||||
expectedItems []storage.KeyValue
|
expectedItems []storage.KeyValue
|
||||||
)
|
)
|
||||||
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) {
|
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
expectedItems = append(expectedItems, storage.KeyValue{
|
expectedItems = append(expectedItems, storage.KeyValue{
|
||||||
|
@ -43,6 +43,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
nodeBytes := value[:len(value)-4]
|
nodeBytes := value[:len(value)-4]
|
||||||
nodes[hash] = nodeBytes
|
nodes[hash] = nodeBytes
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
actualStorage := storage.NewMemCachedStore(storage.NewMemoryStore())
|
actualStorage := storage.NewMemCachedStore(storage.NewMemoryStore())
|
||||||
|
@ -95,13 +96,14 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
||||||
|
|
||||||
// Compare resulting storage items and refcounts.
|
// Compare resulting storage items and refcounts.
|
||||||
var actualItems []storage.KeyValue
|
var actualItems []storage.KeyValue
|
||||||
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) {
|
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
actualItems = append(actualItems, storage.KeyValue{
|
actualItems = append(actualItems, storage.KeyValue{
|
||||||
Key: key,
|
Key: key,
|
||||||
Value: value,
|
Value: value,
|
||||||
})
|
})
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
require.ElementsMatch(t, expectedItems, actualItems)
|
require.ElementsMatch(t, expectedItems, actualItems)
|
||||||
}
|
}
|
||||||
|
|
|
@ -424,7 +424,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
||||||
// compare storage states
|
// compare storage states
|
||||||
fetchStorage := func(bc *Blockchain) []storage.KeyValue {
|
fetchStorage := func(bc *Blockchain) []storage.KeyValue {
|
||||||
var kv []storage.KeyValue
|
var kv []storage.KeyValue
|
||||||
bc.dao.Store.Seek(storage.SeekRange{Prefix: bc.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) {
|
bc.dao.Store.Seek(storage.SeekRange{Prefix: bc.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
if key[0] == byte(storage.STTempStorage) {
|
if key[0] == byte(storage.STTempStorage) {
|
||||||
|
@ -434,6 +434,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
||||||
Key: key,
|
Key: key,
|
||||||
Value: value,
|
Value: value,
|
||||||
})
|
})
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
return kv
|
return kv
|
||||||
}
|
}
|
||||||
|
@ -444,8 +445,9 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
||||||
// no temp items should be left
|
// no temp items should be left
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
var haveItems bool
|
var haveItems bool
|
||||||
bcBolt.dao.Store.Seek(storage.SeekRange{Prefix: storage.STStorage.Bytes()}, func(_, _ []byte) {
|
bcBolt.dao.Store.Seek(storage.SeekRange{Prefix: storage.STStorage.Bytes()}, func(_, _ []byte) bool {
|
||||||
haveItems = true
|
haveItems = true
|
||||||
|
return false
|
||||||
})
|
})
|
||||||
return !haveItems
|
return !haveItems
|
||||||
}, time.Second*5, time.Millisecond*100)
|
}, time.Second*5, time.Millisecond*100)
|
||||||
|
|
|
@ -109,7 +109,7 @@ func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
||||||
copy(start, rng.Prefix)
|
copy(start, rng.Prefix)
|
||||||
copy(start[len(rng.Prefix):], rng.Start)
|
copy(start[len(rng.Prefix):], rng.Start)
|
||||||
|
@ -120,13 +120,15 @@ func (s *BoltDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BoltDBStore) seek(key []byte, start []byte, f func(k, v []byte)) {
|
func (s *BoltDBStore) seek(key []byte, start []byte, f func(k, v []byte) bool) {
|
||||||
prefix := util.BytesPrefix(key)
|
prefix := util.BytesPrefix(key)
|
||||||
prefix.Start = start
|
prefix.Start = start
|
||||||
err := s.db.View(func(tx *bbolt.Tx) error {
|
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||||
c := tx.Bucket(Bucket).Cursor()
|
c := tx.Bucket(Bucket).Cursor()
|
||||||
for k, v := c.Seek(prefix.Start); k != nil && (len(prefix.Limit) == 0 || bytes.Compare(k, prefix.Limit) <= 0); k, v = c.Next() {
|
for k, v := c.Seek(prefix.Start); k != nil && (len(prefix.Limit) == 0 || bytes.Compare(k, prefix.Limit) <= 0); k, v = c.Next() {
|
||||||
f(k, v)
|
if !f(k, v) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -135,7 +137,7 @@ func (s *BoltDBStore) seek(key []byte, start []byte, f func(k, v []byte)) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BoltDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte)) {
|
func (s *BoltDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte) bool) {
|
||||||
err := s.db.View(func(tx *bbolt.Tx) error {
|
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||||
c := tx.Bucket(Bucket).Cursor()
|
c := tx.Bucket(Bucket).Cursor()
|
||||||
// Move cursor to the first kv pair which is followed by the pair matching the specified prefix.
|
// Move cursor to the first kv pair which is followed by the pair matching the specified prefix.
|
||||||
|
@ -146,7 +148,9 @@ func (s *BoltDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte
|
||||||
rng := util.BytesPrefix(start) // in fact, we only need limit based on start slice to iterate backwards starting from this limit
|
rng := util.BytesPrefix(start) // in fact, we only need limit based on start slice to iterate backwards starting from this limit
|
||||||
c.Seek(rng.Limit)
|
c.Seek(rng.Limit)
|
||||||
for k, v := c.Prev(); k != nil && bytes.HasPrefix(k, key); k, v = c.Prev() {
|
for k, v := c.Prev(); k != nil && bytes.HasPrefix(k, key); k, v = c.Prev() {
|
||||||
f(k, v)
|
if !f(k, v) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -85,7 +85,7 @@ func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
start := make([]byte, len(rng.Prefix)+len(rng.Start))
|
||||||
copy(start, rng.Prefix)
|
copy(start, rng.Prefix)
|
||||||
copy(start[len(rng.Prefix):], rng.Start)
|
copy(start[len(rng.Prefix):], rng.Start)
|
||||||
|
@ -96,23 +96,27 @@ func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *LevelDBStore) seek(key []byte, start []byte, f func(k, v []byte)) {
|
func (s *LevelDBStore) seek(key []byte, start []byte, f func(k, v []byte) bool) {
|
||||||
prefix := util.BytesPrefix(key)
|
prefix := util.BytesPrefix(key)
|
||||||
prefix.Start = start
|
prefix.Start = start
|
||||||
iter := s.db.NewIterator(prefix, nil)
|
iter := s.db.NewIterator(prefix, nil)
|
||||||
for iter.Next() {
|
for iter.Next() {
|
||||||
f(iter.Key(), iter.Value())
|
if !f(iter.Key(), iter.Value()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
iter.Release()
|
iter.Release()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *LevelDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte)) {
|
func (s *LevelDBStore) seekBackwards(key []byte, start []byte, f func(k, v []byte) bool) {
|
||||||
iRange := util.BytesPrefix(start)
|
iRange := util.BytesPrefix(start)
|
||||||
iRange.Start = key
|
iRange.Start = key
|
||||||
|
|
||||||
iter := s.db.NewIterator(iRange, nil)
|
iter := s.db.NewIterator(iRange, nil)
|
||||||
for ok := iter.Last(); ok; ok = iter.Prev() {
|
for ok := iter.Last(); ok; ok = iter.Prev() {
|
||||||
f(iter.Key(), iter.Value())
|
if !f(iter.Key(), iter.Value()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
iter.Release()
|
iter.Release()
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,7 +90,7 @@ func (s *MemCachedStore) GetBatch() *MemBatch {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
s.seek(context.Background(), rng, false, f)
|
s.seek(context.Background(), rng, false, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,11 +100,12 @@ func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix bool) chan KeyValue {
|
func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix bool) chan KeyValue {
|
||||||
res := make(chan KeyValue)
|
res := make(chan KeyValue)
|
||||||
go func() {
|
go func() {
|
||||||
s.seek(ctx, rng, cutPrefix, func(k, v []byte) {
|
s.seek(ctx, rng, cutPrefix, func(k, v []byte) bool {
|
||||||
res <- KeyValue{
|
res <- KeyValue{
|
||||||
Key: k,
|
Key: k,
|
||||||
Value: v,
|
Value: v,
|
||||||
}
|
}
|
||||||
|
return true // always continue, we have context for early stop.
|
||||||
})
|
})
|
||||||
close(res)
|
close(res)
|
||||||
}()
|
}()
|
||||||
|
@ -117,7 +118,7 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix
|
||||||
// key needs to be cut off the resulting keys. `rng` specifies prefix items must match
|
// key needs to be cut off the resulting keys. `rng` specifies prefix items must match
|
||||||
// and point to start seeking from. Backwards seeking from some point is supported
|
// and point to start seeking from. Backwards seeking from some point is supported
|
||||||
// with corresponding `rng` field set.
|
// with corresponding `rng` field set.
|
||||||
func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte)) {
|
func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) {
|
||||||
// Create memory store `mem` and `del` snapshot not to hold the lock.
|
// Create memory store `mem` and `del` snapshot not to hold the lock.
|
||||||
var memRes []KeyValueExists
|
var memRes []KeyValueExists
|
||||||
sPrefix := string(rng.Prefix)
|
sPrefix := string(rng.Prefix)
|
||||||
|
@ -176,21 +177,21 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
|
||||||
haveMem = true
|
haveMem = true
|
||||||
iMem++
|
iMem++
|
||||||
}
|
}
|
||||||
// Merge results of seek operations in ascending order.
|
// Merge results of seek operations in ascending order. It returns whether iterating
|
||||||
mergeFunc := func(k, v []byte) {
|
// should be continued.
|
||||||
|
mergeFunc := func(k, v []byte) bool {
|
||||||
if done {
|
if done {
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
kvPs := KeyValue{
|
kvPs := KeyValue{
|
||||||
Key: slice.Copy(k),
|
Key: slice.Copy(k),
|
||||||
Value: slice.Copy(v),
|
Value: slice.Copy(v),
|
||||||
}
|
}
|
||||||
loop:
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
done = true
|
done = true
|
||||||
break loop
|
return false
|
||||||
default:
|
default:
|
||||||
var isMem = haveMem && less(kvMem.Key, kvPs.Key)
|
var isMem = haveMem && less(kvMem.Key, kvPs.Key)
|
||||||
if isMem {
|
if isMem {
|
||||||
|
@ -198,7 +199,10 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
|
||||||
if cutPrefix {
|
if cutPrefix {
|
||||||
kvMem.Key = kvMem.Key[lPrefix:]
|
kvMem.Key = kvMem.Key[lPrefix:]
|
||||||
}
|
}
|
||||||
f(kvMem.Key, kvMem.Value)
|
if !f(kvMem.Key, kvMem.Value) {
|
||||||
|
done = true
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if iMem < len(memRes) {
|
if iMem < len(memRes) {
|
||||||
kvMem = memRes[iMem]
|
kvMem = memRes[iMem]
|
||||||
|
@ -212,9 +216,12 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
|
||||||
if cutPrefix {
|
if cutPrefix {
|
||||||
kvPs.Key = kvPs.Key[lPrefix:]
|
kvPs.Key = kvPs.Key[lPrefix:]
|
||||||
}
|
}
|
||||||
f(kvPs.Key, kvPs.Value)
|
if !f(kvPs.Key, kvPs.Value) {
|
||||||
|
done = true
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
break loop
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -233,7 +240,9 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool
|
||||||
if cutPrefix {
|
if cutPrefix {
|
||||||
kvMem.Key = kvMem.Key[lPrefix:]
|
kvMem.Key = kvMem.Key[lPrefix:]
|
||||||
}
|
}
|
||||||
f(kvMem.Key, kvMem.Value)
|
if !f(kvMem.Key, kvMem.Value) {
|
||||||
|
break loop
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -167,8 +167,9 @@ func TestCachedSeek(t *testing.T) {
|
||||||
require.NoError(t, ts.Put(v.Key, v.Value))
|
require.NoError(t, ts.Put(v.Key, v.Value))
|
||||||
}
|
}
|
||||||
foundKVs := make(map[string][]byte)
|
foundKVs := make(map[string][]byte)
|
||||||
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) {
|
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) bool {
|
||||||
foundKVs[string(k)] = v
|
foundKVs[string(k)] = v
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
||||||
for _, kv := range lowerKVs {
|
for _, kv := range lowerKVs {
|
||||||
|
@ -232,7 +233,7 @@ func benchmarkCachedSeek(t *testing.B, ps Store, psElementsCount, tsElementsCoun
|
||||||
t.ReportAllocs()
|
t.ReportAllocs()
|
||||||
t.ResetTimer()
|
t.ResetTimer()
|
||||||
for n := 0; n < t.N; n++ {
|
for n := 0; n < t.N; n++ {
|
||||||
ts.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) {})
|
ts.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) bool { return true })
|
||||||
}
|
}
|
||||||
t.StopTimer()
|
t.StopTimer()
|
||||||
}
|
}
|
||||||
|
@ -290,7 +291,7 @@ func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string]bool) error {
|
||||||
b.onPutBatch()
|
b.onPutBatch()
|
||||||
return ErrKeyNotFound
|
return ErrKeyNotFound
|
||||||
}
|
}
|
||||||
func (b *BadStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
func (b *BadStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
}
|
}
|
||||||
func (b *BadStore) Close() error {
|
func (b *BadStore) Close() error {
|
||||||
return nil
|
return nil
|
||||||
|
@ -365,8 +366,9 @@ func TestCachedSeekSorting(t *testing.T) {
|
||||||
require.NoError(t, ts.Put(v.Key, v.Value))
|
require.NoError(t, ts.Put(v.Key, v.Value))
|
||||||
}
|
}
|
||||||
var foundKVs []KeyValue
|
var foundKVs []KeyValue
|
||||||
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) {
|
ts.Seek(SeekRange{Prefix: goodPrefix}, func(k, v []byte) bool {
|
||||||
foundKVs = append(foundKVs, KeyValue{Key: slice.Copy(k), Value: slice.Copy(v)})
|
foundKVs = append(foundKVs, KeyValue{Key: slice.Copy(k), Value: slice.Copy(v)})
|
||||||
|
return true
|
||||||
})
|
})
|
||||||
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
assert.Equal(t, len(foundKVs), len(lowerKVs)+len(updatedKVs))
|
||||||
expected := append(lowerKVs, updatedKVs...)
|
expected := append(lowerKVs, updatedKVs...)
|
||||||
|
|
|
@ -104,7 +104,7 @@ func (s *MemoryStore) PutChangeSet(puts map[string][]byte, dels map[string]bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek implements the Store interface.
|
// Seek implements the Store interface.
|
||||||
func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte)) {
|
func (s *MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
s.mut.RLock()
|
s.mut.RLock()
|
||||||
s.seek(rng, f)
|
s.seek(rng, f)
|
||||||
s.mut.RUnlock()
|
s.mut.RUnlock()
|
||||||
|
@ -130,7 +130,7 @@ func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) {
|
||||||
// seek is an internal unlocked implementation of Seek. `start` denotes whether
|
// seek is an internal unlocked implementation of Seek. `start` denotes whether
|
||||||
// seeking starting from the provided prefix should be performed. Backwards
|
// seeking starting from the provided prefix should be performed. Backwards
|
||||||
// seeking from some point is supported with corresponding SeekRange field set.
|
// seeking from some point is supported with corresponding SeekRange field set.
|
||||||
func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) {
|
func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) {
|
||||||
sPrefix := string(rng.Prefix)
|
sPrefix := string(rng.Prefix)
|
||||||
lPrefix := len(sPrefix)
|
lPrefix := len(sPrefix)
|
||||||
sStart := string(rng.Start)
|
sStart := string(rng.Start)
|
||||||
|
@ -162,7 +162,9 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte)) {
|
||||||
return less(memList[i].Key, memList[j].Key)
|
return less(memList[i].Key, memList[j].Key)
|
||||||
})
|
})
|
||||||
for _, kv := range memList {
|
for _, kv := range memList {
|
||||||
f(kv.Key, kv.Value)
|
if !f(kv.Key, kv.Value) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ func BenchmarkMemorySeek(t *testing.B) {
|
||||||
t.ReportAllocs()
|
t.ReportAllocs()
|
||||||
t.ResetTimer()
|
t.ResetTimer()
|
||||||
for n := 0; n < t.N; n++ {
|
for n := 0; n < t.N; n++ {
|
||||||
ms.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) {})
|
ms.Seek(SeekRange{Prefix: searchPrefix}, func(k, v []byte) bool { return false })
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,7 +51,7 @@ type SeekRange struct {
|
||||||
// Empty Prefix means seeking through all keys in the DB starting from
|
// Empty Prefix means seeking through all keys in the DB starting from
|
||||||
// the Start if specified.
|
// the Start if specified.
|
||||||
Prefix []byte
|
Prefix []byte
|
||||||
// Start denotes value upended to the Prefix to start Seek from.
|
// Start denotes value appended to the Prefix to start Seek from.
|
||||||
// Seeking starting from some key includes this key to the result;
|
// Seeking starting from some key includes this key to the result;
|
||||||
// if no matching key was found then next suitable key is picked up.
|
// if no matching key was found then next suitable key is picked up.
|
||||||
// Start may be empty. Empty Start means seeking through all keys in
|
// Start may be empty. Empty Start means seeking through all keys in
|
||||||
|
@ -81,9 +81,10 @@ type (
|
||||||
// PutChangeSet allows to push prepared changeset to the Store.
|
// PutChangeSet allows to push prepared changeset to the Store.
|
||||||
PutChangeSet(puts map[string][]byte, dels map[string]bool) error
|
PutChangeSet(puts map[string][]byte, dels map[string]bool) error
|
||||||
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
|
// Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f.
|
||||||
// Key and value slices should not be modified. Seek can guarantee that key-value items are sorted by
|
// Seek continues iteration until false is returned from f.
|
||||||
// key in ascending way.
|
// Key and value slices should not be modified.
|
||||||
Seek(rng SeekRange, f func(k, v []byte))
|
// Seek can guarantee that key-value items are sorted by key in ascending way.
|
||||||
|
Seek(rng SeekRange, f func(k, v []byte) bool)
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,7 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
require.NoError(t, s.Put(v.Key, v.Value))
|
require.NoError(t, s.Put(v.Key, v.Value))
|
||||||
}
|
}
|
||||||
|
|
||||||
check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue, backwards bool) {
|
check := func(t *testing.T, goodprefix, start []byte, goodkvs []KeyValue, backwards bool, cont func(k, v []byte) bool) {
|
||||||
// Seek result expected to be sorted in an ascending (for forwards seeking) or descending (for backwards seeking) way.
|
// Seek result expected to be sorted in an ascending (for forwards seeking) or descending (for backwards seeking) way.
|
||||||
cmpFunc := func(i, j int) bool {
|
cmpFunc := func(i, j int) bool {
|
||||||
return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) < 0
|
return bytes.Compare(goodkvs[i].Key, goodkvs[j].Key) < 0
|
||||||
|
@ -101,11 +101,15 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
rng.Backwards = true
|
rng.Backwards = true
|
||||||
}
|
}
|
||||||
actual := make([]KeyValue, 0, len(goodkvs))
|
actual := make([]KeyValue, 0, len(goodkvs))
|
||||||
s.Seek(rng, func(k, v []byte) {
|
s.Seek(rng, func(k, v []byte) bool {
|
||||||
actual = append(actual, KeyValue{
|
actual = append(actual, KeyValue{
|
||||||
Key: slice.Copy(k),
|
Key: slice.Copy(k),
|
||||||
Value: slice.Copy(v),
|
Value: slice.Copy(v),
|
||||||
})
|
})
|
||||||
|
if cont == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return cont(k, v)
|
||||||
})
|
})
|
||||||
assert.Equal(t, goodkvs, actual)
|
assert.Equal(t, goodkvs, actual)
|
||||||
}
|
}
|
||||||
|
@ -123,12 +127,26 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
kvs[3], // key = "21"
|
kvs[3], // key = "21"
|
||||||
kvs[4], // key = "22"
|
kvs[4], // key = "22"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, false)
|
check(t, goodprefix, start, goodkvs, false, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte("0")
|
goodprefix := []byte("0")
|
||||||
start := []byte{}
|
start := []byte{}
|
||||||
check(t, goodprefix, start, []KeyValue{}, false)
|
check(t, goodprefix, start, []KeyValue{}, false, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
// Given this prefix...
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
// and empty start range...
|
||||||
|
start := []byte{}
|
||||||
|
// these pairs should be found.
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, false, func(k, v []byte) bool {
|
||||||
|
return string(k) < "21"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -141,12 +159,23 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
kvs[3], // key = "21"
|
kvs[3], // key = "21"
|
||||||
kvs[2], // key = "20"
|
kvs[2], // key = "20"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, true)
|
check(t, goodprefix, start, goodkvs, true, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte("0")
|
goodprefix := []byte("0")
|
||||||
start := []byte{}
|
start := []byte{}
|
||||||
check(t, goodprefix, start, []KeyValue{}, true)
|
check(t, goodprefix, start, []KeyValue{}, true, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
start := []byte{}
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[4], // key = "22"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, true, func(k, v []byte) bool {
|
||||||
|
return string(k) > "21"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -155,33 +184,55 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
t.Run("forwards", func(t *testing.T) {
|
t.Run("forwards", func(t *testing.T) {
|
||||||
t.Run("good", func(t *testing.T) {
|
t.Run("good", func(t *testing.T) {
|
||||||
goodprefix := []byte("2")
|
goodprefix := []byte("2")
|
||||||
start := []byte("1") // start will be upended to goodprefix to start seek from
|
start := []byte("1") // start will be appended to goodprefix to start seek from
|
||||||
goodkvs := []KeyValue{
|
goodkvs := []KeyValue{
|
||||||
kvs[3], // key = "21"
|
kvs[3], // key = "21"
|
||||||
kvs[4], // key = "22"
|
kvs[4], // key = "22"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, false)
|
check(t, goodprefix, start, goodkvs, false, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte("2")
|
goodprefix := []byte("2")
|
||||||
start := []byte("3") // start is more than all keys prefixed by '2'.
|
start := []byte("3") // start is more than all keys prefixed by '2'.
|
||||||
check(t, goodprefix, start, []KeyValue{}, false)
|
check(t, goodprefix, start, []KeyValue{}, false, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
start := []byte("0") // start will be appended to goodprefix to start seek from
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, false, func(k, v []byte) bool {
|
||||||
|
return string(k) < "21"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
t.Run("backwards", func(t *testing.T) {
|
t.Run("backwards", func(t *testing.T) {
|
||||||
t.Run("good", func(t *testing.T) {
|
t.Run("good", func(t *testing.T) {
|
||||||
goodprefix := []byte("2")
|
goodprefix := []byte("2")
|
||||||
start := []byte("1") // start will be upended to goodprefix to start seek from
|
start := []byte("1") // start will be appended to goodprefix to start seek from
|
||||||
goodkvs := []KeyValue{
|
goodkvs := []KeyValue{
|
||||||
kvs[3], // key = "21"
|
kvs[3], // key = "21"
|
||||||
kvs[2], // key = "20"
|
kvs[2], // key = "20"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, true)
|
check(t, goodprefix, start, goodkvs, true, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte("2")
|
goodprefix := []byte("2")
|
||||||
start := []byte(".") // start is less than all keys prefixed by '2'.
|
start := []byte(".") // start is less than all keys prefixed by '2'.
|
||||||
check(t, goodprefix, start, []KeyValue{}, true)
|
check(t, goodprefix, start, []KeyValue{}, true, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodprefix := []byte("2")
|
||||||
|
start := []byte("2") // start will be appended to goodprefix to start seek from
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[4], // key = "24"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, true, func(k, v []byte) bool {
|
||||||
|
return string(k) > "21"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -197,12 +248,24 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
kvs[5], // key = "30"
|
kvs[5], // key = "30"
|
||||||
kvs[6], // key = "31"
|
kvs[6], // key = "31"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, false)
|
check(t, goodprefix, start, goodkvs, false, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte{}
|
goodprefix := []byte{}
|
||||||
start := []byte("32") // start is more than all keys.
|
start := []byte("32") // start is more than all keys.
|
||||||
check(t, goodprefix, start, []KeyValue{}, false)
|
check(t, goodprefix, start, []KeyValue{}, false, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodprefix := []byte{}
|
||||||
|
start := []byte("21")
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
kvs[4], // key = "22"
|
||||||
|
kvs[5], // key = "30"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, false, func(k, v []byte) bool {
|
||||||
|
return string(k) < "30"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
t.Run("backwards", func(t *testing.T) {
|
t.Run("backwards", func(t *testing.T) {
|
||||||
|
@ -215,12 +278,24 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
kvs[1], // key = "11"
|
kvs[1], // key = "11"
|
||||||
kvs[0], // key = "10"
|
kvs[0], // key = "10"
|
||||||
}
|
}
|
||||||
check(t, goodprefix, start, goodkvs, true)
|
check(t, goodprefix, start, goodkvs, true, nil)
|
||||||
})
|
})
|
||||||
t.Run("no matching items", func(t *testing.T) {
|
t.Run("no matching items", func(t *testing.T) {
|
||||||
goodprefix := []byte{}
|
goodprefix := []byte{}
|
||||||
start := []byte("0") // start is less than all keys.
|
start := []byte("0") // start is less than all keys.
|
||||||
check(t, goodprefix, start, []KeyValue{}, true)
|
check(t, goodprefix, start, []KeyValue{}, true, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodprefix := []byte{}
|
||||||
|
start := []byte("21")
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
kvs[1], // key = "11"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, true, func(k, v []byte) bool {
|
||||||
|
return string(k) > "11"
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -231,10 +306,36 @@ func testStoreSeek(t *testing.T, s Store) {
|
||||||
goodkvs := make([]KeyValue, len(kvs))
|
goodkvs := make([]KeyValue, len(kvs))
|
||||||
copy(goodkvs, kvs)
|
copy(goodkvs, kvs)
|
||||||
t.Run("forwards", func(t *testing.T) {
|
t.Run("forwards", func(t *testing.T) {
|
||||||
check(t, goodprefix, start, goodkvs, false)
|
t.Run("good", func(t *testing.T) {
|
||||||
|
check(t, goodprefix, start, goodkvs, false, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[0], // key = "10"
|
||||||
|
kvs[1], // key = "11"
|
||||||
|
kvs[2], // key = "20"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, false, func(k, v []byte) bool {
|
||||||
|
return string(k) < "21"
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
t.Run("backwards", func(t *testing.T) {
|
t.Run("backwards", func(t *testing.T) {
|
||||||
check(t, goodprefix, start, goodkvs, true)
|
t.Run("good", func(t *testing.T) {
|
||||||
|
check(t, goodprefix, start, goodkvs, true, nil)
|
||||||
|
})
|
||||||
|
t.Run("early stop", func(t *testing.T) {
|
||||||
|
goodkvs := []KeyValue{
|
||||||
|
kvs[6], // key = "31"
|
||||||
|
kvs[5], // key = "30"
|
||||||
|
kvs[4], // key = "22"
|
||||||
|
kvs[3], // key = "21"
|
||||||
|
}
|
||||||
|
check(t, goodprefix, start, goodkvs, true, func(k, v []byte) bool {
|
||||||
|
return string(k) > "21"
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue