From fb9af981792891b949a8fa8e6e1719b374e0eb5a Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 6 Feb 2020 16:11:32 +0300 Subject: [PATCH 1/4] storage: implement GetBatch() to view storage changes GetBatch returns changes to be persisted. --- pkg/core/storage/memcached_store.go | 34 ++++++++++++++++++++++++ pkg/core/storage/memcached_store_test.go | 24 +++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index de13fcada..bc99393eb 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -9,6 +9,20 @@ type MemCachedStore struct { ps Store } +type ( + // KeyValue represents key-value pair. + KeyValue struct { + Key []byte + Value []byte + } + + // MemBatch represents a changeset to be persisted. + MemBatch struct { + Put []KeyValue + Deleted []KeyValue + } +) + // NewMemCachedStore creates a new MemCachedStore object. func NewMemCachedStore(lower Store) *MemCachedStore { return &MemCachedStore{ @@ -31,6 +45,26 @@ func (s *MemCachedStore) Get(key []byte) ([]byte, error) { return s.ps.Get(key) } +// GetBatch returns currently accumulated changeset. +func (s *MemCachedStore) GetBatch() *MemBatch { + s.mut.RLock() + defer s.mut.RUnlock() + + var b MemBatch + + b.Put = make([]KeyValue, 0, len(s.mem)) + for k, v := range s.mem { + b.Put = append(b.Put, KeyValue{Key: []byte(k), Value: v}) + } + + b.Deleted = make([]KeyValue, 0, len(s.del)) + for k := range s.del { + b.Deleted = append(b.Deleted, KeyValue{Key: []byte(k)}) + } + + return &b +} + // Seek implements the Store interface. 
func (s *MemCachedStore) Seek(key []byte, f func(k, v []byte)) { s.mut.RLock() diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index b99fc6330..b3ac62cb3 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -18,7 +18,9 @@ func TestMemCachedStorePersist(t *testing.T) { assert.Equal(t, 0, c) // persisting one key should result in one key in ps and nothing in ts assert.NoError(t, ts.Put([]byte("key"), []byte("value"))) + checkBatch(t, ts, []KeyValue{{[]byte("key"), []byte("value")}}, nil) c, err = ts.Persist() + checkBatch(t, ts, nil, nil) assert.Equal(t, nil, err) assert.Equal(t, 1, c) v, err := ps.Get([]byte("key")) @@ -35,9 +37,14 @@ func TestMemCachedStorePersist(t *testing.T) { v, err = ps.Get([]byte("key2")) assert.Equal(t, ErrKeyNotFound, err) assert.Equal(t, []byte(nil), v) + checkBatch(t, ts, []KeyValue{ + {[]byte("key"), []byte("newvalue")}, + {[]byte("key2"), []byte("value2")}, + }, nil) // two keys should be persisted (one overwritten and one new) and // available in the ps c, err = ts.Persist() + checkBatch(t, ts, nil, nil) assert.Equal(t, nil, err) assert.Equal(t, 2, c) v, err = ts.MemoryStore.Get([]byte("key")) @@ -52,6 +59,7 @@ func TestMemCachedStorePersist(t *testing.T) { v, err = ps.Get([]byte("key2")) assert.Equal(t, nil, err) assert.Equal(t, []byte("value2"), v) + checkBatch(t, ts, nil, nil) // we've persisted some values, make sure successive persist is a no-op c, err = ts.Persist() assert.Equal(t, nil, err) @@ -59,7 +67,9 @@ func TestMemCachedStorePersist(t *testing.T) { // test persisting deletions err = ts.Delete([]byte("key")) assert.Equal(t, nil, err) + checkBatch(t, ts, nil, []KeyValue{{Key: []byte("key")}}) c, err = ts.Persist() + checkBatch(t, ts, nil, nil) assert.Equal(t, nil, err) assert.Equal(t, 0, c) v, err = ps.Get([]byte("key")) @@ -70,6 +80,20 @@ func TestMemCachedStorePersist(t *testing.T) { assert.Equal(t, []byte("value2"), v) } 
+func checkBatch(t *testing.T, ts *MemCachedStore, put []KeyValue, del []KeyValue) { + b := ts.GetBatch() + assert.Equal(t, len(put), len(b.Put), "wrong number of put elements in a batch") + assert.Equal(t, len(del), len(b.Deleted), "wrong number of deleted elements in a batch") + + for i := range put { + assert.Contains(t, b.Put, put[i]) + } + + for i := range del { + assert.Contains(t, b.Deleted, del[i]) + } +} + func TestCachedGetFromPersistent(t *testing.T) { key := []byte("key") value := []byte("value") From b1d9e1132da61a00cec5efd4316c011caf48f5b3 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 6 Feb 2020 18:47:03 +0300 Subject: [PATCH 2/4] cli: dump storage changes into JSON files --- cli/server/dump.go | 158 +++++++++++++++++++++++++++++++++++++++++ cli/server/server.go | 20 ++++++ config/config.go | 2 + pkg/core/blockchain.go | 11 +++ 4 files changed, 191 insertions(+) create mode 100644 cli/server/dump.go diff --git a/cli/server/dump.go b/cli/server/dump.go new file mode 100644 index 000000000..5e6ac009a --- /dev/null +++ b/cli/server/dump.go @@ -0,0 +1,158 @@ +package server + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/CityOfZion/neo-go/pkg/core/storage" + "github.com/CityOfZion/neo-go/pkg/util" +) + +type dump []interface{} + +type storageOp struct { + State string `json:"state"` + Key string `json:"key"` + Value string `json:"value,omitempty"` +} + +// NEO has some differences of key storing. +// out format: script hash in LE + key +// neo format: script hash in BE + byte(0) + key with 0 between every 16 bytes, padded to len 16. +func toNeoStorageKey(key []byte) []byte { + if len(key) < util.Uint160Size { + panic("invalid key in storage") + } + + var nkey []byte + for i := util.Uint160Size - 1; i >= 0; i-- { + nkey = append(nkey, key[i]) + } + + key = key[util.Uint160Size:] + + index := 0 + remain := len(key) + for remain >= 16 { + nkey = append(nkey, key[index:index+16]...) 
+ nkey = append(nkey, 0) + index += 16 + remain -= 16 + } + + if remain > 0 { + nkey = append(nkey, key[index:]...) + } + + padding := 16 - remain + for i := 0; i < padding; i++ { + nkey = append(nkey, 0) + } + + nkey = append(nkey, byte(padding)) + + return nkey +} + +// batchToMap converts batch to a map so that JSON is compatible +// with https://github.com/NeoResearch/neo-storage-audit/ +func batchToMap(index uint32, batch *storage.MemBatch) map[string]interface{} { + size := len(batch.Put) + len(batch.Deleted) + ops := make([]storageOp, 0, size) + for i := range batch.Put { + key := batch.Put[i].Key + if len(key) == 0 || key[0] != byte(storage.STStorage) { + continue + } + + key = toNeoStorageKey(key[1:]) + ops = append(ops, storageOp{ + State: "Added", + Key: hex.EncodeToString(key), + Value: "00" + hex.EncodeToString(batch.Put[i].Value), + }) + } + + for i := range batch.Deleted { + key := batch.Deleted[i].Key + if len(key) == 0 || key[0] != byte(storage.STStorage) { + continue + } + + key = toNeoStorageKey(key[1:]) + ops = append(ops, storageOp{ + State: "Deleted", + Key: hex.EncodeToString(key), + }) + } + + return map[string]interface{}{ + "block": index, + "size": len(ops), + "storage": ops, + } +} + +func newDump() *dump { + return new(dump) +} + +func (d *dump) add(index uint32, batch *storage.MemBatch) { + m := batchToMap(index, batch) + *d = append(*d, m) +} + +func (d *dump) tryPersist(prefix string, index uint32) error { + if index%1000 != 0 { + return nil + } + + f, err := createFile(prefix, index) + if err != nil { + return err + } + + defer f.Close() + + enc := json.NewEncoder(f) + enc.SetIndent("", " ") + if err := enc.Encode(*d); err != nil { + return err + } + + *d = (*d)[:0] + + return nil +} + +// createFile creates directory and file in it for storing blocks up to index. 
+// Directory structure is the following: +// https://github.com/NeoResearch/neo-storage-audit#folder-organization-where-to-find-the-desired-block +// Dir `BlockStorage_$DIRNO` contains blocks up to $DIRNO (from $DIRNO-100k) +// Inside it there are files grouped by 1k blocks. +// File dump-block-$FILENO.json contains blocks from $FILENO-999, $FILENO +// Example: file `BlockStorage_100000/dump-block-6000.json` contains blocks from 5001 to 6000. +func createFile(prefix string, index uint32) (*os.File, error) { + dirN := (index-1)/100000 + 1 + dir := fmt.Sprintf("BlockStorage_%d00000", dirN) + + path := filepath.Join(prefix, dir) + info, err := os.Stat(path) + if os.IsNotExist(err) { + err := os.MkdirAll(path, os.ModePerm) + if err != nil { + return nil, err + } + } else if !info.IsDir() { + return nil, fmt.Errorf("file `%s` is not a directory", path) + } + + fileN := (index-1)/1000 + 1 + file := fmt.Sprintf("dump-block-%d000.json", fileN) + path = filepath.Join(path, file) + + return os.Create(path) +} diff --git a/cli/server/server.go b/cli/server/server.go index 0f1aa80b9..a8af9571a 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -61,6 +61,10 @@ func NewCommands() []cli.Command { Name: "in, i", Usage: "Input file (stdin if not given)", }, + cli.StringFlag{ + Name: "dump", + Usage: "directory for storing JSON dumps", + }, ) return []cli.Command{ { @@ -242,6 +246,11 @@ func restoreDB(ctx *cli.Context) error { defer inStream.Close() reader := io.NewBinReaderFromIO(inStream) + dumpDir := ctx.String("dump") + if dumpDir != "" { + cfg.ProtocolConfiguration.SaveStorageBatch = true + } + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) if err != nil { return err @@ -267,6 +276,9 @@ func restoreDB(ctx *cli.Context) error { return cli.NewExitError(err, 1) } } + + dump := newDump() + for ; i < skip+count; i++ { bytes, err := readBlock(reader) block := &block.Block{} @@ -286,6 +298,14 @@ func restoreDB(ctx *cli.Context) error { if err != nil { 
return cli.NewExitError(fmt.Errorf("failed to add block %d: %s", i, err), 1) } + + if dumpDir != "" { + batch := chain.LastBatch() + dump.add(block.Index, batch) + if err := dump.tryPersist(dumpDir, block.Index); err != nil { + return cli.NewExitError(fmt.Errorf("can't dump storage to file: %v", err), 1) + } + } } return nil } diff --git a/config/config.go b/config/config.go index aedbcbece..d84ee6e36 100644 --- a/config/config.go +++ b/config/config.go @@ -57,6 +57,8 @@ type ( VerifyTransactions bool `yaml:"VerifyTransactions"` // FreeGasLimit is an amount of GAS which can be spent for free. FreeGasLimit util.Fixed8 `yaml:"FreeGasLimit"` + // SaveStorageBatch enables storage batch saving before every persist. + SaveStorageBatch bool `yaml:"SaveStorageBatch"` } // SystemFee fees related to system. diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 43d716114..ecc97ba9d 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -109,6 +109,8 @@ type Blockchain struct { keyCache map[util.Uint160]map[string]*keys.PublicKey log *zap.Logger + + lastBatch *storage.MemBatch } type headersOpFunc func(headerList *HeaderHashList) @@ -604,6 +606,10 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { bc.lock.Lock() defer bc.lock.Unlock() + if bc.config.SaveStorageBatch { + bc.lastBatch = cache.dao.store.GetBatch() + } + _, err := cache.Persist() if err != nil { return err @@ -615,6 +621,11 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { return nil } +// LastBatch returns last persisted storage batch. +func (bc *Blockchain) LastBatch() *storage.MemBatch { + return bc.lastBatch +} + // processOutputs processes transaction outputs. 
func processOutputs(tx *transaction.Transaction, dao *cachedDao) error { for index, output := range tx.Outputs { From a1fecd2e88bdbdedc416a0c45341c0d548a0336a Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 7 Feb 2020 11:05:14 +0300 Subject: [PATCH 3/4] cli: add script for comparing storage dumps It uses jq to normalize and compare json dumps. --- scripts/compare-dumps | 63 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100755 scripts/compare-dumps diff --git a/scripts/compare-dumps b/scripts/compare-dumps new file mode 100755 index 000000000..da41dcd11 --- /dev/null +++ b/scripts/compare-dumps @@ -0,0 +1,63 @@ +#!/bin/sh + +ARG1=$1 +ARG2=$2 +if [ -z "$ARG1" ] || [ -z "$ARG2" ]; then + echo one of the arguments is empty + exit 1 +fi + + +compare() { + # replace replaces storage operation from "Changed" to "Added" + # normalize performs replace and sorts keys in lexicographic order + # next we normalize every json file + # and finally compare them as a whole + jq --argfile x "$1" --argfile y "$2" \ + -n 'def replace: map(if (.state == "Changed") then (.state="Added") else . end); + def normalize: .storage = (.storage | replace | sort_by(.key)); + ($x | map(normalize)) as $x + | ($y | map(normalize)) as $y + | $x | range(length) | . as $i | select($x[$i] != $y[$i]) | $x[$i].block | halt_error(1)' +} + +if [ -f "$ARG1" ] && [ -f "$ARG2" ]; then + compare "$ARG1" "$ARG2" + if [ $? -ne 0 ]; then + echo failed + exit 1 + fi + + exit 0 +fi + +if [ ! -d "$ARG1" ] || [ ! 
-d "$ARG2" ]; then + echo both arguments must have the same type and exist + exit 1 +fi + +FIRST=$3 +if [ -z "$FIRST" ]; then + FIRST=1 +fi + +LAST=$4 +if [ -z "$LAST" ]; then + LAST=40 # 40_00000 block +fi + +# directories contain 100k blocks +for i in `seq $FIRST $LAST`; do + dir=BlockStorage_${i}00000 + echo Processing directory $dir + + # files are grouped by 1k blocks + for j in `seq $(((i-1)*100 + 1)) $((i*100))`; do + file=dump-block-${j}000.json + compare "$ARG1/$dir/$file" "$ARG2/$dir/$file" + if [ $? -ne 0 ]; then + echo failed on file $dir/$file + exit 1 + fi + done +done From 0a894db7f8c9988865b88f07f661878305ad3f7d Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 7 Feb 2020 15:08:25 +0300 Subject: [PATCH 4/4] storage: add Exists flag to KeyValue in batch Set Exists flag if an item with the specified key was already present in storage before persisting. --- cli/server/dump.go | 9 +++++++-- pkg/core/storage/memcached_store.go | 10 ++++++++-- pkg/core/storage/memcached_store_test.go | 8 ++++---- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/cli/server/dump.go b/cli/server/dump.go index 5e6ac009a..37865d3ab 100644 --- a/cli/server/dump.go +++ b/cli/server/dump.go @@ -68,9 +68,14 @@ func batchToMap(index uint32, batch *storage.MemBatch) map[string]interface{} { continue } + op := "Added" + if batch.Put[i].Exists { + op = "Changed" + } + key = toNeoStorageKey(key[1:]) ops = append(ops, storageOp{ - State: "Added", + State: op, Key: hex.EncodeToString(key), Value: "00" + hex.EncodeToString(batch.Put[i].Value), }) @@ -78,7 +83,7 @@ func batchToMap(index uint32, batch *storage.MemBatch) map[string]interface{} { for i := range batch.Deleted { key := batch.Deleted[i].Key - if len(key) == 0 || key[0] != byte(storage.STStorage) { + if len(key) == 0 || key[0] != byte(storage.STStorage) || !batch.Deleted[i].Exists { continue } diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 
bc99393eb..fc5daba3b 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -14,6 +14,8 @@ type ( KeyValue struct { Key []byte Value []byte + + Exists bool } // MemBatch represents a changeset to be persisted. @@ -54,12 +56,16 @@ func (s *MemCachedStore) GetBatch() *MemBatch { b.Put = make([]KeyValue, 0, len(s.mem)) for k, v := range s.mem { - b.Put = append(b.Put, KeyValue{Key: []byte(k), Value: v}) + key := []byte(k) + _, err := s.ps.Get(key) + b.Put = append(b.Put, KeyValue{Key: key, Value: v, Exists: err == nil}) } b.Deleted = make([]KeyValue, 0, len(s.del)) for k := range s.del { - b.Deleted = append(b.Deleted, KeyValue{Key: []byte(k)}) + key := []byte(k) + _, err := s.ps.Get(key) + b.Deleted = append(b.Deleted, KeyValue{Key: key, Exists: err == nil}) } return &b diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index b3ac62cb3..cf7bcf40f 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -18,7 +18,7 @@ func TestMemCachedStorePersist(t *testing.T) { assert.Equal(t, 0, c) // persisting one key should result in one key in ps and nothing in ts assert.NoError(t, ts.Put([]byte("key"), []byte("value"))) - checkBatch(t, ts, []KeyValue{{[]byte("key"), []byte("value")}}, nil) + checkBatch(t, ts, []KeyValue{{Key: []byte("key"), Value: []byte("value")}}, nil) c, err = ts.Persist() checkBatch(t, ts, nil, nil) assert.Equal(t, nil, err) @@ -38,8 +38,8 @@ func TestMemCachedStorePersist(t *testing.T) { assert.Equal(t, ErrKeyNotFound, err) assert.Equal(t, []byte(nil), v) checkBatch(t, ts, []KeyValue{ - {[]byte("key"), []byte("newvalue")}, - {[]byte("key2"), []byte("value2")}, + {Key: []byte("key"), Value: []byte("newvalue"), Exists: true}, + {Key: []byte("key2"), Value: []byte("value2")}, }, nil) // two keys should be persisted (one overwritten and one new) and // available in the ps @@ -67,7 +67,7 @@ func TestMemCachedStorePersist(t 
*testing.T) { // test persisting deletions err = ts.Delete([]byte("key")) assert.Equal(t, nil, err) - checkBatch(t, ts, nil, []KeyValue{{Key: []byte("key")}}) + checkBatch(t, ts, nil, []KeyValue{{Key: []byte("key"), Exists: true}}) c, err = ts.Persist() checkBatch(t, ts, nil, nil) assert.Equal(t, nil, err)