core: spread storeBlock actions to three goroutines
Block processing consists of:
* saving block/transactions to the DB
* executing blocks/transactions
* processing notifications/saving AERs
* updating MPT
* atomically updating Blockchain state
Of these the first one is completely independent of others, it can be done in
a separate routine easily. The third one technically depends on the second,
it just doesn't have data until something is executed. At the same time it
doesn't affect future executions in any way, so we can offload
AER/notification processing to separate goroutine (while the main thread
proceeds with other transactions).
MPT update depends on all executions, so it can't be offloaded, but it can be
done concurrently to AER processing. And only the last thing actually needs
all previous ones to be finished, so it's a natural synchronization point.
So we spawn two additional routines and let the main one execute transactions
and update MPT as fast as it can. While technically all of these routines
could share single DAO (they are working with different KV sets) benchmarking
shows that using separate DAOs and then persisting them to lower one actually
works about 7-8%% better. At the same time we can simplify DAOs used, Cached
one is only relevant for AER processing because it caches NEP-17 tracking
data, everything else can do just fine with Simple.
The change was tested for performance with neo-bench (single node, 10 workers,
LevelDB) on two machines and block dump processing (RC4 testnet up to 50825
with VerifyBlocks set to false) on i7-8565U. neo-bench creates huge blocks
with lots of transactions while RC4 dump mostly consists of empty blocks.
Reference results (06c3dda5d1
):
Ryzen 9 5950X:
RPS ≈ 20059.569 21186.328 20158.983 ≈ 20468 ± 3.05%
TPS ≈ 19544.993 20585.450 19658.338 ≈ 19930 ± 2.86%
CPU ≈ 18.682% 23.877% 22.852% ≈ 21.8 ± 12.62%
Mem ≈ 618.981MB 559.246MB 541.539MB ≈ 573 ± 7.08%
Core i7-8565U:
RPS ≈ 5927.082 6526.739 6372.115 ≈ 6275 ± 4.96%
TPS ≈ 5899.531 6477.187 6329.515 ≈ 6235 ± 4.81%
CPU ≈ 56.346% 61.955% 58.125% ≈ 58.8 ± 4.87%
Mem ≈ 212.191MB 224.974MB 205.479MB ≈ 214 ± 4.62%
DB restore:
real 0m12.683s 0m13.222s 0m13.382s ≈ 13.096 ± 2.80%
user 0m18.501s 0m19.163s 0m19.489s ≈ 19.051 ± 2.64%
sys 0m1.404s 0m1.396s 0m1.666s ≈ 1.489 ± 10.32%
After the change:
Ryzen 9 5950X:
RPS ≈ 23056.899 22822.015 23006.543 ≈ 22962 ± 0.54%
TPS ≈ 22594.785 22292.071 22800.857 ≈ 22562 ± 1.13%
CPU ≈ 24.262% 23.185% 25.921% ≈ 24.5 ± 5.65%
Mem ≈ 614.254MB 613.204MB 555.491MB ≈ 594 ± 5.66%
Core i7-8565U:
RPS ≈ 6378.702 6423.927 6363.788 ≈ 6389 ± 0.49%
TPS ≈ 6327.072 6372.552 6311.179 ≈ 6337 ± 0.50%
CPU ≈ 57.599% 58.622% 59.737% ≈ 58.7 ± 1.82%
Mem ≈ 198.697MB 188.746MB 200.235MB ≈ 196 ± 3.18%
DB restore:
real 0m13.576s 0m13.334s 0m12.757s ≈ 13.222 ± 3.18%
user 0m19.113s 0m19.490s 0m20.197s ≈ 19.600 ± 2.81%
sys 0m2.211s 0m1.558s 0m1.559s ≈ 1.776 ± 21.21%
On Ryzen 9 we've got 12% better RPS, 13% better TPS with 12% CPU and 3% RAM
more used. Core i7-8565U changes don't seem to be statistically significant:
1.8% more RPS, 1.6% more TPS with about the same CPU and 8.5% less RAM
used. It also is 1% worse in DB restore time.
The result is somewhat expected, on a powerful machine with lots of spare
cores we get 10%+ better results while on average resource-constrained laptop it
doesn't change much (the machine is already saturated). Overall, this seems to
be worthwhile.
This commit is contained in:
parent
06c3dda5d1
commit
49be753850
1 changed files with 143 additions and 74 deletions
|
@ -677,36 +677,117 @@ func (bc *Blockchain) GetStateModule() blockchainer.StateRoot {
|
|||
// transactions with all appropriate side-effects and updates Blockchain state.
|
||||
// This is the only way to change Blockchain state.
|
||||
func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error {
|
||||
cache := dao.NewCached(bc.dao)
|
||||
writeBuf := io.NewBufBinWriter()
|
||||
appExecResults := make([]*state.AppExecResult, 0, 2+len(block.Transactions))
|
||||
if err := cache.StoreAsBlock(block, writeBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
writeBuf.Reset()
|
||||
|
||||
if err := cache.StoreAsCurrentBlock(block, writeBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
writeBuf.Reset()
|
||||
|
||||
aer, err := bc.runPersist(bc.contracts.GetPersistScript(), block, cache, trigger.OnPersist)
|
||||
if err != nil {
|
||||
return fmt.Errorf("onPersist failed: %w", err)
|
||||
}
|
||||
appExecResults = append(appExecResults, aer)
|
||||
err = cache.PutAppExecResult(aer, writeBuf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to store onPersist exec result: %w", err)
|
||||
}
|
||||
writeBuf.Reset()
|
||||
|
||||
for _, tx := range block.Transactions {
|
||||
if err := cache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil {
|
||||
return err
|
||||
var (
|
||||
cache = bc.dao.GetWrapped()
|
||||
appExecResults = make([]*state.AppExecResult, 0, 2+len(block.Transactions))
|
||||
aerchan = make(chan *state.AppExecResult, len(block.Transactions)/8) // Tested 8 and 4 with no practical difference, but feel free to test more and tune.
|
||||
aerdone = make(chan error)
|
||||
blockdone = make(chan error)
|
||||
)
|
||||
go func() {
|
||||
var (
|
||||
kvcache = cache.GetWrapped()
|
||||
writeBuf = io.NewBufBinWriter()
|
||||
)
|
||||
if err := kvcache.StoreAsBlock(block, writeBuf); err != nil {
|
||||
blockdone <- err
|
||||
return
|
||||
}
|
||||
writeBuf.Reset()
|
||||
|
||||
if err := kvcache.StoreAsCurrentBlock(block, writeBuf); err != nil {
|
||||
blockdone <- err
|
||||
return
|
||||
}
|
||||
writeBuf.Reset()
|
||||
|
||||
for _, tx := range block.Transactions {
|
||||
if err := kvcache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil {
|
||||
blockdone <- err
|
||||
return
|
||||
}
|
||||
|
||||
writeBuf.Reset()
|
||||
if bc.config.P2PSigExtensions {
|
||||
for _, attr := range tx.GetAttributes(transaction.ConflictsT) {
|
||||
hash := attr.Value.(*transaction.Conflicts).Hash
|
||||
dummyTx := transaction.NewTrimmedTX(hash)
|
||||
dummyTx.Version = transaction.DummyVersion
|
||||
if err := kvcache.StoreAsTransaction(dummyTx, block.Index, writeBuf); err != nil {
|
||||
blockdone <- fmt.Errorf("failed to store conflicting transaction %s for transaction %s: %w", hash.StringLE(), tx.Hash().StringLE(), err)
|
||||
return
|
||||
}
|
||||
writeBuf.Reset()
|
||||
}
|
||||
}
|
||||
}
|
||||
if bc.config.RemoveUntraceableBlocks {
|
||||
if block.Index > bc.config.MaxTraceableBlocks {
|
||||
index := block.Index - bc.config.MaxTraceableBlocks // is at least 1
|
||||
err := kvcache.DeleteBlock(bc.headerHashes[index], writeBuf)
|
||||
if err != nil {
|
||||
bc.log.Warn("error while removing old block",
|
||||
zap.Uint32("index", index),
|
||||
zap.Error(err))
|
||||
}
|
||||
writeBuf.Reset()
|
||||
}
|
||||
}
|
||||
_, err := kvcache.Persist()
|
||||
if err != nil {
|
||||
blockdone <- err
|
||||
}
|
||||
close(blockdone)
|
||||
}()
|
||||
go func() {
|
||||
var (
|
||||
kvcache = dao.NewCached(cache)
|
||||
writeBuf = io.NewBufBinWriter()
|
||||
err error
|
||||
appendBlock bool
|
||||
)
|
||||
for aer := range aerchan {
|
||||
if aer.Container == block.Hash() && appendBlock {
|
||||
err = kvcache.AppendAppExecResult(aer, writeBuf)
|
||||
} else {
|
||||
err = kvcache.PutAppExecResult(aer, writeBuf)
|
||||
if aer.Container == block.Hash() {
|
||||
appendBlock = true
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to store exec result: %w", err)
|
||||
break
|
||||
}
|
||||
if aer.Execution.VMState == vm.HaltState {
|
||||
for j := range aer.Execution.Events {
|
||||
bc.handleNotification(&aer.Execution.Events[j], kvcache, block, aer.Container)
|
||||
}
|
||||
}
|
||||
writeBuf.Reset()
|
||||
}
|
||||
if err != nil {
|
||||
aerdone <- err
|
||||
return
|
||||
}
|
||||
_, err = kvcache.Persist()
|
||||
if err != nil {
|
||||
aerdone <- err
|
||||
}
|
||||
close(aerdone)
|
||||
}()
|
||||
aer, err := bc.runPersist(bc.contracts.GetPersistScript(), block, cache, trigger.OnPersist)
|
||||
if err != nil {
|
||||
// Release goroutines, don't care about errors, we already have one.
|
||||
close(aerchan)
|
||||
<-blockdone
|
||||
<-aerdone
|
||||
return fmt.Errorf("onPersist failed: %w", err)
|
||||
}
|
||||
appExecResults = append(appExecResults, aer)
|
||||
aerchan <- aer
|
||||
|
||||
for _, tx := range block.Transactions {
|
||||
systemInterop := bc.newInteropContext(trigger.Application, cache, block, tx)
|
||||
v := systemInterop.SpawnVM()
|
||||
v.LoadScriptWithFlags(tx.Script, callflag.All)
|
||||
|
@ -719,11 +800,12 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
|
|||
if !v.HasFailed() {
|
||||
_, err := systemInterop.DAO.Persist()
|
||||
if err != nil {
|
||||
// Release goroutines, don't care about errors, we already have one.
|
||||
close(aerchan)
|
||||
<-blockdone
|
||||
<-aerdone
|
||||
return fmt.Errorf("failed to persist invocation results: %w", err)
|
||||
}
|
||||
for j := range systemInterop.Notifications {
|
||||
bc.handleNotification(&systemInterop.Notifications[j], cache, block, tx.Hash())
|
||||
}
|
||||
} else {
|
||||
bc.log.Warn("contract invocation failed",
|
||||
zap.String("tx", tx.Hash().StringLE()),
|
||||
|
@ -743,40 +825,27 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
|
|||
},
|
||||
}
|
||||
appExecResults = append(appExecResults, aer)
|
||||
err = cache.PutAppExecResult(aer, writeBuf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to store tx exec result: %w", err)
|
||||
}
|
||||
writeBuf.Reset()
|
||||
|
||||
if bc.config.P2PSigExtensions {
|
||||
for _, attr := range tx.GetAttributes(transaction.ConflictsT) {
|
||||
hash := attr.Value.(*transaction.Conflicts).Hash
|
||||
dummyTx := transaction.NewTrimmedTX(hash)
|
||||
dummyTx.Version = transaction.DummyVersion
|
||||
if err = cache.StoreAsTransaction(dummyTx, block.Index, writeBuf); err != nil {
|
||||
return fmt.Errorf("failed to store conflicting transaction %s for transaction %s: %w", hash.StringLE(), tx.Hash().StringLE(), err)
|
||||
}
|
||||
writeBuf.Reset()
|
||||
}
|
||||
}
|
||||
aerchan <- aer
|
||||
}
|
||||
|
||||
aer, err = bc.runPersist(bc.contracts.GetPostPersistScript(), block, cache, trigger.PostPersist)
|
||||
if err != nil {
|
||||
// Release goroutines, don't care about errors, we already have one.
|
||||
close(aerchan)
|
||||
<-blockdone
|
||||
<-aerdone
|
||||
return fmt.Errorf("postPersist failed: %w", err)
|
||||
}
|
||||
appExecResults = append(appExecResults, aer)
|
||||
err = cache.AppendAppExecResult(aer, writeBuf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to store postPersist exec result: %w", err)
|
||||
}
|
||||
writeBuf.Reset()
|
||||
|
||||
d := cache.DAO.(*dao.Simple)
|
||||
aerchan <- aer
|
||||
close(aerchan)
|
||||
d := cache.(*dao.Simple)
|
||||
b := d.GetMPTBatch()
|
||||
mpt, sr, err := bc.stateRoot.AddMPTBatch(block.Index, b, d.Store)
|
||||
if err != nil {
|
||||
// Release goroutines, don't care about errors, we already have one.
|
||||
<-blockdone
|
||||
<-aerdone
|
||||
// Here MPT can be left in a half-applied state.
|
||||
// However if this error occurs, this is a bug somewhere in code
|
||||
// because changes applied are the ones from HALTed transactions.
|
||||
|
@ -785,27 +854,20 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
|
|||
if bc.config.StateRootInHeader && bc.HeaderHeight() > sr.Index {
|
||||
h, err := bc.GetHeader(bc.GetHeaderHash(int(sr.Index) + 1))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get next header: %w", err)
|
||||
err = fmt.Errorf("failed to get next header: %w", err)
|
||||
} else if h.PrevStateRoot != sr.Root {
|
||||
err = fmt.Errorf("local stateroot and next header's PrevStateRoot mismatch: %s vs %s", sr.Root.StringBE(), h.PrevStateRoot.StringBE())
|
||||
}
|
||||
if h.PrevStateRoot != sr.Root {
|
||||
return fmt.Errorf("local stateroot and next header's PrevStateRoot mismatch: %s vs %s", sr.Root.StringBE(), h.PrevStateRoot.StringBE())
|
||||
if err != nil {
|
||||
// Release goroutines, don't care about errors, we already have one.
|
||||
<-blockdone
|
||||
<-aerdone
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if bc.config.SaveStorageBatch {
|
||||
bc.lastBatch = cache.DAO.GetBatch()
|
||||
}
|
||||
if bc.config.RemoveUntraceableBlocks {
|
||||
if block.Index > bc.config.MaxTraceableBlocks {
|
||||
index := block.Index - bc.config.MaxTraceableBlocks // is at least 1
|
||||
err := cache.DeleteBlock(bc.headerHashes[index], writeBuf)
|
||||
if err != nil {
|
||||
bc.log.Warn("error while removing old block",
|
||||
zap.Uint32("index", index),
|
||||
zap.Error(err))
|
||||
}
|
||||
writeBuf.Reset()
|
||||
}
|
||||
bc.lastBatch = d.GetBatch()
|
||||
}
|
||||
// Every persist cycle we also compact our in-memory MPT. It's flushed
|
||||
// already in AddMPTBatch, so collapsing it is safe.
|
||||
|
@ -815,6 +877,16 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
|
|||
mpt.Collapse(10)
|
||||
}
|
||||
|
||||
// Wait for _both_ goroutines to finish.
|
||||
blockerr := <-blockdone
|
||||
aererr := <-aerdone
|
||||
if blockerr != nil {
|
||||
return blockerr
|
||||
}
|
||||
if aererr != nil {
|
||||
return aererr
|
||||
}
|
||||
|
||||
bc.lock.Lock()
|
||||
_, err = cache.Persist()
|
||||
if err != nil {
|
||||
|
@ -895,7 +967,7 @@ func (bc *Blockchain) IsExtensibleAllowed(u util.Uint160) bool {
|
|||
return n < len(us)
|
||||
}
|
||||
|
||||
func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache *dao.Cached, trig trigger.Type) (*state.AppExecResult, error) {
|
||||
func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache dao.DAO, trig trigger.Type) (*state.AppExecResult, error) {
|
||||
systemInterop := bc.newInteropContext(trig, cache, block, nil)
|
||||
v := systemInterop.SpawnVM()
|
||||
v.LoadScriptWithFlags(script, callflag.All)
|
||||
|
@ -905,9 +977,6 @@ func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache *dao.C
|
|||
} else if _, err := systemInterop.DAO.Persist(); err != nil {
|
||||
return nil, fmt.Errorf("can't save changes: %w", err)
|
||||
}
|
||||
for i := range systemInterop.Notifications {
|
||||
bc.handleNotification(&systemInterop.Notifications[i], cache, block, block.Hash())
|
||||
}
|
||||
return &state.AppExecResult{
|
||||
Container: block.Hash(), // application logs can be retrieved by block hash
|
||||
Execution: state.Execution{
|
||||
|
|
Loading…
Reference in a new issue