Merge pull request #2101 from nspcc-dev/goroutiner

Improve big block processing
This commit is contained in:
Roman Khimov 2021-07-30 19:21:13 +03:00 committed by GitHub
commit bbe4e9cd7b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 148 additions and 85 deletions

View file

@ -677,36 +677,117 @@ func (bc *Blockchain) GetStateModule() blockchainer.StateRoot {
// transactions with all appropriate side-effects and updates Blockchain state.
// This is the only way to change Blockchain state.
func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error {
cache := dao.NewCached(bc.dao)
writeBuf := io.NewBufBinWriter()
appExecResults := make([]*state.AppExecResult, 0, 2+len(block.Transactions))
if err := cache.StoreAsBlock(block, writeBuf); err != nil {
return err
}
writeBuf.Reset()
if err := cache.StoreAsCurrentBlock(block, writeBuf); err != nil {
return err
}
writeBuf.Reset()
aer, err := bc.runPersist(bc.contracts.GetPersistScript(), block, cache, trigger.OnPersist)
if err != nil {
return fmt.Errorf("onPersist failed: %w", err)
}
appExecResults = append(appExecResults, aer)
err = cache.PutAppExecResult(aer, writeBuf)
if err != nil {
return fmt.Errorf("failed to store onPersist exec result: %w", err)
}
writeBuf.Reset()
for _, tx := range block.Transactions {
if err := cache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil {
return err
var (
cache = bc.dao.GetWrapped()
appExecResults = make([]*state.AppExecResult, 0, 2+len(block.Transactions))
aerchan = make(chan *state.AppExecResult, len(block.Transactions)/8) // Tested 8 and 4 with no practical difference, but feel free to test more and tune.
aerdone = make(chan error)
blockdone = make(chan error)
)
go func() {
var (
kvcache = cache.GetWrapped()
writeBuf = io.NewBufBinWriter()
)
if err := kvcache.StoreAsBlock(block, writeBuf); err != nil {
blockdone <- err
return
}
writeBuf.Reset()
if err := kvcache.StoreAsCurrentBlock(block, writeBuf); err != nil {
blockdone <- err
return
}
writeBuf.Reset()
for _, tx := range block.Transactions {
if err := kvcache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil {
blockdone <- err
return
}
writeBuf.Reset()
if bc.config.P2PSigExtensions {
for _, attr := range tx.GetAttributes(transaction.ConflictsT) {
hash := attr.Value.(*transaction.Conflicts).Hash
dummyTx := transaction.NewTrimmedTX(hash)
dummyTx.Version = transaction.DummyVersion
if err := kvcache.StoreAsTransaction(dummyTx, block.Index, writeBuf); err != nil {
blockdone <- fmt.Errorf("failed to store conflicting transaction %s for transaction %s: %w", hash.StringLE(), tx.Hash().StringLE(), err)
return
}
writeBuf.Reset()
}
}
}
if bc.config.RemoveUntraceableBlocks {
if block.Index > bc.config.MaxTraceableBlocks {
index := block.Index - bc.config.MaxTraceableBlocks // is at least 1
err := kvcache.DeleteBlock(bc.headerHashes[index], writeBuf)
if err != nil {
bc.log.Warn("error while removing old block",
zap.Uint32("index", index),
zap.Error(err))
}
writeBuf.Reset()
}
}
_, err := kvcache.Persist()
if err != nil {
blockdone <- err
}
close(blockdone)
}()
go func() {
var (
kvcache = dao.NewCached(cache)
writeBuf = io.NewBufBinWriter()
err error
appendBlock bool
)
for aer := range aerchan {
if aer.Container == block.Hash() && appendBlock {
err = kvcache.AppendAppExecResult(aer, writeBuf)
} else {
err = kvcache.PutAppExecResult(aer, writeBuf)
if aer.Container == block.Hash() {
appendBlock = true
}
}
if err != nil {
err = fmt.Errorf("failed to store exec result: %w", err)
break
}
if aer.Execution.VMState == vm.HaltState {
for j := range aer.Execution.Events {
bc.handleNotification(&aer.Execution.Events[j], kvcache, block, aer.Container)
}
}
writeBuf.Reset()
}
if err != nil {
aerdone <- err
return
}
_, err = kvcache.Persist()
if err != nil {
aerdone <- err
}
close(aerdone)
}()
aer, err := bc.runPersist(bc.contracts.GetPersistScript(), block, cache, trigger.OnPersist)
if err != nil {
// Release goroutines, don't care about errors, we already have one.
close(aerchan)
<-blockdone
<-aerdone
return fmt.Errorf("onPersist failed: %w", err)
}
appExecResults = append(appExecResults, aer)
aerchan <- aer
for _, tx := range block.Transactions {
systemInterop := bc.newInteropContext(trigger.Application, cache, block, tx)
v := systemInterop.SpawnVM()
v.LoadScriptWithFlags(tx.Script, callflag.All)
@ -719,11 +800,12 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
if !v.HasFailed() {
_, err := systemInterop.DAO.Persist()
if err != nil {
// Release goroutines, don't care about errors, we already have one.
close(aerchan)
<-blockdone
<-aerdone
return fmt.Errorf("failed to persist invocation results: %w", err)
}
for j := range systemInterop.Notifications {
bc.handleNotification(&systemInterop.Notifications[j], cache, block, tx.Hash())
}
} else {
bc.log.Warn("contract invocation failed",
zap.String("tx", tx.Hash().StringLE()),
@ -743,40 +825,27 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
},
}
appExecResults = append(appExecResults, aer)
err = cache.PutAppExecResult(aer, writeBuf)
if err != nil {
return fmt.Errorf("failed to store tx exec result: %w", err)
}
writeBuf.Reset()
if bc.config.P2PSigExtensions {
for _, attr := range tx.GetAttributes(transaction.ConflictsT) {
hash := attr.Value.(*transaction.Conflicts).Hash
dummyTx := transaction.NewTrimmedTX(hash)
dummyTx.Version = transaction.DummyVersion
if err = cache.StoreAsTransaction(dummyTx, block.Index, writeBuf); err != nil {
return fmt.Errorf("failed to store conflicting transaction %s for transaction %s: %w", hash.StringLE(), tx.Hash().StringLE(), err)
}
writeBuf.Reset()
}
}
aerchan <- aer
}
aer, err = bc.runPersist(bc.contracts.GetPostPersistScript(), block, cache, trigger.PostPersist)
if err != nil {
// Release goroutines, don't care about errors, we already have one.
close(aerchan)
<-blockdone
<-aerdone
return fmt.Errorf("postPersist failed: %w", err)
}
appExecResults = append(appExecResults, aer)
err = cache.AppendAppExecResult(aer, writeBuf)
if err != nil {
return fmt.Errorf("failed to store postPersist exec result: %w", err)
}
writeBuf.Reset()
d := cache.DAO.(*dao.Simple)
aerchan <- aer
close(aerchan)
d := cache.(*dao.Simple)
b := d.GetMPTBatch()
mpt, sr, err := bc.stateRoot.AddMPTBatch(block.Index, b, d.Store)
if err != nil {
// Release goroutines, don't care about errors, we already have one.
<-blockdone
<-aerdone
// Here MPT can be left in a half-applied state.
// However if this error occurs, this is a bug somewhere in code
// because changes applied are the ones from HALTed transactions.
@ -785,27 +854,20 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
if bc.config.StateRootInHeader && bc.HeaderHeight() > sr.Index {
h, err := bc.GetHeader(bc.GetHeaderHash(int(sr.Index) + 1))
if err != nil {
return fmt.Errorf("failed to get next header: %w", err)
err = fmt.Errorf("failed to get next header: %w", err)
} else if h.PrevStateRoot != sr.Root {
err = fmt.Errorf("local stateroot and next header's PrevStateRoot mismatch: %s vs %s", sr.Root.StringBE(), h.PrevStateRoot.StringBE())
}
if h.PrevStateRoot != sr.Root {
return fmt.Errorf("local stateroot and next header's PrevStateRoot mismatch: %s vs %s", sr.Root.StringBE(), h.PrevStateRoot.StringBE())
if err != nil {
// Release goroutines, don't care about errors, we already have one.
<-blockdone
<-aerdone
return err
}
}
if bc.config.SaveStorageBatch {
bc.lastBatch = cache.DAO.GetBatch()
}
if bc.config.RemoveUntraceableBlocks {
if block.Index > bc.config.MaxTraceableBlocks {
index := block.Index - bc.config.MaxTraceableBlocks // is at least 1
err := cache.DeleteBlock(bc.headerHashes[index], writeBuf)
if err != nil {
bc.log.Warn("error while removing old block",
zap.Uint32("index", index),
zap.Error(err))
}
writeBuf.Reset()
}
bc.lastBatch = d.GetBatch()
}
// Every persist cycle we also compact our in-memory MPT. It's flushed
// already in AddMPTBatch, so collapsing it is safe.
@ -815,6 +877,16 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
mpt.Collapse(10)
}
// Wait for _both_ goroutines to finish.
blockerr := <-blockdone
aererr := <-aerdone
if blockerr != nil {
return blockerr
}
if aererr != nil {
return aererr
}
bc.lock.Lock()
_, err = cache.Persist()
if err != nil {
@ -895,7 +967,7 @@ func (bc *Blockchain) IsExtensibleAllowed(u util.Uint160) bool {
return n < len(us)
}
func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache *dao.Cached, trig trigger.Type) (*state.AppExecResult, error) {
func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache dao.DAO, trig trigger.Type) (*state.AppExecResult, error) {
systemInterop := bc.newInteropContext(trig, cache, block, nil)
v := systemInterop.SpawnVM()
v.LoadScriptWithFlags(script, callflag.All)
@ -905,9 +977,6 @@ func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache *dao.C
} else if _, err := systemInterop.DAO.Persist(); err != nil {
return nil, fmt.Errorf("can't save changes: %w", err)
}
for i := range systemInterop.Notifications {
bc.handleNotification(&systemInterop.Notifications[i], cache, block, block.Hash())
}
return &state.AppExecResult{
Container: block.Hash(), // application logs can be retrieved by block hash
Execution: state.Execution{

View file

@ -15,15 +15,13 @@ type Cached struct {
DAO
balances map[util.Uint160]*state.NEP17TransferInfo
transfers map[util.Uint160]map[uint32]*state.NEP17TransferLog
dropNEP17Cache bool
}
// NewCached returns new Cached wrapping around given backing store.
func NewCached(d DAO) *Cached {
balances := make(map[util.Uint160]*state.NEP17TransferInfo)
transfers := make(map[util.Uint160]map[uint32]*state.NEP17TransferLog)
return &Cached{d.GetWrapped(), balances, transfers, false}
return &Cached{d.GetWrapped(), balances, transfers}
}
// GetNEP17TransferInfo retrieves NEP17TransferInfo for the acc.
@ -87,9 +85,6 @@ func (cd *Cached) Persist() (int, error) {
// usage scenario it should be good enough if cd doesn't modify object
// caches (accounts/transfer data) in any way.
if ok {
if cd.dropNEP17Cache {
lowerCache.balances = make(map[util.Uint160]*state.NEP17TransferInfo)
}
var simpleCache *Simple
for simpleCache == nil {
simpleCache, ok = lowerCache.DAO.(*Simple)
@ -127,6 +122,5 @@ func (cd *Cached) GetWrapped() DAO {
return &Cached{cd.DAO.GetWrapped(),
cd.balances,
cd.transfers,
false,
}
}

View file

@ -42,7 +42,7 @@ type Context struct {
Block *block.Block
NonceData [16]byte
Tx *transaction.Transaction
DAO *dao.Cached
DAO dao.DAO
Notifications []state.NotificationEvent
Log *zap.Logger
VM *vm.VM
@ -54,7 +54,7 @@ type Context struct {
func NewContext(trigger trigger.Type, bc blockchainer.Blockchainer, d dao.DAO,
getContract func(dao.DAO, util.Uint160) (*state.Contract, error), natives []Contract,
block *block.Block, tx *transaction.Transaction, log *zap.Logger) *Context {
dao := dao.NewCached(d)
dao := d.GetWrapped()
nes := make([]state.NotificationEvent, 0)
return &Context{
Chain: bc,

View file

@ -180,7 +180,7 @@ func getBlockHashFromItem(bc blockchainer.Blockchainer, item stackitem.Item) uti
// getTransactionAndHeight returns transaction and its height if it's present
// on the chain. It panics if anything goes wrong.
func getTransactionAndHeight(cd *dao.Cached, item stackitem.Item) (*transaction.Transaction, uint32, error) {
func getTransactionAndHeight(d dao.DAO, item stackitem.Item) (*transaction.Transaction, uint32, error) {
hashbytes, err := item.TryBytes()
if err != nil {
panic(err)
@ -189,7 +189,7 @@ func getTransactionAndHeight(cd *dao.Cached, item stackitem.Item) (*transaction.
if err != nil {
panic(err)
}
return cd.GetTransaction(hash)
return d.GetTransaction(hash)
}
// BlockToStackItem converts block.Block to stackitem.Item.