dao: completely drop Cached

It was very useful in 2.0 days, but today it only serves one purpose that
could easily (and more effectively!) be solved in another way.
This commit is contained in:
Roman Khimov 2021-08-11 23:06:17 +03:00
parent 3e60771175
commit 47f0f4c45f
3 changed files with 58 additions and 202 deletions

View file

@ -164,6 +164,12 @@ type bcEvent struct {
appExecResults []*state.AppExecResult appExecResults []*state.AppExecResult
} }
// transferData is used for transfer caching during storeBlock.
type transferData struct {
Info state.NEP17TransferInfo
Log state.NEP17TransferLog
}
// NewBlockchain returns a new blockchain object the will use the // NewBlockchain returns a new blockchain object the will use the
// given Store as its underlying storage. For it to work correctly you need // given Store as its underlying storage. For it to work correctly you need
// to spawn a goroutine for its Run method after this initialization. // to spawn a goroutine for its Run method after this initialization.
@ -748,10 +754,11 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
}() }()
go func() { go func() {
var ( var (
kvcache = dao.NewCached(cache) kvcache = cache.GetWrapped()
writeBuf = io.NewBufBinWriter() writeBuf = io.NewBufBinWriter()
err error err error
appendBlock bool appendBlock bool
transCache = make(map[util.Uint160]transferData)
) )
for aer := range aerchan { for aer := range aerchan {
if aer.Container == block.Hash() && appendBlock { if aer.Container == block.Hash() && appendBlock {
@ -768,7 +775,7 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
} }
if aer.Execution.VMState == vm.HaltState { if aer.Execution.VMState == vm.HaltState {
for j := range aer.Execution.Events { for j := range aer.Execution.Events {
bc.handleNotification(&aer.Execution.Events[j], kvcache, block, aer.Container) bc.handleNotification(&aer.Execution.Events[j], kvcache, transCache, block, aer.Container)
} }
} }
writeBuf.Reset() writeBuf.Reset()
@ -777,6 +784,19 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
aerdone <- err aerdone <- err
return return
} }
for acc, trData := range transCache {
err = kvcache.PutNEP17TransferInfo(acc, &trData.Info)
if err != nil {
aerdone <- err
return
}
err = kvcache.PutNEP17TransferLog(acc, trData.Info.NextTransferBatch, &trData.Log)
if err != nil {
aerdone <- err
return
}
}
_, err = kvcache.Persist() _, err = kvcache.Persist()
if err != nil { if err != nil {
aerdone <- err aerdone <- err
@ -996,7 +1016,8 @@ func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache dao.DA
}, nil }, nil
} }
func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d *dao.Cached, b *block.Block, h util.Uint256) { func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d dao.DAO,
transCache map[util.Uint160]transferData, b *block.Block, h util.Uint256) {
if note.Name != "Transfer" { if note.Name != "Transfer" {
return return
} }
@ -1033,7 +1054,7 @@ func (bc *Blockchain) handleNotification(note *state.NotificationEvent, d *dao.C
} }
amount = bigint.FromBytes(bs) amount = bigint.FromBytes(bs)
} }
bc.processNEP17Transfer(d, h, b, note.ScriptHash, from, to, amount) bc.processNEP17Transfer(d, transCache, h, b, note.ScriptHash, from, to, amount)
} }
func parseUint160(addr []byte) util.Uint160 { func parseUint160(addr []byte) util.Uint160 {
@ -1043,7 +1064,8 @@ func parseUint160(addr []byte) util.Uint160 {
return util.Uint160{} return util.Uint160{}
} }
func (bc *Blockchain) processNEP17Transfer(cache *dao.Cached, h util.Uint256, b *block.Block, sc util.Uint160, from, to []byte, amount *big.Int) { func (bc *Blockchain) processNEP17Transfer(cache dao.DAO, transCache map[util.Uint160]transferData,
h util.Uint256, b *block.Block, sc util.Uint160, from, to []byte, amount *big.Int) {
toAddr := parseUint160(to) toAddr := parseUint160(to)
fromAddr := parseUint160(from) fromAddr := parseUint160(from)
var id int32 var id int32
@ -1067,31 +1089,48 @@ func (bc *Blockchain) processNEP17Transfer(cache *dao.Cached, h util.Uint256, b
} }
if !fromAddr.Equals(util.Uint160{}) { if !fromAddr.Equals(util.Uint160{}) {
_ = transfer.Amount.Neg(amount) // We already have the Int. _ = transfer.Amount.Neg(amount) // We already have the Int.
if appendNEP17Transfer(cache, fromAddr, transfer) != nil { if appendNEP17Transfer(cache, transCache, fromAddr, transfer) != nil {
return return
} }
} }
if !toAddr.Equals(util.Uint160{}) { if !toAddr.Equals(util.Uint160{}) {
_ = transfer.Amount.Set(amount) // We already have the Int. _ = transfer.Amount.Set(amount) // We already have the Int.
_ = appendNEP17Transfer(cache, toAddr, transfer) // Nothing useful we can do. _ = appendNEP17Transfer(cache, transCache, toAddr, transfer) // Nothing useful we can do.
} }
} }
func appendNEP17Transfer(cache *dao.Cached, addr util.Uint160, transfer *state.NEP17Transfer) error { func appendNEP17Transfer(cache dao.DAO, transCache map[util.Uint160]transferData, addr util.Uint160, transfer *state.NEP17Transfer) error {
balances, err := cache.GetNEP17TransferInfo(addr) transferData, ok := transCache[addr]
if !ok {
balances, err := cache.GetNEP17TransferInfo(addr)
if err != nil {
return err
}
if !balances.NewBatch {
trLog, err := cache.GetNEP17TransferLog(addr, balances.NextTransferBatch)
if err != nil {
return err
}
transferData.Log = *trLog
}
transferData.Info = *balances
}
err := transferData.Log.Append(transfer)
if err != nil { if err != nil {
return err return err
} }
balances.LastUpdated[transfer.Asset] = transfer.Block transferData.Info.LastUpdated[transfer.Asset] = transfer.Block
balances.NewBatch, err = cache.AppendNEP17Transfer(addr, transferData.Info.NewBatch = transferData.Log.Size() >= state.NEP17TransferBatchSize
balances.NextTransferBatch, balances.NewBatch, transfer) if transferData.Info.NewBatch {
if err != nil { err = cache.PutNEP17TransferLog(addr, transferData.Info.NextTransferBatch, &transferData.Log)
return err if err != nil {
return err
}
transferData.Info.NextTransferBatch++
transferData.Log = state.NEP17TransferLog{}
} }
if balances.NewBatch { transCache[addr] = transferData
balances.NextTransferBatch++ return nil
}
return cache.PutNEP17TransferInfo(addr, balances)
} }
// ForEachNEP17Transfer executes f for each nep17 transfer in log. // ForEachNEP17Transfer executes f for each nep17 transfer in log.

