From 407e348cd5777d3561580f5f5441ef6270423681 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 4 Aug 2020 17:08:28 +0300 Subject: [PATCH] core: save UTXO transfer info --- pkg/core/blockchain.go | 98 +++++++++++++++++++++++++++++++++- pkg/core/dao/cacheddao.go | 79 ++++++++++++++++++++++++++- pkg/core/dao/dao.go | 70 ++++++++++++++++++++++++ pkg/core/state/nep5_test.go | 4 ++ pkg/core/state/transfer_log.go | 51 ++++++++++++++++++ pkg/core/storage/store.go | 1 + 6 files changed, 301 insertions(+), 2 deletions(-) create mode 100644 pkg/core/state/transfer_log.go diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 732ddfcc5..50358a848 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -613,6 +613,9 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { return err } + var pseudoSender util.Uint160 + var gasTotal, neoTotal util.Fixed8 + // Process TX inputs that are grouped by previous hash. for _, inputs := range transaction.GroupInputsByPrevHash(tx.Inputs) { prevHash := inputs[0].PrevHash @@ -620,7 +623,7 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { if err != nil { return err } - for _, input := range inputs { + for i, input := range inputs { if len(unspent.States) <= int(input.PrevIndex) { return fmt.Errorf("bad input: %s/%d", input.PrevHash.StringLE(), input.PrevIndex) } @@ -630,6 +633,14 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { unspent.States[input.PrevIndex].State |= state.CoinSpent unspent.States[input.PrevIndex].SpendHeight = block.Index prevTXOutput := &unspent.States[input.PrevIndex].Output + if i == 0 { + pseudoSender = prevTXOutput.ScriptHash + } + if prevTXOutput.AssetID.Equals(GoverningTokenID()) { + neoTotal += prevTXOutput.Amount + } else if prevTXOutput.AssetID.Equals(UtilityTokenID()) { + gasTotal += prevTXOutput.Amount + } account, err := cache.GetAccountStateOrNew(prevTXOutput.ScriptHash) if err != nil { return err @@ -679,6 +690,10 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { } } + if err := bc.processTransfer(cache, pseudoSender, tx, block, neoTotal, gasTotal); err != nil { + return err + } + // Process the underlying type of the TX. switch t := tx.Data.(type) { case *transaction.RegisterTX: @@ -906,6 +921,87 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { return nil } +func appendSingleTransfer(cache *dao.Cached, acc util.Uint160, tr *state.Transfer) error { + index, err := cache.GetNextTransferBatch(acc) + if err != nil { + return err + } + isBig, err := cache.AppendTransfer(acc, index, tr) + if err != nil { + return err + } + if isBig { + if err := cache.PutNextTransferBatch(acc, index+1); err != nil { + return err + } + } + return nil +} + +// processTransfer processes single UTXO transfer. Totals is a slice of neo (0) and gas (1) total transfer amount. +func (bc *Blockchain) processTransfer(cache *dao.Cached, from util.Uint160, tx *transaction.Transaction, b *block.Block, + neoTotal, gasTotal util.Fixed8) error { + + fromIndex, err := cache.GetNextTransferBatch(from) + if err != nil { + return err + } + for i := range tx.Outputs { + isGoverning := tx.Outputs[i].AssetID.Equals(GoverningTokenID()) + if !isGoverning && !tx.Outputs[i].AssetID.Equals(UtilityTokenID()) { + continue + } + if !from.Equals(tx.Outputs[i].ScriptHash) { + tr := &state.Transfer{ + IsGoverning: isGoverning, + From: from, + To: tx.Outputs[i].ScriptHash, + Amount: int64(tx.Outputs[i].Amount), + Block: b.Index, + Timestamp: b.Timestamp, + Tx: tx.Hash(), + } + isBig, err := cache.AppendTransfer(from, fromIndex, tr) + if err != nil { + return err + } else if isBig { + fromIndex++ + } + if err := appendSingleTransfer(cache, tx.Outputs[i].ScriptHash, tr); err != nil { + return err + } + } + if isGoverning { + neoTotal -= tx.Outputs[i].Amount + } else { + gasTotal -= tx.Outputs[i].Amount + } + + } + for i, amount := range []util.Fixed8{neoTotal, gasTotal} { + if amount > 0 { + tr := &state.Transfer{ + IsGoverning: i == 0, + From: from, + Amount: int64(amount), + Block: b.Index, + Timestamp: b.Timestamp, + Tx: tx.Hash(), + } + isBig, err := cache.AppendTransfer(from, fromIndex, tr) + if err != nil { + return err + } else if isBig { + fromIndex++ + } + if err := appendSingleTransfer(cache, util.Uint160{}, tr); err != nil { + return err + } + } + } + return cache.PutNextTransferBatch(from, fromIndex) +} + func parseUint160(addr []byte) util.Uint160 { if u, err := util.Uint160DecodeBytesBE(addr); err == nil { return u diff --git a/pkg/core/dao/cacheddao.go b/pkg/core/dao/cacheddao.go index 6d806c80b..3309aa254 100644 --- a/pkg/core/dao/cacheddao.go +++ b/pkg/core/dao/cacheddao.go @@ -21,6 +21,8 @@ type Cached struct { unspents map[util.Uint256]*state.UnspentCoin balances map[util.Uint160]*state.NEP5Balances nep5transfers map[util.Uint160]map[uint32]*state.TransferLog + transfers map[util.Uint160]map[uint32]*state.TransferLog + nextBatch map[util.Uint160]uint32 storage *itemCache dropNEP5Cache bool @@ -33,6 +35,8 @@ func NewCached(d DAO) *Cached { unspents := make(map[util.Uint256]*state.UnspentCoin) balances := make(map[util.Uint160]*state.NEP5Balances) nep5transfers := make(map[util.Uint160]map[uint32]*state.TransferLog) + transfers := make(map[util.Uint160]map[uint32]*state.TransferLog) + nextBatch := make(map[util.Uint160]uint32) st := newItemCache() dao := d.GetWrapped() if cd, ok := dao.(*Cached); ok { @@ -42,7 +46,18 @@ func NewCached(d DAO) *Cached { } } } - return &Cached{dao, accs, ctrs, unspents, balances, nep5transfers, st, false} + return &Cached{ + DAO: dao, + accounts: accs, + contracts: ctrs, + unspents: unspents, + balances: balances, + nep5transfers: nep5transfers, + transfers: transfers, + nextBatch: nextBatch, + storage: st, + dropNEP5Cache: false, + } } // GetAccountStateOrNew retrieves Account from cache or underlying store @@ -106,6 +121,52 @@ func (cd *Cached) PutUnspentCoinState(hash util.Uint256, ucs *state.UnspentCoin) return nil } +// GetNextTransferBatch returns index for the transfer batch to write to. +func (cd *Cached) GetNextTransferBatch(acc util.Uint160) (uint32, error) { + if n, ok := cd.nextBatch[acc]; ok { + return n, nil + } + return cd.DAO.GetNextTransferBatch(acc) +} + +// PutNextTransferBatch sets index of the transfer batch to write to. +func (cd *Cached) PutNextTransferBatch(acc util.Uint160, num uint32) error { + cd.nextBatch[acc] = num + return nil +} + +// GetTransferLog retrieves TransferLog for the acc. +func (cd *Cached) GetTransferLog(acc util.Uint160, index uint32) (*state.TransferLog, error) { + ts := cd.transfers[acc] + if ts != nil && ts[index] != nil { + return ts[index], nil + } + return cd.DAO.GetTransferLog(acc, index) +} + +// PutTransferLog saves TransferLog for the acc. +func (cd *Cached) PutTransferLog(acc util.Uint160, index uint32, bs *state.TransferLog) error { + ts := cd.transfers[acc] + if ts == nil { + ts = make(map[uint32]*state.TransferLog, 2) + cd.transfers[acc] = ts + } + ts[index] = bs + return nil +} + +// AppendTransfer appends new transfer to a transfer event log. +func (cd *Cached) AppendTransfer(acc util.Uint160, index uint32, tr *state.Transfer) (bool, error) { + lg, err := cd.GetTransferLog(acc, index) + if err != nil { + return false, err + } + if err := lg.Append(tr); err != nil { + return false, err + } + return lg.Size() >= transferBatchSize, cd.PutTransferLog(acc, index, lg) +} + // GetNEP5Balances retrieves NEP5Balances for the acc. func (cd *Cached) GetNEP5Balances(acc util.Uint160) (*state.NEP5Balances, error) { if bs := cd.balances[acc]; bs != nil { @@ -273,6 +334,20 @@ func (cd *Cached) Persist() (int, error) { } } } + for acc, ts := range cd.transfers { + for ind, lg := range ts { + err := cd.DAO.PutTransferLog(acc, ind, lg) + if err != nil { + return 0, err + } + } + } + for acc, nb := range cd.nextBatch { + err := cd.DAO.PutNextTransferBatch(acc, nb) + if err != nil { + return 0, err + } + } return cd.DAO.Persist() } @@ -284,6 +359,8 @@ func (cd *Cached) GetWrapped() DAO { cd.unspents, cd.balances, cd.nep5transfers, + cd.transfers, + cd.nextBatch, cd.storage, false, } diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index dee1995ee..550d873ef 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -19,6 +19,7 @@ import ( // DAO is a data access object. type DAO interface { AppendNEP5Transfer(acc util.Uint160, index uint32, tr *state.NEP5Transfer) (bool, error) + AppendTransfer(acc util.Uint160, index uint32, tr *state.Transfer) (bool, error) DeleteContractState(hash util.Uint160) error DeleteStorageItem(scripthash util.Uint160, key []byte) error DeleteValidatorState(vs *state.Validator) error @@ -37,11 +38,13 @@ type DAO interface { GetNEP5Balances(acc util.Uint160) (*state.NEP5Balances, error) GetNEP5Metadata(h util.Uint160) (*state.NEP5Metadata, error) GetNEP5TransferLog(acc util.Uint160, index uint32) (*state.TransferLog, error) + GetNextTransferBatch(acc util.Uint160) (uint32, error) GetStateRoot(height uint32) (*state.MPTRootState, error) PutStateRoot(root *state.MPTRootState) error GetStorageItem(scripthash util.Uint160, key []byte) *state.StorageItem GetStorageItems(hash util.Uint160, prefix []byte) ([]StorageItemWithKey, error) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) + GetTransferLog(acc util.Uint160, index uint32) (*state.TransferLog, error) GetUnspentCoinState(hash util.Uint256) (*state.UnspentCoin, error) GetValidatorState(publicKey *keys.PublicKey) (*state.Validator, error) GetValidatorStateOrNew(publicKey *keys.PublicKey) (*state.Validator, error) @@ -61,7 +64,9 @@ type DAO interface { PutNEP5Balances(acc util.Uint160, bs *state.NEP5Balances) error PutNEP5Metadata(h util.Uint160, meta *state.NEP5Metadata) error PutNEP5TransferLog(acc util.Uint160, index uint32, lg *state.TransferLog) error + PutNextTransferBatch(acc util.Uint160, num uint32) error PutStorageItem(scripthash util.Uint160, key []byte, si *state.StorageItem) error + PutTransferLog(acc util.Uint160, index uint32, lg *state.TransferLog) error PutUnspentCoinState(hash util.Uint256, ucs *state.UnspentCoin) error PutValidatorState(vs *state.Validator) error PutValidatorsCount(vc *state.ValidatorsCount) error @@ -263,6 +268,15 @@ func (dao *Simple) putNEP5Balances(acc util.Uint160, bs *state.NEP5Balances, buf // -- start transfer log. const nep5TransferBatchSize = 128 * state.NEP5TransferSize +const transferBatchSize = 128 * state.TransferSize + +func getTransferLogKey(acc util.Uint160, index uint32) []byte { + key := make([]byte, 1+util.Uint160Size+4) + key[0] = byte(storage.STTransfers) + copy(key[1:], acc.BytesBE()) + binary.LittleEndian.PutUint32(key[util.Uint160Size:], index) + return key +} func getNEP5TransferLogKey(acc util.Uint160, index uint32) []byte { key := make([]byte, 1+util.Uint160Size+4) @@ -272,6 +286,62 @@ func getNEP5TransferLogKey(acc util.Uint160, index uint32) []byte { return key } +// GetNextTransferBatch returns index for the transfer batch to write to. +func (dao *Simple) GetNextTransferBatch(acc util.Uint160) (uint32, error) { + key := storage.AppendPrefix(storage.STTransfers, acc.BytesBE()) + val, err := dao.Store.Get(key) + if err != nil { + if err != storage.ErrKeyNotFound { + return 0, err + } + return 0, nil + } + return binary.LittleEndian.Uint32(val), nil +} + +// PutNextTransferBatch sets index of the transfer batch to write to. +func (dao *Simple) PutNextTransferBatch(acc util.Uint160, num uint32) error { + key := storage.AppendPrefix(storage.STTransfers, acc.BytesBE()) + val := make([]byte, 4) + binary.LittleEndian.PutUint32(val, num) + return dao.Store.Put(key, val) +} + +// GetTransferLog retrieves transfer log from the cache. +func (dao *Simple) GetTransferLog(acc util.Uint160, index uint32) (*state.TransferLog, error) { + key := getTransferLogKey(acc, index) + value, err := dao.Store.Get(key) + if err != nil { + if err == storage.ErrKeyNotFound { + return new(state.TransferLog), nil + } + return nil, err + } + return &state.TransferLog{Raw: value}, nil +} + +// PutTransferLog saves given transfer log in the cache. +func (dao *Simple) PutTransferLog(acc util.Uint160, index uint32, lg *state.TransferLog) error { + key := getTransferLogKey(acc, index) + return dao.Store.Put(key, lg.Raw) +} + +// AppendTransfer appends a single transfer to a log. +// First return value signalizes that log size has exceeded batch size. +func (dao *Simple) AppendTransfer(acc util.Uint160, index uint32, tr *state.Transfer) (bool, error) { + lg, err := dao.GetTransferLog(acc, index) + if err != nil { + if err != storage.ErrKeyNotFound { + return false, err + } + lg = new(state.TransferLog) + } + if err := lg.Append(tr); err != nil { + return false, err + } + return lg.Size() >= transferBatchSize, dao.PutTransferLog(acc, index, lg) +} + // GetNEP5TransferLog retrieves transfer log from the cache. func (dao *Simple) GetNEP5TransferLog(acc util.Uint160, index uint32) (*state.TransferLog, error) { key := getNEP5TransferLogKey(acc, index) diff --git a/pkg/core/state/nep5_test.go b/pkg/core/state/nep5_test.go index 92ae689ea..90fb01e16 100644 --- a/pkg/core/state/nep5_test.go +++ b/pkg/core/state/nep5_test.go @@ -78,3 +78,7 @@ func randomTransfer(r *rand.Rand) *NEP5Transfer { Tx: random.Uint256(), } } + +func TestTransfer_Size(t *testing.T) { + require.Equal(t, TransferSize, io.GetVarSize(new(Transfer))) +} diff --git a/pkg/core/state/transfer_log.go b/pkg/core/state/transfer_log.go new file mode 100644 index 000000000..c89d638c2 --- /dev/null +++ b/pkg/core/state/transfer_log.go @@ -0,0 +1,51 @@ +package state + +import ( + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/util" +) + +// TransferSize is a size of a marshaled Transfer struct in bytes. +const TransferSize = 1 + util.Uint160Size*2 + 8 + 4 + 4 + util.Uint256Size + +// Transfer represents a single Transfer event. +type Transfer struct { + // IsGoverning is true iff transfer is for neo token. + IsGoverning bool + // Address is the address of the sender. + From util.Uint160 + // To is the address of the receiver. + To util.Uint160 + // Amount is the amount of tokens transferred. + // It is negative when tokens are sent and positive if they are received. + Amount int64 + // Block is a number of block when the event occured. + Block uint32 + // Timestamp is the timestamp of the block where transfer occured. + Timestamp uint32 + // Tx is a hash the transaction. + Tx util.Uint256 +} + +// EncodeBinary implements io.Serializable interface. +// Note: change TransferSize constant when changing this function. +func (t *Transfer) EncodeBinary(w *io.BinWriter) { + w.WriteBytes(t.Tx[:]) + w.WriteBytes(t.From[:]) + w.WriteBytes(t.To[:]) + w.WriteU32LE(t.Block) + w.WriteU32LE(t.Timestamp) + w.WriteU64LE(uint64(t.Amount)) + w.WriteBool(t.IsGoverning) +} + +// DecodeBinary implements io.Serializable interface. +func (t *Transfer) DecodeBinary(r *io.BinReader) { + r.ReadBytes(t.Tx[:]) + r.ReadBytes(t.From[:]) + r.ReadBytes(t.To[:]) + t.Block = r.ReadU32LE() + t.Timestamp = r.ReadU32LE() + t.Amount = int64(r.ReadU64LE()) + t.IsGoverning = r.ReadBool() +} diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 4f8324f14..ff09321c8 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -13,6 +13,7 @@ const ( STAccount KeyPrefix = 0x40 STCoin KeyPrefix = 0x44 STSpentCoin KeyPrefix = 0x45 + STTransfers KeyPrefix = 0x47 STValidator KeyPrefix = 0x48 STAsset KeyPrefix = 0x4c STNotification KeyPrefix = 0x4d