From 49be7538503bf07152f9fbd9c8e2ea93485752c2 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 29 Dec 2020 18:25:21 +0300 Subject: [PATCH 1/3] core: spread storeBlock actions to three goroutines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Block processing consists of: * saving block/transactions to the DB * executing blocks/transactions * processing notifications/saving AERs * updating MPT * atomically updating Blockchain state Of these the first one is completely independent of the others, so it can be done in a separate routine easily. The third one technically depends on the second, it just doesn't have data until something is executed. At the same time it doesn't affect future executions in any way, so we can offload AER/notification processing to a separate goroutine (while the main thread proceeds with other transactions). MPT update depends on all executions, so it can't be offloaded, but it can be done concurrently with AER processing. And only the last thing actually needs all previous ones to be finished, so it's a natural synchronization point. So we spawn two additional routines and let the main one execute transactions and update MPT as fast as it can. While technically all of these routines could share a single DAO (they are working with different KV sets), benchmarking shows that using separate DAOs and then persisting them to the lower one actually works about 7-8% better. At the same time we can simplify the DAOs used: the Cached one is only relevant for AER processing because it caches NEP-17 tracking data, everything else can do just fine with Simple. The change was tested for performance with neo-bench (single node, 10 workers, LevelDB) on two machines and block dump processing (RC4 testnet up to 50825 with VerifyBlocks set to false) on i7-8565U. neo-bench creates huge blocks with lots of transactions while the RC4 dump mostly consists of empty blocks. 
Reference results (06c3dda5d1e713eb7fca59dd07621d22cb31dd53): Ryzen 9 5950X: RPS ≈ 20059.569 21186.328 20158.983 ≈ 20468 ± 3.05% TPS ≈ 19544.993 20585.450 19658.338 ≈ 19930 ± 2.86% CPU ≈ 18.682% 23.877% 22.852% ≈ 21.8 ± 12.62% Mem ≈ 618.981MB 559.246MB 541.539MB ≈ 573 ± 7.08% Core i7-8565U: RPS ≈ 5927.082 6526.739 6372.115 ≈ 6275 ± 4.96% TPS ≈ 5899.531 6477.187 6329.515 ≈ 6235 ± 4.81% CPU ≈ 56.346% 61.955% 58.125% ≈ 58.8 ± 4.87% Mem ≈ 212.191MB 224.974MB 205.479MB ≈ 214 ± 4.62% DB restore: real 0m12.683s 0m13.222s 0m13.382s ≈ 13.096 ± 2.80% user 0m18.501s 0m19.163s 0m19.489s ≈ 19.051 ± 2.64% sys 0m1.404s 0m1.396s 0m1.666s ≈ 1.489 ± 10.32% After the change: Ryzen 9 5950X: RPS ≈ 23056.899 22822.015 23006.543 ≈ 22962 ± 0.54% TPS ≈ 22594.785 22292.071 22800.857 ≈ 22562 ± 1.13% CPU ≈ 24.262% 23.185% 25.921% ≈ 24.5 ± 5.65% Mem ≈ 614.254MB 613.204MB 555.491MB ≈ 594 ± 5.66% Core i7-8565U: RPS ≈ 6378.702 6423.927 6363.788 ≈ 6389 ± 0.49% TPS ≈ 6327.072 6372.552 6311.179 ≈ 6337 ± 0.50% CPU ≈ 57.599% 58.622% 59.737% ≈ 58.7 ± 1.82% Mem ≈ 198.697MB 188.746MB 200.235MB ≈ 196 ± 3.18% DB restore: real 0m13.576s 0m13.334s 0m12.757s ≈ 13.222 ± 3.18% user 0m19.113s 0m19.490s 0m20.197s ≈ 19.600 ± 2.81% sys 0m2.211s 0m1.558s 0m1.559s ≈ 1.776 ± 21.21% On Ryzen 9 we've got 12% better RPS and 13% better TPS, with 12% more CPU and 3% more RAM used. Core i7-8565U changes don't seem to be statistically significant: 1.8% more RPS, 1.6% more TPS with about the same CPU and 8.5% less RAM used. It is also 1% worse in DB restore time. The result is somewhat expected: on a powerful machine with lots of spare cores we get 10%+ better results, while on an average resource-constrained laptop it doesn't change much (the machine is already saturated). Overall, this seems to be worthwhile. 
--- pkg/core/blockchain.go | 217 +++++++++++++++++++++++++++-------------- 1 file changed, 143 insertions(+), 74 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 0dd9f174c..11d0d78de 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -677,36 +677,117 @@ func (bc *Blockchain) GetStateModule() blockchainer.StateRoot { // transactions with all appropriate side-effects and updates Blockchain state. // This is the only way to change Blockchain state. func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error { - cache := dao.NewCached(bc.dao) - writeBuf := io.NewBufBinWriter() - appExecResults := make([]*state.AppExecResult, 0, 2+len(block.Transactions)) - if err := cache.StoreAsBlock(block, writeBuf); err != nil { - return err - } - writeBuf.Reset() - - if err := cache.StoreAsCurrentBlock(block, writeBuf); err != nil { - return err - } - writeBuf.Reset() - - aer, err := bc.runPersist(bc.contracts.GetPersistScript(), block, cache, trigger.OnPersist) - if err != nil { - return fmt.Errorf("onPersist failed: %w", err) - } - appExecResults = append(appExecResults, aer) - err = cache.PutAppExecResult(aer, writeBuf) - if err != nil { - return fmt.Errorf("failed to store onPersist exec result: %w", err) - } - writeBuf.Reset() - - for _, tx := range block.Transactions { - if err := cache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil { - return err + var ( + cache = bc.dao.GetWrapped() + appExecResults = make([]*state.AppExecResult, 0, 2+len(block.Transactions)) + aerchan = make(chan *state.AppExecResult, len(block.Transactions)/8) // Tested 8 and 4 with no practical difference, but feel free to test more and tune. 
+ aerdone = make(chan error) + blockdone = make(chan error) + ) + go func() { + var ( + kvcache = cache.GetWrapped() + writeBuf = io.NewBufBinWriter() + ) + if err := kvcache.StoreAsBlock(block, writeBuf); err != nil { + blockdone <- err + return } writeBuf.Reset() + if err := kvcache.StoreAsCurrentBlock(block, writeBuf); err != nil { + blockdone <- err + return + } + writeBuf.Reset() + + for _, tx := range block.Transactions { + if err := kvcache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil { + blockdone <- err + return + } + + writeBuf.Reset() + if bc.config.P2PSigExtensions { + for _, attr := range tx.GetAttributes(transaction.ConflictsT) { + hash := attr.Value.(*transaction.Conflicts).Hash + dummyTx := transaction.NewTrimmedTX(hash) + dummyTx.Version = transaction.DummyVersion + if err := kvcache.StoreAsTransaction(dummyTx, block.Index, writeBuf); err != nil { + blockdone <- fmt.Errorf("failed to store conflicting transaction %s for transaction %s: %w", hash.StringLE(), tx.Hash().StringLE(), err) + return + } + writeBuf.Reset() + } + } + } + if bc.config.RemoveUntraceableBlocks { + if block.Index > bc.config.MaxTraceableBlocks { + index := block.Index - bc.config.MaxTraceableBlocks // is at least 1 + err := kvcache.DeleteBlock(bc.headerHashes[index], writeBuf) + if err != nil { + bc.log.Warn("error while removing old block", + zap.Uint32("index", index), + zap.Error(err)) + } + writeBuf.Reset() + } + } + _, err := kvcache.Persist() + if err != nil { + blockdone <- err + } + close(blockdone) + }() + go func() { + var ( + kvcache = dao.NewCached(cache) + writeBuf = io.NewBufBinWriter() + err error + appendBlock bool + ) + for aer := range aerchan { + if aer.Container == block.Hash() && appendBlock { + err = kvcache.AppendAppExecResult(aer, writeBuf) + } else { + err = kvcache.PutAppExecResult(aer, writeBuf) + if aer.Container == block.Hash() { + appendBlock = true + } + } + if err != nil { + err = fmt.Errorf("failed to store exec result: %w", err) + 
break + } + if aer.Execution.VMState == vm.HaltState { + for j := range aer.Execution.Events { + bc.handleNotification(&aer.Execution.Events[j], kvcache, block, aer.Container) + } + } + writeBuf.Reset() + } + if err != nil { + aerdone <- err + return + } + _, err = kvcache.Persist() + if err != nil { + aerdone <- err + } + close(aerdone) + }() + aer, err := bc.runPersist(bc.contracts.GetPersistScript(), block, cache, trigger.OnPersist) + if err != nil { + // Release goroutines, don't care about errors, we already have one. + close(aerchan) + <-blockdone + <-aerdone + return fmt.Errorf("onPersist failed: %w", err) + } + appExecResults = append(appExecResults, aer) + aerchan <- aer + + for _, tx := range block.Transactions { systemInterop := bc.newInteropContext(trigger.Application, cache, block, tx) v := systemInterop.SpawnVM() v.LoadScriptWithFlags(tx.Script, callflag.All) @@ -719,11 +800,12 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error if !v.HasFailed() { _, err := systemInterop.DAO.Persist() if err != nil { + // Release goroutines, don't care about errors, we already have one. 
+ close(aerchan) + <-blockdone + <-aerdone return fmt.Errorf("failed to persist invocation results: %w", err) } - for j := range systemInterop.Notifications { - bc.handleNotification(&systemInterop.Notifications[j], cache, block, tx.Hash()) - } } else { bc.log.Warn("contract invocation failed", zap.String("tx", tx.Hash().StringLE()), @@ -743,40 +825,27 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error }, } appExecResults = append(appExecResults, aer) - err = cache.PutAppExecResult(aer, writeBuf) - if err != nil { - return fmt.Errorf("failed to store tx exec result: %w", err) - } - writeBuf.Reset() - - if bc.config.P2PSigExtensions { - for _, attr := range tx.GetAttributes(transaction.ConflictsT) { - hash := attr.Value.(*transaction.Conflicts).Hash - dummyTx := transaction.NewTrimmedTX(hash) - dummyTx.Version = transaction.DummyVersion - if err = cache.StoreAsTransaction(dummyTx, block.Index, writeBuf); err != nil { - return fmt.Errorf("failed to store conflicting transaction %s for transaction %s: %w", hash.StringLE(), tx.Hash().StringLE(), err) - } - writeBuf.Reset() - } - } + aerchan <- aer } aer, err = bc.runPersist(bc.contracts.GetPostPersistScript(), block, cache, trigger.PostPersist) if err != nil { + // Release goroutines, don't care about errors, we already have one. + close(aerchan) + <-blockdone + <-aerdone return fmt.Errorf("postPersist failed: %w", err) } appExecResults = append(appExecResults, aer) - err = cache.AppendAppExecResult(aer, writeBuf) - if err != nil { - return fmt.Errorf("failed to store postPersist exec result: %w", err) - } - writeBuf.Reset() - - d := cache.DAO.(*dao.Simple) + aerchan <- aer + close(aerchan) + d := cache.(*dao.Simple) b := d.GetMPTBatch() mpt, sr, err := bc.stateRoot.AddMPTBatch(block.Index, b, d.Store) if err != nil { + // Release goroutines, don't care about errors, we already have one. + <-blockdone + <-aerdone // Here MPT can be left in a half-applied state. 
// However if this error occurs, this is a bug somewhere in code // because changes applied are the ones from HALTed transactions. @@ -785,27 +854,20 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error if bc.config.StateRootInHeader && bc.HeaderHeight() > sr.Index { h, err := bc.GetHeader(bc.GetHeaderHash(int(sr.Index) + 1)) if err != nil { - return fmt.Errorf("failed to get next header: %w", err) + err = fmt.Errorf("failed to get next header: %w", err) + } else if h.PrevStateRoot != sr.Root { + err = fmt.Errorf("local stateroot and next header's PrevStateRoot mismatch: %s vs %s", sr.Root.StringBE(), h.PrevStateRoot.StringBE()) } - if h.PrevStateRoot != sr.Root { - return fmt.Errorf("local stateroot and next header's PrevStateRoot mismatch: %s vs %s", sr.Root.StringBE(), h.PrevStateRoot.StringBE()) + if err != nil { + // Release goroutines, don't care about errors, we already have one. + <-blockdone + <-aerdone + return err } } if bc.config.SaveStorageBatch { - bc.lastBatch = cache.DAO.GetBatch() - } - if bc.config.RemoveUntraceableBlocks { - if block.Index > bc.config.MaxTraceableBlocks { - index := block.Index - bc.config.MaxTraceableBlocks // is at least 1 - err := cache.DeleteBlock(bc.headerHashes[index], writeBuf) - if err != nil { - bc.log.Warn("error while removing old block", - zap.Uint32("index", index), - zap.Error(err)) - } - writeBuf.Reset() - } + bc.lastBatch = d.GetBatch() } // Every persist cycle we also compact our in-memory MPT. It's flushed // already in AddMPTBatch, so collapsing it is safe. @@ -815,6 +877,16 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error mpt.Collapse(10) } + // Wait for _both_ goroutines to finish. 
+ blockerr := <-blockdone + aererr := <-aerdone + if blockerr != nil { + return blockerr + } + if aererr != nil { + return aererr + } + bc.lock.Lock() _, err = cache.Persist() if err != nil { @@ -895,7 +967,7 @@ func (bc *Blockchain) IsExtensibleAllowed(u util.Uint160) bool { return n < len(us) } -func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache *dao.Cached, trig trigger.Type) (*state.AppExecResult, error) { +func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache dao.DAO, trig trigger.Type) (*state.AppExecResult, error) { systemInterop := bc.newInteropContext(trig, cache, block, nil) v := systemInterop.SpawnVM() v.LoadScriptWithFlags(script, callflag.All) @@ -905,9 +977,6 @@ func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache *dao.C } else if _, err := systemInterop.DAO.Persist(); err != nil { return nil, fmt.Errorf("can't save changes: %w", err) } - for i := range systemInterop.Notifications { - bc.handleNotification(&systemInterop.Notifications[i], cache, block, block.Hash()) - } return &state.AppExecResult{ Container: block.Hash(), // application logs can be retrieved by block hash Execution: state.Execution{ From fa7314ea906700aa54378cc4a09bf0e456be3fd3 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 29 Jul 2021 22:47:33 +0300 Subject: [PATCH 2/3] dao: drop dropNEP17Cache from Cached It's not used now. --- pkg/core/dao/cacheddao.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/pkg/core/dao/cacheddao.go b/pkg/core/dao/cacheddao.go index 342b9d17b..f147deb85 100644 --- a/pkg/core/dao/cacheddao.go +++ b/pkg/core/dao/cacheddao.go @@ -15,15 +15,13 @@ type Cached struct { DAO balances map[util.Uint160]*state.NEP17TransferInfo transfers map[util.Uint160]map[uint32]*state.NEP17TransferLog - - dropNEP17Cache bool } // NewCached returns new Cached wrapping around given backing store. 
func NewCached(d DAO) *Cached { balances := make(map[util.Uint160]*state.NEP17TransferInfo) transfers := make(map[util.Uint160]map[uint32]*state.NEP17TransferLog) - return &Cached{d.GetWrapped(), balances, transfers, false} + return &Cached{d.GetWrapped(), balances, transfers} } // GetNEP17TransferInfo retrieves NEP17TransferInfo for the acc. @@ -87,9 +85,6 @@ func (cd *Cached) Persist() (int, error) { // usage scenario it should be good enough if cd doesn't modify object // caches (accounts/transfer data) in any way. if ok { - if cd.dropNEP17Cache { - lowerCache.balances = make(map[util.Uint160]*state.NEP17TransferInfo) - } var simpleCache *Simple for simpleCache == nil { simpleCache, ok = lowerCache.DAO.(*Simple) @@ -127,6 +122,5 @@ func (cd *Cached) GetWrapped() DAO { return &Cached{cd.DAO.GetWrapped(), cd.balances, cd.transfers, - false, } } From 3cebd2b1293ab2b6c9fb77f9db137f073ae0736a Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 29 Jul 2021 22:33:53 +0300 Subject: [PATCH 3/3] interop: use non-Cached wrapped DAO Cached only caches NEP-17 tracking data now, it makes no sense here. 
--- pkg/core/interop/context.go | 4 ++-- pkg/core/native/ledger.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/core/interop/context.go b/pkg/core/interop/context.go index 6e7f94618..1b0f003f8 100644 --- a/pkg/core/interop/context.go +++ b/pkg/core/interop/context.go @@ -42,7 +42,7 @@ type Context struct { Block *block.Block NonceData [16]byte Tx *transaction.Transaction - DAO *dao.Cached + DAO dao.DAO Notifications []state.NotificationEvent Log *zap.Logger VM *vm.VM @@ -54,7 +54,7 @@ type Context struct { func NewContext(trigger trigger.Type, bc blockchainer.Blockchainer, d dao.DAO, getContract func(dao.DAO, util.Uint160) (*state.Contract, error), natives []Contract, block *block.Block, tx *transaction.Transaction, log *zap.Logger) *Context { - dao := dao.NewCached(d) + dao := d.GetWrapped() nes := make([]state.NotificationEvent, 0) return &Context{ Chain: bc, diff --git a/pkg/core/native/ledger.go b/pkg/core/native/ledger.go index 8c901162c..2d958d629 100644 --- a/pkg/core/native/ledger.go +++ b/pkg/core/native/ledger.go @@ -180,7 +180,7 @@ func getBlockHashFromItem(bc blockchainer.Blockchainer, item stackitem.Item) uti // getTransactionAndHeight returns transaction and its height if it's present // on the chain. It panics if anything goes wrong. -func getTransactionAndHeight(cd *dao.Cached, item stackitem.Item) (*transaction.Transaction, uint32, error) { +func getTransactionAndHeight(d dao.DAO, item stackitem.Item) (*transaction.Transaction, uint32, error) { hashbytes, err := item.TryBytes() if err != nil { panic(err) @@ -189,7 +189,7 @@ func getTransactionAndHeight(cd *dao.Cached, item stackitem.Item) (*transaction. if err != nil { panic(err) } - return cd.GetTransaction(hash) + return d.GetTransaction(hash) } // BlockToStackItem converts block.Block to stackitem.Item.