diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index f2b08c83a..1f53a550a 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -164,6 +164,12 @@ type bcEvent struct { appExecResults []*state.AppExecResult } +// transferData is used for transfer caching during storeBlock. +type transferData struct { + Info state.NEP17TransferInfo + Log state.NEP17TransferLog +} + // NewBlockchain returns a new blockchain object the will use the // given Store as its underlying storage. For it to work correctly you need // to spawn a goroutine for its Run method after this initialization. @@ -748,10 +754,11 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error }() go func() { var ( - kvcache = dao.NewCached(cache) + kvcache = cache.GetWrapped() writeBuf = io.NewBufBinWriter() err error appendBlock bool + transCache = make(map[util.Uint160]transferData) ) for aer := range aerchan { if aer.Container == block.Hash() && appendBlock { @@ -768,7 +775,7 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error } if aer.Execution.VMState == vm.HaltState { for j := range aer.Execution.Events { - bc.handleNotification(&aer.Execution.Events[j], kvcache, block, aer.Container) + bc.handleNotification(&aer.Execution.Events[j], kvcache, transCache, block, aer.Container) } } writeBuf.Reset() @@ -777,6 +784,19 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error aerdone <- err return } + for acc, trData := range transCache { + err = kvcache.PutNEP17TransferInfo(acc, &trData.Info) + if err != nil { + aerdone <- err + return + } + err = kvcache.PutNEP17TransferLog(acc, trData.Info.NextTransferBatch, &trData.Log) + if err != nil { + aerdone <- err + return + } + } + _, err = kvcache.Persist() if err != nil { aerdone <- err @@ -996,7 +1016,8 @@ func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache dao.DA }, nil } -func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d *dao.Cached, b *block.Block, h util.Uint256) { +func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d dao.DAO, + transCache map[util.Uint160]transferData, b *block.Block, h util.Uint256) { if note.Name != "Transfer" { return } @@ -1033,7 +1054,7 @@ func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d *dao.C } amount = bigint.FromBytes(bs) } - bc.processNEP17Transfer(d, h, b, note.ScriptHash, from, to, amount) + bc.processNEP17Transfer(d, transCache, h, b, note.ScriptHash, from, to, amount) } func parseUint160(addr []byte) util.Uint160 { @@ -1043,7 +1064,8 @@ func parseUint160(addr []byte) util.Uint160 { return util.Uint160{} } -func (bc *Blockchain) processNEP17Transfer(cache *dao.Cached, h util.Uint256, b *block.Block, sc util.Uint160, from, to []byte, amount *big.Int) { +func (bc *Blockchain) processNEP17Transfer(cache dao.DAO, transCache map[util.Uint160]transferData, + h util.Uint256, b *block.Block, sc util.Uint160, from, to []byte, amount *big.Int) { toAddr := parseUint160(to) fromAddr := parseUint160(from) var id int32 @@ -1067,31 +1089,48 @@ func (bc *Blockchain) processNEP17Transfer(cache *dao.Cached, h util.Uint256, b } if !fromAddr.Equals(util.Uint160{}) { _ = transfer.Amount.Neg(amount) // We already have the Int. - if appendNEP17Transfer(cache, fromAddr, transfer) != nil { + if appendNEP17Transfer(cache, transCache, fromAddr, transfer) != nil { return } } if !toAddr.Equals(util.Uint160{}) { - _ = transfer.Amount.Set(amount) // We already have the Int. - _ = appendNEP17Transfer(cache, toAddr, transfer) // Nothing useful we can do. + _ = transfer.Amount.Set(amount) // We already have the Int. + _ = appendNEP17Transfer(cache, transCache, toAddr, transfer) // Nothing useful we can do. } } -func appendNEP17Transfer(cache *dao.Cached, addr util.Uint160, transfer *state.NEP17Transfer) error { - balances, err := cache.GetNEP17TransferInfo(addr) +func appendNEP17Transfer(cache dao.DAO, transCache map[util.Uint160]transferData, addr util.Uint160, transfer *state.NEP17Transfer) error { + transferData, ok := transCache[addr] + if !ok { + balances, err := cache.GetNEP17TransferInfo(addr) + if err != nil { + return err + } + if !balances.NewBatch { + trLog, err := cache.GetNEP17TransferLog(addr, balances.NextTransferBatch) + if err != nil { + return err + } + transferData.Log = *trLog + } + transferData.Info = *balances + } + err := transferData.Log.Append(transfer) if err != nil { return err } - balances.LastUpdated[transfer.Asset] = transfer.Block - balances.NewBatch, err = cache.AppendNEP17Transfer(addr, - balances.NextTransferBatch, balances.NewBatch, transfer) - if err != nil { - return err + transferData.Info.LastUpdated[transfer.Asset] = transfer.Block + transferData.Info.NewBatch = transferData.Log.Size() >= state.NEP17TransferBatchSize + if transferData.Info.NewBatch { + err = cache.PutNEP17TransferLog(addr, transferData.Info.NextTransferBatch, &transferData.Log) + if err != nil { + return err + } + transferData.Info.NextTransferBatch++ + transferData.Log = state.NEP17TransferLog{} } - if balances.NewBatch { - balances.NextTransferBatch++ - } - return cache.PutNEP17TransferInfo(addr, balances) + transCache[addr] = transferData + return nil } // ForEachNEP17Transfer executes f for each nep17 transfer in log. diff --git a/pkg/core/dao/cacheddao.go b/pkg/core/dao/cacheddao.go deleted file mode 100644 index f147deb85..000000000 --- a/pkg/core/dao/cacheddao.go +++ /dev/null @@ -1,126 +0,0 @@ -package dao - -import ( - "errors" - - "github.com/nspcc-dev/neo-go/pkg/core/state" - "github.com/nspcc-dev/neo-go/pkg/io" - "github.com/nspcc-dev/neo-go/pkg/util" -) - -// Cached is a data access object that mimics DAO, but has a write cache -// for accounts and NEP17 transfer data. These are the most frequently used -// objects in the storeBlock(). -type Cached struct { - DAO - balances map[util.Uint160]*state.NEP17TransferInfo - transfers map[util.Uint160]map[uint32]*state.NEP17TransferLog -} - -// NewCached returns new Cached wrapping around given backing store. -func NewCached(d DAO) *Cached { - balances := make(map[util.Uint160]*state.NEP17TransferInfo) - transfers := make(map[util.Uint160]map[uint32]*state.NEP17TransferLog) - return &Cached{d.GetWrapped(), balances, transfers} -} - -// GetNEP17TransferInfo retrieves NEP17TransferInfo for the acc. -func (cd *Cached) GetNEP17TransferInfo(acc util.Uint160) (*state.NEP17TransferInfo, error) { - if bs := cd.balances[acc]; bs != nil { - return bs, nil - } - return cd.DAO.GetNEP17TransferInfo(acc) -} - -// PutNEP17TransferInfo saves NEP17TransferInfo for the acc. -func (cd *Cached) PutNEP17TransferInfo(acc util.Uint160, bs *state.NEP17TransferInfo) error { - cd.balances[acc] = bs - return nil -} - -// GetNEP17TransferLog retrieves NEP17TransferLog for the acc. -func (cd *Cached) GetNEP17TransferLog(acc util.Uint160, index uint32) (*state.NEP17TransferLog, error) { - ts := cd.transfers[acc] - if ts != nil && ts[index] != nil { - return ts[index], nil - } - return cd.DAO.GetNEP17TransferLog(acc, index) -} - -// PutNEP17TransferLog saves NEP17TransferLog for the acc. -func (cd *Cached) PutNEP17TransferLog(acc util.Uint160, index uint32, bs *state.NEP17TransferLog) error { - ts := cd.transfers[acc] - if ts == nil { - ts = make(map[uint32]*state.NEP17TransferLog, 2) - cd.transfers[acc] = ts - } - ts[index] = bs - return nil -} - -// AppendNEP17Transfer appends new transfer to a transfer event log. -func (cd *Cached) AppendNEP17Transfer(acc util.Uint160, index uint32, isNew bool, tr *state.NEP17Transfer) (bool, error) { - var lg *state.NEP17TransferLog - if isNew { - lg = new(state.NEP17TransferLog) - } else { - var err error - lg, err = cd.GetNEP17TransferLog(acc, index) - if err != nil { - return false, err - } - } - if err := lg.Append(tr); err != nil { - return false, err - } - return lg.Size() >= state.NEP17TransferBatchSize, cd.PutNEP17TransferLog(acc, index, lg) -} - -// Persist flushes all the changes made into the (supposedly) persistent -// underlying store. -func (cd *Cached) Persist() (int, error) { - lowerCache, ok := cd.DAO.(*Cached) - // If the lower DAO is Cached, we only need to flush the MemCached DB. - // This actually breaks DAO interface incapsulation, but for our current - // usage scenario it should be good enough if cd doesn't modify object - // caches (accounts/transfer data) in any way. - if ok { - var simpleCache *Simple - for simpleCache == nil { - simpleCache, ok = lowerCache.DAO.(*Simple) - if !ok { - lowerCache, ok = cd.DAO.(*Cached) - if !ok { - return 0, errors.New("unsupported lower DAO") - } - } - } - return simpleCache.Persist() - } - buf := io.NewBufBinWriter() - - for acc, bs := range cd.balances { - err := cd.DAO.putNEP17TransferInfo(acc, bs, buf) - if err != nil { - return 0, err - } - buf.Reset() - } - for acc, ts := range cd.transfers { - for ind, lg := range ts { - err := cd.DAO.PutNEP17TransferLog(acc, ind, lg) - if err != nil { - return 0, err - } - } - } - return cd.DAO.Persist() -} - -// GetWrapped implements DAO interface. -func (cd *Cached) GetWrapped() DAO { - return &Cached{cd.DAO.GetWrapped(), - cd.balances, - cd.transfers, - } -} diff --git a/pkg/core/dao/cacheddao_test.go b/pkg/core/dao/cacheddao_test.go deleted file mode 100644 index d1c99bd71..000000000 --- a/pkg/core/dao/cacheddao_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package dao - -import ( - "testing" - - "github.com/nspcc-dev/neo-go/internal/random" - "github.com/nspcc-dev/neo-go/pkg/core/state" - "github.com/nspcc-dev/neo-go/pkg/core/storage" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestCachedCachedDao(t *testing.T) { - store := storage.NewMemoryStore() - // Persistent DAO to check for backing storage. - pdao := NewSimple(store, false) - assert.NotEqual(t, store, pdao.Store) - // Cached DAO. - cdao := NewCached(pdao) - cdaoDao := cdao.DAO.(*Simple) - assert.NotEqual(t, store, cdaoDao.Store) - assert.NotEqual(t, pdao.Store, cdaoDao.Store) - - // Cached cached DAO. - ccdao := NewCached(cdao) - ccdaoDao := ccdao.DAO.(*Cached) - intDao := ccdaoDao.DAO.(*Simple) - assert.NotEqual(t, store, intDao.Store) - assert.NotEqual(t, pdao.Store, intDao.Store) - assert.NotEqual(t, cdaoDao.Store, intDao.Store) - - id := int32(random.Int(0, 1024)) - key := []byte("qwerty") - si := state.StorageItem("poiuyt") - require.NoError(t, ccdao.PutStorageItem(id, key, si)) - resi := ccdao.GetStorageItem(id, key) - assert.Equal(t, si, resi) - - resi = cdao.GetStorageItem(id, key) - assert.Equal(t, state.StorageItem(nil), resi) - resi = pdao.GetStorageItem(id, key) - assert.Equal(t, state.StorageItem(nil), resi) - - cnt, err := ccdao.Persist() - assert.NoError(t, err) - assert.Equal(t, 1, cnt) - resi = cdao.GetStorageItem(id, key) - assert.Equal(t, si, resi) - resi = pdao.GetStorageItem(id, key) - assert.Equal(t, state.StorageItem(nil), resi) - - cnt, err = cdao.Persist() - assert.NoError(t, err) - assert.Equal(t, 1, cnt) - resi = pdao.GetStorageItem(id, key) - assert.Equal(t, si, resi) -}