core,dao: implement Block-level storage caching
The order in which storage.Find items are returns depends on what items were processed in previous transactions of the same block. The easiest way to implement this sort of caching is to cache operations with storage, flushing the only in `Persist()`.
This commit is contained in:
parent
f0abbfd399
commit
b96fe8173c
6 changed files with 246 additions and 20 deletions
|
@ -1147,7 +1147,16 @@ func (bc *Blockchain) GetStorageItem(scripthash util.Uint160, key []byte) *state
|
|||
|
||||
// GetStorageItems returns all storage items for a given scripthash.
|
||||
func (bc *Blockchain) GetStorageItems(hash util.Uint160) (map[string]*state.StorageItem, error) {
|
||||
return bc.dao.GetStorageItems(hash)
|
||||
siMap, err := bc.dao.GetStorageItems(hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := make(map[string]*state.StorageItem)
|
||||
for i := range siMap {
|
||||
val := siMap[i].StorageItem
|
||||
m[string(siMap[i].Key)] = &val
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// GetBlock returns a Block by the given hash.
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package dao
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"sort"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||
|
@ -18,6 +20,7 @@ type Cached struct {
|
|||
unspents map[util.Uint256]*state.UnspentCoin
|
||||
balances map[util.Uint160]*state.NEP5Balances
|
||||
transfers map[util.Uint160]map[uint32]*state.NEP5TransferLog
|
||||
storage *itemCache
|
||||
}
|
||||
|
||||
// NewCached returns new Cached wrapping around given backing store.
|
||||
|
@ -27,7 +30,16 @@ func NewCached(d DAO) *Cached {
|
|||
unspents := make(map[util.Uint256]*state.UnspentCoin)
|
||||
balances := make(map[util.Uint160]*state.NEP5Balances)
|
||||
transfers := make(map[util.Uint160]map[uint32]*state.NEP5TransferLog)
|
||||
return &Cached{d.GetWrapped(), accs, ctrs, unspents, balances, transfers}
|
||||
st := newItemCache()
|
||||
dao := d.GetWrapped()
|
||||
if cd, ok := dao.(*Cached); ok {
|
||||
for h, m := range cd.storage.st {
|
||||
for _, k := range cd.storage.keys[h] {
|
||||
st.put(h, []byte(k), m[k].State, copyItem(&m[k].StorageItem))
|
||||
}
|
||||
}
|
||||
}
|
||||
return &Cached{dao, accs, ctrs, unspents, balances, transfers, st}
|
||||
}
|
||||
|
||||
// GetAccountStateOrNew retrieves Account from cache or underlying store
|
||||
|
@ -140,6 +152,10 @@ func (cd *Cached) AppendNEP5Transfer(acc util.Uint160, index uint32, tr *state.N
|
|||
// Persist flushes all the changes made into the (supposedly) persistent
|
||||
// underlying store.
|
||||
func (cd *Cached) Persist() (int, error) {
|
||||
if err := cd.FlushStorage(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
lowerCache, ok := cd.DAO.(*Cached)
|
||||
// If the lower DAO is Cached, we only need to flush the MemCached DB.
|
||||
// This actually breaks DAO interface incapsulation, but for our current
|
||||
|
@ -148,6 +164,9 @@ func (cd *Cached) Persist() (int, error) {
|
|||
if ok {
|
||||
var simpleCache *Simple
|
||||
for simpleCache == nil {
|
||||
if err := lowerCache.FlushStorage(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
simpleCache, ok = lowerCache.DAO.(*Simple)
|
||||
if !ok {
|
||||
lowerCache, ok = cd.DAO.(*Cached)
|
||||
|
@ -200,5 +219,137 @@ func (cd *Cached) GetWrapped() DAO {
|
|||
cd.unspents,
|
||||
cd.balances,
|
||||
cd.transfers,
|
||||
cd.storage,
|
||||
}
|
||||
}
|
||||
|
||||
// FlushStorage flushes storage changes to the underlying DAO.
|
||||
func (cd *Cached) FlushStorage() error {
|
||||
if d, ok := cd.DAO.(*Cached); ok {
|
||||
d.storage.st = cd.storage.st
|
||||
d.storage.keys = cd.storage.keys
|
||||
return nil
|
||||
}
|
||||
for h, items := range cd.storage.st {
|
||||
for _, k := range cd.storage.keys[h] {
|
||||
ti := items[k]
|
||||
switch ti.State {
|
||||
case putOp, addOp:
|
||||
err := cd.DAO.PutStorageItem(h, []byte(k), &ti.StorageItem)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case delOp:
|
||||
err := cd.DAO.DeleteStorageItem(h, []byte(k))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func copyItem(si *state.StorageItem) *state.StorageItem {
|
||||
val := make([]byte, len(si.Value))
|
||||
copy(val, si.Value)
|
||||
return &state.StorageItem{
|
||||
Value: val,
|
||||
IsConst: si.IsConst,
|
||||
}
|
||||
}
|
||||
|
||||
// GetStorageItem returns StorageItem if it exists in the given store.
|
||||
func (cd *Cached) GetStorageItem(scripthash util.Uint160, key []byte) *state.StorageItem {
|
||||
ti := cd.storage.getItem(scripthash, key)
|
||||
if ti != nil {
|
||||
if ti.State == delOp {
|
||||
return nil
|
||||
}
|
||||
return copyItem(&ti.StorageItem)
|
||||
}
|
||||
|
||||
si := cd.DAO.GetStorageItem(scripthash, key)
|
||||
if si != nil {
|
||||
cd.storage.put(scripthash, key, getOp, si)
|
||||
return copyItem(si)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PutStorageItem puts given StorageItem for given script with given
|
||||
// key into the given store.
|
||||
func (cd *Cached) PutStorageItem(scripthash util.Uint160, key []byte, si *state.StorageItem) error {
|
||||
item := copyItem(si)
|
||||
ti := cd.storage.getItem(scripthash, key)
|
||||
if ti != nil {
|
||||
if ti.State == delOp || ti.State == getOp {
|
||||
ti.State = putOp
|
||||
}
|
||||
ti.StorageItem = *item
|
||||
return nil
|
||||
}
|
||||
|
||||
op := addOp
|
||||
if it := cd.DAO.GetStorageItem(scripthash, key); it != nil {
|
||||
op = putOp
|
||||
}
|
||||
cd.storage.put(scripthash, key, op, item)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteStorageItem drops storage item for the given script with the
|
||||
// given key from the store.
|
||||
func (cd *Cached) DeleteStorageItem(scripthash util.Uint160, key []byte) error {
|
||||
ti := cd.storage.getItem(scripthash, key)
|
||||
if ti != nil {
|
||||
ti.State = delOp
|
||||
ti.Value = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
it := cd.DAO.GetStorageItem(scripthash, key)
|
||||
if it != nil {
|
||||
cd.storage.put(scripthash, key, delOp, it)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetStorageItems returns all storage items for a given scripthash.
|
||||
func (cd *Cached) GetStorageItems(hash util.Uint160) ([]StorageItemWithKey, error) {
|
||||
items, err := cd.DAO.GetStorageItems(hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cache := cd.storage.getItems(hash)
|
||||
if len(cache) == 0 {
|
||||
return items, nil
|
||||
}
|
||||
|
||||
result := make([]StorageItemWithKey, 0, len(items))
|
||||
for i := range items {
|
||||
_, ok := cache[string(items[i].Key)]
|
||||
if !ok {
|
||||
result = append(result, items[i])
|
||||
}
|
||||
}
|
||||
sort.Slice(result, func(i, j int) bool { return bytes.Compare(result[i].Key, result[j].Key) == -1 })
|
||||
|
||||
for _, k := range cd.storage.keys[hash] {
|
||||
v := cache[k]
|
||||
if v.State != delOp {
|
||||
val := make([]byte, len(v.StorageItem.Value))
|
||||
copy(val, v.StorageItem.Value)
|
||||
result = append(result, StorageItemWithKey{
|
||||
StorageItem: state.StorageItem{
|
||||
Value: val,
|
||||
IsConst: v.StorageItem.IsConst,
|
||||
},
|
||||
Key: []byte(k),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ type DAO interface {
|
|||
GetNEP5Balances(acc util.Uint160) (*state.NEP5Balances, error)
|
||||
GetNEP5TransferLog(acc util.Uint160, index uint32) (*state.NEP5TransferLog, error)
|
||||
GetStorageItem(scripthash util.Uint160, key []byte) *state.StorageItem
|
||||
GetStorageItems(hash util.Uint160) (map[string]*state.StorageItem, error)
|
||||
GetStorageItems(hash util.Uint160) ([]StorageItemWithKey, error)
|
||||
GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error)
|
||||
GetUnspentCoinState(hash util.Uint256) (*state.UnspentCoin, error)
|
||||
GetValidatorState(publicKey *keys.PublicKey) (*state.Validator, error)
|
||||
|
@ -435,9 +435,15 @@ func (dao *Simple) DeleteStorageItem(scripthash util.Uint160, key []byte) error
|
|||
return dao.Store.Delete(makeStorageItemKey(scripthash, key))
|
||||
}
|
||||
|
||||
// StorageItemWithKey is a Key-Value pair together with possible const modifier.
|
||||
type StorageItemWithKey struct {
|
||||
state.StorageItem
|
||||
Key []byte
|
||||
}
|
||||
|
||||
// GetStorageItems returns all storage items for a given scripthash.
|
||||
func (dao *Simple) GetStorageItems(hash util.Uint160) (map[string]*state.StorageItem, error) {
|
||||
var siMap = make(map[string]*state.StorageItem)
|
||||
func (dao *Simple) GetStorageItems(hash util.Uint160) ([]StorageItemWithKey, error) {
|
||||
var res []StorageItemWithKey
|
||||
var err error
|
||||
|
||||
saveToMap := func(k, v []byte) {
|
||||
|
@ -445,21 +451,22 @@ func (dao *Simple) GetStorageItems(hash util.Uint160) (map[string]*state.Storage
|
|||
return
|
||||
}
|
||||
r := io.NewBinReaderFromBuf(v)
|
||||
si := &state.StorageItem{}
|
||||
si.DecodeBinary(r)
|
||||
var s StorageItemWithKey
|
||||
s.StorageItem.DecodeBinary(r)
|
||||
if r.Err != nil {
|
||||
err = r.Err
|
||||
return
|
||||
}
|
||||
|
||||
// Cut prefix and hash.
|
||||
siMap[string(k[21:])] = si
|
||||
s.Key = k[21:]
|
||||
res = append(res, s)
|
||||
}
|
||||
dao.Store.Seek(storage.AppendPrefix(storage.STStorage, hash.BytesLE()), saveToMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return siMap, nil
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// makeStorageItemKey returns a key used to store StorageItem in the DB.
|
||||
|
|
57
pkg/core/dao/storage_item.go
Normal file
57
pkg/core/dao/storage_item.go
Normal file
|
@ -0,0 +1,57 @@
|
|||
package dao
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
)
|
||||
|
||||
type (
|
||||
itemState int
|
||||
|
||||
trackedItem struct {
|
||||
state.StorageItem
|
||||
State itemState
|
||||
}
|
||||
|
||||
itemCache struct {
|
||||
st map[util.Uint160]map[string]*trackedItem
|
||||
keys map[util.Uint160][]string
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
getOp itemState = 1 << iota
|
||||
delOp
|
||||
addOp
|
||||
putOp
|
||||
)
|
||||
|
||||
func newItemCache() *itemCache {
|
||||
return &itemCache{
|
||||
make(map[util.Uint160]map[string]*trackedItem),
|
||||
make(map[util.Uint160][]string),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *itemCache) put(h util.Uint160, key []byte, op itemState, item *state.StorageItem) {
|
||||
m := c.getItems(h)
|
||||
m[string(key)] = &trackedItem{
|
||||
StorageItem: *item,
|
||||
State: op,
|
||||
}
|
||||
c.keys[h] = append(c.keys[h], string(key))
|
||||
c.st[h] = m
|
||||
}
|
||||
|
||||
func (c *itemCache) getItem(h util.Uint160, key []byte) *trackedItem {
|
||||
m := c.getItems(h)
|
||||
return m[string(key)]
|
||||
}
|
||||
|
||||
func (c *itemCache) getItems(h util.Uint160) map[string]*trackedItem {
|
||||
m, ok := c.st[h]
|
||||
if !ok {
|
||||
return make(map[string]*trackedItem)
|
||||
}
|
||||
return m
|
||||
}
|
|
@ -1,10 +1,10 @@
|
|||
package core
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||
|
@ -444,17 +444,18 @@ func (ic *interopContext) storageFind(v *vm.VM) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
prefix := string(v.Estack().Pop().Bytes())
|
||||
pref := v.Estack().Pop().Bytes()
|
||||
siMap, err := ic.dao.GetStorageItems(stc.ScriptHash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
filteredMap := vm.NewMapItem()
|
||||
for k, v := range siMap {
|
||||
if strings.HasPrefix(k, prefix) {
|
||||
filteredMap.Add(vm.NewByteArrayItem([]byte(k)),
|
||||
vm.NewByteArrayItem(v.Value))
|
||||
for i := range siMap {
|
||||
k := siMap[i].Key
|
||||
if bytes.HasPrefix(k, pref) {
|
||||
filteredMap.Add(vm.NewByteArrayItem(siMap[i].Key),
|
||||
vm.NewByteArrayItem(siMap[i].Value))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -578,9 +579,10 @@ func (ic *interopContext) contractMigrate(v *vm.VM) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for k, v := range siMap {
|
||||
v.IsConst = false
|
||||
err = ic.dao.PutStorageItem(contract.ScriptHash(), []byte(k), v)
|
||||
for i := range siMap {
|
||||
v := siMap[i].StorageItem
|
||||
siMap[i].IsConst = false
|
||||
err = ic.dao.PutStorageItem(contract.ScriptHash(), siMap[i].Key, &v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -561,8 +561,8 @@ func (ic *interopContext) contractDestroy(v *vm.VM) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for k := range siMap {
|
||||
_ = ic.dao.DeleteStorageItem(hash, []byte(k))
|
||||
for i := range siMap {
|
||||
_ = ic.dao.DeleteStorageItem(hash, siMap[i].Key)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
Loading…
Reference in a new issue