core: store NEP5Transfers in batches

This is an append-only log which is read only during some RPCs.
It is rather slow to get it from base every time we need to append to
it. This commit stores all NEP5Transfers in batches, so that
only a last batch needs to be unmarshaled during block processing.
This commit is contained in:
Evgenii Stratonikov 2020-03-12 12:43:21 +03:00
parent 3c6d9653b0
commit 32401a567e
3 changed files with 51 additions and 20 deletions

View file

@ -29,7 +29,7 @@ import (
// Tuning parameters. // Tuning parameters.
const ( const (
headerBatchCount = 2000 headerBatchCount = 2000
version = "0.0.7" version = "0.0.8"
// This one comes from C# code and it's different from the constant used // This one comes from C# code and it's different from the constant used
// when creating an asset with Neo.Asset.Create interop call. It looks // when creating an asset with Neo.Asset.Create interop call. It looks
@ -771,12 +771,16 @@ func (bc *Blockchain) processNEP5Transfer(cache *cachedDao, tx *transaction.Tran
bs.Balance -= amount bs.Balance -= amount
bs.LastUpdatedBlock = b.Index bs.LastUpdatedBlock = b.Index
balances.Trackers[sc] = bs balances.Trackers[sc] = bs
if err := cache.PutNEP5Balances(fromAddr, balances); err != nil {
return
}
transfer.Amount = -amount transfer.Amount = -amount
if err := cache.AppendNEP5Transfer(fromAddr, transfer); err != nil { isBig, err := cache.AppendNEP5Transfer(fromAddr, balances.NextTransferBatch, transfer)
if err != nil {
return
}
if isBig {
balances.NextTransferBatch++
}
if err := cache.PutNEP5Balances(fromAddr, balances); err != nil {
return return
} }
} }
@ -789,12 +793,16 @@ func (bc *Blockchain) processNEP5Transfer(cache *cachedDao, tx *transaction.Tran
bs.Balance += amount bs.Balance += amount
bs.LastUpdatedBlock = b.Index bs.LastUpdatedBlock = b.Index
balances.Trackers[sc] = bs balances.Trackers[sc] = bs
if err := cache.PutNEP5Balances(toAddr, balances); err != nil {
return
}
transfer.Amount = amount transfer.Amount = amount
if err := cache.AppendNEP5Transfer(toAddr, transfer); err != nil { isBig, err := cache.AppendNEP5Transfer(toAddr, balances.NextTransferBatch, transfer)
if err != nil {
return
}
if isBig {
balances.NextTransferBatch++
}
if err := cache.PutNEP5Balances(toAddr, balances); err != nil {
return return
} }
} }
@ -802,11 +810,19 @@ func (bc *Blockchain) processNEP5Transfer(cache *cachedDao, tx *transaction.Tran
// GetNEP5TransferLog returns NEP5 transfer log for the acc. // GetNEP5TransferLog returns NEP5 transfer log for the acc.
func (bc *Blockchain) GetNEP5TransferLog(acc util.Uint160) *state.NEP5TransferLog { func (bc *Blockchain) GetNEP5TransferLog(acc util.Uint160) *state.NEP5TransferLog {
lg, err := bc.dao.GetNEP5TransferLog(acc) balances, err := bc.dao.GetNEP5Balances(acc)
if err != nil { if err != nil {
return nil return nil
} }
return lg result := new(state.NEP5TransferLog)
for i := uint32(0); i <= balances.NextTransferBatch; i++ {
lg, err := bc.dao.GetNEP5TransferLog(acc, i)
if err != nil {
return nil
}
result.Raw = append(result.Raw, lg.Raw...)
}
return result
} }
// GetNEP5Balances returns NEP5 balances for the acc. // GetNEP5Balances returns NEP5 balances for the acc.

View file

@ -158,9 +158,19 @@ func (dao *dao) PutNEP5Balances(acc util.Uint160, bs *state.NEP5Balances) error
// -- start transfer log. // -- start transfer log.
const nep5TransferBatchSize = 128
func getNEP5TransferLogKey(acc util.Uint160, index uint32) []byte {
key := make([]byte, 1+util.Uint160Size+4)
key[0] = byte(storage.STNEP5Transfers)
copy(key[1:], acc.BytesBE())
binary.LittleEndian.PutUint32(key[util.Uint160Size:], index)
return key
}
// GetNEP5TransferLog retrieves transfer log from the cache. // GetNEP5TransferLog retrieves transfer log from the cache.
func (dao *dao) GetNEP5TransferLog(acc util.Uint160) (*state.NEP5TransferLog, error) { func (dao *dao) GetNEP5TransferLog(acc util.Uint160, index uint32) (*state.NEP5TransferLog, error) {
key := storage.AppendPrefix(storage.STNEP5Transfers, acc.BytesBE()) key := getNEP5TransferLogKey(acc, index)
value, err := dao.store.Get(key) value, err := dao.store.Get(key)
if err != nil { if err != nil {
if err == storage.ErrKeyNotFound { if err == storage.ErrKeyNotFound {
@ -172,24 +182,25 @@ func (dao *dao) GetNEP5TransferLog(acc util.Uint160) (*state.NEP5TransferLog, er
} }
// PutNEP5TransferLog saves given transfer log in the cache. // PutNEP5TransferLog saves given transfer log in the cache.
func (dao *dao) PutNEP5TransferLog(acc util.Uint160, lg *state.NEP5TransferLog) error { func (dao *dao) PutNEP5TransferLog(acc util.Uint160, index uint32, lg *state.NEP5TransferLog) error {
key := storage.AppendPrefix(storage.STNEP5Transfers, acc.BytesBE()) key := getNEP5TransferLogKey(acc, index)
return dao.store.Put(key, lg.Raw) return dao.store.Put(key, lg.Raw)
} }
// AppendNEP5Transfer appends a single NEP5 transfer to a log. // AppendNEP5Transfer appends a single NEP5 transfer to a log.
func (dao *dao) AppendNEP5Transfer(acc util.Uint160, tr *state.NEP5Transfer) error { // First return value signalizes that log size has exceeded batch size.
lg, err := dao.GetNEP5TransferLog(acc) func (dao *dao) AppendNEP5Transfer(acc util.Uint160, index uint32, tr *state.NEP5Transfer) (bool, error) {
lg, err := dao.GetNEP5TransferLog(acc, index)
if err != nil { if err != nil {
if err != storage.ErrKeyNotFound { if err != storage.ErrKeyNotFound {
return err return false, err
} }
lg = new(state.NEP5TransferLog) lg = new(state.NEP5TransferLog)
} }
if err := lg.Append(tr); err != nil { if err := lg.Append(tr); err != nil {
return err return false, err
} }
return dao.PutNEP5TransferLog(acc, lg) return lg.Size() >= nep5TransferBatchSize, dao.PutNEP5TransferLog(acc, index, lg)
} }
// -- end transfer log. // -- end transfer log.

View file

@ -45,6 +45,8 @@ type NEP5Transfer struct {
// to the corresponding structures. // to the corresponding structures.
type NEP5Balances struct { type NEP5Balances struct {
Trackers map[util.Uint160]NEP5Tracker Trackers map[util.Uint160]NEP5Tracker
// NextTransferBatch stores an index of the next transfer batch.
NextTransferBatch uint32
} }
// NewNEP5Balances returns new NEP5Balances. // NewNEP5Balances returns new NEP5Balances.
@ -56,6 +58,7 @@ func NewNEP5Balances() *NEP5Balances {
// DecodeBinary implements io.Serializable interface. // DecodeBinary implements io.Serializable interface.
func (bs *NEP5Balances) DecodeBinary(r *io.BinReader) { func (bs *NEP5Balances) DecodeBinary(r *io.BinReader) {
bs.NextTransferBatch = r.ReadU32LE()
lenBalances := r.ReadVarUint() lenBalances := r.ReadVarUint()
m := make(map[util.Uint160]NEP5Tracker, lenBalances) m := make(map[util.Uint160]NEP5Tracker, lenBalances)
for i := 0; i < int(lenBalances); i++ { for i := 0; i < int(lenBalances); i++ {
@ -70,6 +73,7 @@ func (bs *NEP5Balances) DecodeBinary(r *io.BinReader) {
// EncodeBinary implements io.Serializable interface. // EncodeBinary implements io.Serializable interface.
func (bs *NEP5Balances) EncodeBinary(w *io.BinWriter) { func (bs *NEP5Balances) EncodeBinary(w *io.BinWriter) {
w.WriteU32LE(bs.NextTransferBatch)
w.WriteVarUint(uint64(len(bs.Trackers))) w.WriteVarUint(uint64(len(bs.Trackers)))
for k, v := range bs.Trackers { for k, v := range bs.Trackers {
w.WriteBytes(k[:]) w.WriteBytes(k[:])