From 32401a567e6a7cfec88aa3eab392010790b24373 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 12 Mar 2020 12:43:21 +0300 Subject: [PATCH] core: store NEP5Transfers in batches This is an append-only log which is read only during some RPCs. It is rather slow to get it from base every time we need to append to it. This commit stores all NEP5Transfers in batches, so that only a last batch needs to be unmarshaled during block processing. --- pkg/core/blockchain.go | 38 +++++++++++++++++++++++++++----------- pkg/core/dao.go | 29 ++++++++++++++++++++--------- pkg/core/state/nep5.go | 4 ++++ 3 files changed, 51 insertions(+), 20 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 13c4f2ca8..4a3a95aca 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -29,7 +29,7 @@ import ( // Tuning parameters. const ( headerBatchCount = 2000 - version = "0.0.7" + version = "0.0.8" // This one comes from C# code and it's different from the constant used // when creating an asset with Neo.Asset.Create interop call. It looks @@ -771,12 +771,16 @@ func (bc *Blockchain) processNEP5Transfer(cache *cachedDao, tx *transaction.Tran bs.Balance -= amount bs.LastUpdatedBlock = b.Index balances.Trackers[sc] = bs - if err := cache.PutNEP5Balances(fromAddr, balances); err != nil { - return - } transfer.Amount = -amount - if err := cache.AppendNEP5Transfer(fromAddr, transfer); err != nil { + isBig, err := cache.AppendNEP5Transfer(fromAddr, balances.NextTransferBatch, transfer) + if err != nil { + return + } + if isBig { + balances.NextTransferBatch++ + } + if err := cache.PutNEP5Balances(fromAddr, balances); err != nil { return } } @@ -789,12 +793,16 @@ func (bc *Blockchain) processNEP5Transfer(cache *cachedDao, tx *transaction.Tran bs.Balance += amount bs.LastUpdatedBlock = b.Index balances.Trackers[sc] = bs - if err := cache.PutNEP5Balances(toAddr, balances); err != nil { - return - } transfer.Amount = amount - if err := cache.AppendNEP5Transfer(toAddr, transfer); err != nil { + isBig, err := cache.AppendNEP5Transfer(toAddr, balances.NextTransferBatch, transfer) + if err != nil { + return + } + if isBig { + balances.NextTransferBatch++ + } + if err := cache.PutNEP5Balances(toAddr, balances); err != nil { return } } @@ -802,11 +810,19 @@ func (bc *Blockchain) processNEP5Transfer(cache *cachedDao, tx *transaction.Tran // GetNEP5TransferLog returns NEP5 transfer log for the acc. func (bc *Blockchain) GetNEP5TransferLog(acc util.Uint160) *state.NEP5TransferLog { - lg, err := bc.dao.GetNEP5TransferLog(acc) + balances, err := bc.dao.GetNEP5Balances(acc) if err != nil { return nil } - return lg + result := new(state.NEP5TransferLog) + for i := uint32(0); i <= balances.NextTransferBatch; i++ { + lg, err := bc.dao.GetNEP5TransferLog(acc, i) + if err != nil { + return nil + } + result.Raw = append(result.Raw, lg.Raw...) + } + return result } // GetNEP5Balances returns NEP5 balances for the acc. diff --git a/pkg/core/dao.go b/pkg/core/dao.go index abab95085..6a32b0f3e 100644 --- a/pkg/core/dao.go +++ b/pkg/core/dao.go @@ -158,9 +158,19 @@ func (dao *dao) PutNEP5Balances(acc util.Uint160, bs *state.NEP5Balances) error // -- start transfer log. +const nep5TransferBatchSize = 128 + +func getNEP5TransferLogKey(acc util.Uint160, index uint32) []byte { + key := make([]byte, 1+util.Uint160Size+4) + key[0] = byte(storage.STNEP5Transfers) + copy(key[1:], acc.BytesBE()) + binary.LittleEndian.PutUint32(key[util.Uint160Size:], index) + return key +} + // GetNEP5TransferLog retrieves transfer log from the cache. -func (dao *dao) GetNEP5TransferLog(acc util.Uint160) (*state.NEP5TransferLog, error) { - key := storage.AppendPrefix(storage.STNEP5Transfers, acc.BytesBE()) +func (dao *dao) GetNEP5TransferLog(acc util.Uint160, index uint32) (*state.NEP5TransferLog, error) { + key := getNEP5TransferLogKey(acc, index) value, err := dao.store.Get(key) if err != nil { if err == storage.ErrKeyNotFound { @@ -172,24 +182,25 @@ func (dao *dao) GetNEP5TransferLog(acc util.Uint160) (*state.NEP5TransferLog, er } // PutNEP5TransferLog saves given transfer log in the cache. -func (dao *dao) PutNEP5TransferLog(acc util.Uint160, lg *state.NEP5TransferLog) error { - key := storage.AppendPrefix(storage.STNEP5Transfers, acc.BytesBE()) +func (dao *dao) PutNEP5TransferLog(acc util.Uint160, index uint32, lg *state.NEP5TransferLog) error { + key := getNEP5TransferLogKey(acc, index) return dao.store.Put(key, lg.Raw) } // AppendNEP5Transfer appends a single NEP5 transfer to a log. -func (dao *dao) AppendNEP5Transfer(acc util.Uint160, tr *state.NEP5Transfer) error { - lg, err := dao.GetNEP5TransferLog(acc) +// First return value signalizes that log size has exceeded batch size. +func (dao *dao) AppendNEP5Transfer(acc util.Uint160, index uint32, tr *state.NEP5Transfer) (bool, error) { + lg, err := dao.GetNEP5TransferLog(acc, index) if err != nil { if err != storage.ErrKeyNotFound { - return err + return false, err } lg = new(state.NEP5TransferLog) } if err := lg.Append(tr); err != nil { - return err + return false, err } - return dao.PutNEP5TransferLog(acc, lg) + return lg.Size() >= nep5TransferBatchSize, dao.PutNEP5TransferLog(acc, index, lg) } // -- end transfer log. diff --git a/pkg/core/state/nep5.go b/pkg/core/state/nep5.go index 9ca8a0466..91fe30932 100644 --- a/pkg/core/state/nep5.go +++ b/pkg/core/state/nep5.go @@ -45,6 +45,8 @@ type NEP5Transfer struct { // to the corresponding structures. type NEP5Balances struct { Trackers map[util.Uint160]NEP5Tracker + // NextTransferBatch stores an index of the next transfer batch. + NextTransferBatch uint32 } // NewNEP5Balances returns new NEP5Balances. @@ -56,6 +58,7 @@ func NewNEP5Balances() *NEP5Balances { // DecodeBinary implements io.Serializable interface. func (bs *NEP5Balances) DecodeBinary(r *io.BinReader) { + bs.NextTransferBatch = r.ReadU32LE() lenBalances := r.ReadVarUint() m := make(map[util.Uint160]NEP5Tracker, lenBalances) for i := 0; i < int(lenBalances); i++ { @@ -70,6 +73,7 @@ func (bs *NEP5Balances) DecodeBinary(r *io.BinReader) { // EncodeBinary implements io.Serializable interface. func (bs *NEP5Balances) EncodeBinary(w *io.BinWriter) { + w.WriteU32LE(bs.NextTransferBatch) w.WriteVarUint(uint64(len(bs.Trackers))) for k, v := range bs.Trackers { w.WriteBytes(k[:])