View file

@ -1,126 +0,0 @@
package dao
import (
"errors"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/util"
)
// Cached is a data access object that mimics DAO, but has a write cache
// for accounts and NEP17 transfer data. These are the most frequently used
// objects in the storeBlock().
type Cached struct {
DAO
balances map[util.Uint160]*state.NEP17TransferInfo
transfers map[util.Uint160]map[uint32]*state.NEP17TransferLog
}
// NewCached returns new Cached wrapping around given backing store.
func NewCached(d DAO) *Cached {
balances := make(map[util.Uint160]*state.NEP17TransferInfo)
transfers := make(map[util.Uint160]map[uint32]*state.NEP17TransferLog)
return &Cached{d.GetWrapped(), balances, transfers}
}
// GetNEP17TransferInfo retrieves NEP17TransferInfo for the acc.
func (cd *Cached) GetNEP17TransferInfo(acc util.Uint160) (*state.NEP17TransferInfo, error) {
if bs := cd.balances[acc]; bs != nil {
return bs, nil
}
return cd.DAO.GetNEP17TransferInfo(acc)
}
// PutNEP17TransferInfo saves NEP17TransferInfo for the acc.
func (cd *Cached) PutNEP17TransferInfo(acc util.Uint160, bs *state.NEP17TransferInfo) error {
cd.balances[acc] = bs
return nil
}
// GetNEP17TransferLog retrieves NEP17TransferLog for the acc.
func (cd *Cached) GetNEP17TransferLog(acc util.Uint160, index uint32) (*state.NEP17TransferLog, error) {
ts := cd.transfers[acc]
if ts != nil && ts[index] != nil {
return ts[index], nil
}
return cd.DAO.GetNEP17TransferLog(acc, index)
}
// PutNEP17TransferLog saves NEP17TransferLog for the acc.
func (cd *Cached) PutNEP17TransferLog(acc util.Uint160, index uint32, bs *state.NEP17TransferLog) error {
ts := cd.transfers[acc]
if ts == nil {
ts = make(map[uint32]*state.NEP17TransferLog, 2)
cd.transfers[acc] = ts
}
ts[index] = bs
return nil
}
// AppendNEP17Transfer appends new transfer to a transfer event log.
func (cd *Cached) AppendNEP17Transfer(acc util.Uint160, index uint32, isNew bool, tr *state.NEP17Transfer) (bool, error) {
var lg *state.NEP17TransferLog
if isNew {
lg = new(state.NEP17TransferLog)
} else {
var err error
lg, err = cd.GetNEP17TransferLog(acc, index)
if err != nil {
return false, err
}
}
if err := lg.Append(tr); err != nil {
return false, err
}
return lg.Size() >= state.NEP17TransferBatchSize, cd.PutNEP17TransferLog(acc, index, lg)
}
// Persist flushes all the changes made into the (supposedly) persistent
// underlying store.
func (cd *Cached) Persist() (int, error) {
lowerCache, ok := cd.DAO.(*Cached)
// If the lower DAO is Cached, we only need to flush the MemCached DB.
// This actually breaks DAO interface incapsulation, but for our current
// usage scenario it should be good enough if cd doesn't modify object
// caches (accounts/transfer data) in any way.
if ok {
var simpleCache *Simple
for simpleCache == nil {
simpleCache, ok = lowerCache.DAO.(*Simple)
if !ok {
lowerCache, ok = cd.DAO.(*Cached)
if !ok {
return 0, errors.New("unsupported lower DAO")
}
}
}
return simpleCache.Persist()
}
buf := io.NewBufBinWriter()
for acc, bs := range cd.balances {
err := cd.DAO.putNEP17TransferInfo(acc, bs, buf)
if err != nil {
return 0, err
}
buf.Reset()
}
for acc, ts := range cd.transfers {
for ind, lg := range ts {
err := cd.DAO.PutNEP17TransferLog(acc, ind, lg)
if err != nil {
return 0, err
}
}
}
return cd.DAO.Persist()
}
// GetWrapped implements DAO interface.
func (cd *Cached) GetWrapped() DAO {
return &Cached{cd.DAO.GetWrapped(),
cd.balances,
cd.transfers,
}
}

