diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 0dd9f174c..11d0d78de 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -677,36 +677,117 @@ func (bc *Blockchain) GetStateModule() blockchainer.StateRoot { // transactions with all appropriate side-effects and updates Blockchain state. // This is the only way to change Blockchain state. func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error { - cache := dao.NewCached(bc.dao) - writeBuf := io.NewBufBinWriter() - appExecResults := make([]*state.AppExecResult, 0, 2+len(block.Transactions)) - if err := cache.StoreAsBlock(block, writeBuf); err != nil { - return err - } - writeBuf.Reset() - - if err := cache.StoreAsCurrentBlock(block, writeBuf); err != nil { - return err - } - writeBuf.Reset() - - aer, err := bc.runPersist(bc.contracts.GetPersistScript(), block, cache, trigger.OnPersist) - if err != nil { - return fmt.Errorf("onPersist failed: %w", err) - } - appExecResults = append(appExecResults, aer) - err = cache.PutAppExecResult(aer, writeBuf) - if err != nil { - return fmt.Errorf("failed to store onPersist exec result: %w", err) - } - writeBuf.Reset() - - for _, tx := range block.Transactions { - if err := cache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil { - return err + var ( + cache = bc.dao.GetWrapped() + appExecResults = make([]*state.AppExecResult, 0, 2+len(block.Transactions)) + aerchan = make(chan *state.AppExecResult, len(block.Transactions)/8) // Tested 8 and 4 with no practical difference, but feel free to test more and tune. + aerdone = make(chan error) + blockdone = make(chan error) + ) + go func() { + var ( + kvcache = cache.GetWrapped() + writeBuf = io.NewBufBinWriter() + ) + if err := kvcache.StoreAsBlock(block, writeBuf); err != nil { + blockdone <- err + return } writeBuf.Reset() + if err := kvcache.StoreAsCurrentBlock(block, writeBuf); err != nil { + blockdone <- err + return + } + writeBuf.Reset() + + for _, tx := range block.Transactions { + if err := kvcache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil { + blockdone <- err + return + } + + writeBuf.Reset() + if bc.config.P2PSigExtensions { + for _, attr := range tx.GetAttributes(transaction.ConflictsT) { + hash := attr.Value.(*transaction.Conflicts).Hash + dummyTx := transaction.NewTrimmedTX(hash) + dummyTx.Version = transaction.DummyVersion + if err := kvcache.StoreAsTransaction(dummyTx, block.Index, writeBuf); err != nil { + blockdone <- fmt.Errorf("failed to store conflicting transaction %s for transaction %s: %w", hash.StringLE(), tx.Hash().StringLE(), err) + return + } + writeBuf.Reset() + } + } + } + if bc.config.RemoveUntraceableBlocks { + if block.Index > bc.config.MaxTraceableBlocks { + index := block.Index - bc.config.MaxTraceableBlocks // is at least 1 + err := kvcache.DeleteBlock(bc.headerHashes[index], writeBuf) + if err != nil { + bc.log.Warn("error while removing old block", + zap.Uint32("index", index), + zap.Error(err)) + } + writeBuf.Reset() + } + } + _, err := kvcache.Persist() + if err != nil { + blockdone <- err + } + close(blockdone) + }() + go func() { + var ( + kvcache = dao.NewCached(cache) + writeBuf = io.NewBufBinWriter() + err error + appendBlock bool + ) + for aer := range aerchan { + if aer.Container == block.Hash() && appendBlock { + err = kvcache.AppendAppExecResult(aer, writeBuf) + } else { + err = kvcache.PutAppExecResult(aer, writeBuf) + if aer.Container == block.Hash() { + appendBlock = true + } + } + if err != nil { + err = fmt.Errorf("failed to store exec result: %w", err) + break + } + if aer.Execution.VMState == vm.HaltState { + for j := range aer.Execution.Events { + bc.handleNotification(&aer.Execution.Events[j], kvcache, block, aer.Container) + } + } + writeBuf.Reset() + } + if err != nil { + aerdone <- err + return + } + _, err = kvcache.Persist() + if err != nil { + aerdone <- err + } + close(aerdone) + }() + aer, err := bc.runPersist(bc.contracts.GetPersistScript(), block, cache, trigger.OnPersist) + if err != nil { + // Release goroutines, don't care about errors, we already have one. + close(aerchan) + <-blockdone + <-aerdone + return fmt.Errorf("onPersist failed: %w", err) + } + appExecResults = append(appExecResults, aer) + aerchan <- aer + + for _, tx := range block.Transactions { systemInterop := bc.newInteropContext(trigger.Application, cache, block, tx) v := systemInterop.SpawnVM() v.LoadScriptWithFlags(tx.Script, callflag.All) @@ -719,11 +800,12 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error if !v.HasFailed() { _, err := systemInterop.DAO.Persist() if err != nil { + // Release goroutines, don't care about errors, we already have one. + close(aerchan) + <-blockdone + <-aerdone return fmt.Errorf("failed to persist invocation results: %w", err) } - for j := range systemInterop.Notifications { - bc.handleNotification(&systemInterop.Notifications[j], cache, block, tx.Hash()) - } } else { bc.log.Warn("contract invocation failed", zap.String("tx", tx.Hash().StringLE()), @@ -743,40 +825,27 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error }, } appExecResults = append(appExecResults, aer) - err = cache.PutAppExecResult(aer, writeBuf) - if err != nil { - return fmt.Errorf("failed to store tx exec result: %w", err) - } - writeBuf.Reset() - - if bc.config.P2PSigExtensions { - for _, attr := range tx.GetAttributes(transaction.ConflictsT) { - hash := attr.Value.(*transaction.Conflicts).Hash - dummyTx := transaction.NewTrimmedTX(hash) - dummyTx.Version = transaction.DummyVersion - if err = cache.StoreAsTransaction(dummyTx, block.Index, writeBuf); err != nil { - return fmt.Errorf("failed to store conflicting transaction %s for transaction %s: %w", hash.StringLE(), tx.Hash().StringLE(), err) - } - writeBuf.Reset() - } - } + aerchan <- aer } aer, err = bc.runPersist(bc.contracts.GetPostPersistScript(), block, cache, trigger.PostPersist) if err != nil { + // Release goroutines, don't care about errors, we already have one. + close(aerchan) + <-blockdone + <-aerdone return fmt.Errorf("postPersist failed: %w", err) } appExecResults = append(appExecResults, aer) - err = cache.AppendAppExecResult(aer, writeBuf) - if err != nil { - return fmt.Errorf("failed to store postPersist exec result: %w", err) - } - writeBuf.Reset() - - d := cache.DAO.(*dao.Simple) + aerchan <- aer + close(aerchan) + d := cache.(*dao.Simple) b := d.GetMPTBatch() mpt, sr, err := bc.stateRoot.AddMPTBatch(block.Index, b, d.Store) if err != nil { + // Release goroutines, don't care about errors, we already have one. + <-blockdone + <-aerdone // Here MPT can be left in a half-applied state. // However if this error occurs, this is a bug somewhere in code // because changes applied are the ones from HALTed transactions. @@ -785,27 +854,20 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error if bc.config.StateRootInHeader && bc.HeaderHeight() > sr.Index { h, err := bc.GetHeader(bc.GetHeaderHash(int(sr.Index) + 1)) if err != nil { - return fmt.Errorf("failed to get next header: %w", err) + err = fmt.Errorf("failed to get next header: %w", err) + } else if h.PrevStateRoot != sr.Root { + err = fmt.Errorf("local stateroot and next header's PrevStateRoot mismatch: %s vs %s", sr.Root.StringBE(), h.PrevStateRoot.StringBE()) } - if h.PrevStateRoot != sr.Root { - return fmt.Errorf("local stateroot and next header's PrevStateRoot mismatch: %s vs %s", sr.Root.StringBE(), h.PrevStateRoot.StringBE()) + if err != nil { + // Release goroutines, don't care about errors, we already have one. + <-blockdone + <-aerdone + return err } } if bc.config.SaveStorageBatch { - bc.lastBatch = cache.DAO.GetBatch() - } - if bc.config.RemoveUntraceableBlocks { - if block.Index > bc.config.MaxTraceableBlocks { - index := block.Index - bc.config.MaxTraceableBlocks // is at least 1 - err := cache.DeleteBlock(bc.headerHashes[index], writeBuf) - if err != nil { - bc.log.Warn("error while removing old block", - zap.Uint32("index", index), - zap.Error(err)) - } - writeBuf.Reset() - } + bc.lastBatch = d.GetBatch() } // Every persist cycle we also compact our in-memory MPT. It's flushed // already in AddMPTBatch, so collapsing it is safe. @@ -815,6 +877,16 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error mpt.Collapse(10) } + // Wait for _both_ goroutines to finish. + blockerr := <-blockdone + aererr := <-aerdone + if blockerr != nil { + return blockerr + } + if aererr != nil { + return aererr + } + bc.lock.Lock() _, err = cache.Persist() if err != nil { @@ -895,7 +967,7 @@ func (bc *Blockchain) IsExtensibleAllowed(u util.Uint160) bool { return n < len(us) } -func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache *dao.Cached, trig trigger.Type) (*state.AppExecResult, error) { +func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache dao.DAO, trig trigger.Type) (*state.AppExecResult, error) { systemInterop := bc.newInteropContext(trig, cache, block, nil) v := systemInterop.SpawnVM() v.LoadScriptWithFlags(script, callflag.All) @@ -905,9 +977,6 @@ func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache *dao.C } else if _, err := systemInterop.DAO.Persist(); err != nil { return nil, fmt.Errorf("can't save changes: %w", err) } - for i := range systemInterop.Notifications { - bc.handleNotification(&systemInterop.Notifications[i], cache, block, block.Hash()) - } return &state.AppExecResult{ Container: block.Hash(), // application logs can be retrieved by block hash Execution: state.Execution{