View file

@ -1,57 +0,0 @@
package dao
import (
"testing"
"github.com/nspcc-dev/neo-go/internal/random"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCachedCachedDao(t *testing.T) {
store := storage.NewMemoryStore()
// Persistent DAO to check for backing storage.
pdao := NewSimple(store, false)
assert.NotEqual(t, store, pdao.Store)
// Cached DAO.
cdao := NewCached(pdao)
cdaoDao := cdao.DAO.(*Simple)
assert.NotEqual(t, store, cdaoDao.Store)
assert.NotEqual(t, pdao.Store, cdaoDao.Store)
// Cached cached DAO.
ccdao := NewCached(cdao)
ccdaoDao := ccdao.DAO.(*Cached)
intDao := ccdaoDao.DAO.(*Simple)
assert.NotEqual(t, store, intDao.Store)
assert.NotEqual(t, pdao.Store, intDao.Store)
assert.NotEqual(t, cdaoDao.Store, intDao.Store)
id := int32(random.Int(0, 1024))
key := []byte("qwerty")
si := state.StorageItem("poiuyt")
require.NoError(t, ccdao.PutStorageItem(id, key, si))
resi := ccdao.GetStorageItem(id, key)
assert.Equal(t, si, resi)
resi = cdao.GetStorageItem(id, key)
assert.Equal(t, state.StorageItem(nil), resi)
resi = pdao.GetStorageItem(id, key)
assert.Equal(t, state.StorageItem(nil), resi)
cnt, err := ccdao.Persist()
assert.NoError(t, err)
assert.Equal(t, 1, cnt)
resi = cdao.GetStorageItem(id, key)
assert.Equal(t, si, resi)
resi = pdao.GetStorageItem(id, key)
assert.Equal(t, state.StorageItem(nil), resi)
cnt, err = cdao.Persist()
assert.NoError(t, err)
assert.Equal(t, 1, cnt)
resi = pdao.GetStorageItem(id, key)
assert.Equal(t, si, resi)
}