blockchainer: allow to dump/restore chain

This commit is contained in:
Evgenii Stratonikov 2020-11-24 11:36:09 +03:00
parent 966b50f2ae
commit 6f7284906a
4 changed files with 208 additions and 91 deletions

View file

@ -10,6 +10,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/chaindump"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network"
@ -193,20 +194,9 @@ func dumpDB(ctx *cli.Context) error {
count = chainCount - start
}
writer.WriteU32LE(count)
for i := start; i < start+count; i++ {
bh := chain.GetHeaderHash(int(i))
b, err := chain.GetBlock(bh)
if err != nil {
return cli.NewExitError(fmt.Errorf("failed to get block %d: %w", i, err), 1)
}
buf := io.NewBufBinWriter()
b.EncodeBinary(buf.BinWriter)
bytes := buf.Bytes()
writer.WriteU32LE(uint32(len(bytes)))
writer.WriteBytes(bytes)
if writer.Err != nil {
return cli.NewExitError(err, 1)
}
err = chaindump.Dump(chain, writer, start, count)
if err != nil {
return cli.NewExitError(err.Error(), 1)
}
pprof.ShutDown()
prometheus.ShutDown()
@ -259,13 +249,6 @@ func restoreDB(ctx *cli.Context) error {
if count == 0 {
count = allBlocks - skip
}
i := uint32(0)
for ; i < skip; i++ {
_, err := readBlock(reader)
if err != nil {
return cli.NewExitError(err, 1)
}
}
gctx := newGraceContext()
var lastIndex uint32
@ -274,45 +257,41 @@ func restoreDB(ctx *cli.Context) error {
_ = dump.tryPersist(dumpDir, lastIndex)
}()
for ; i < skip+count; i++ {
var f = func(b *block.Block) error {
select {
case <-gctx.Done():
return cli.NewExitError("cancelled", 1)
return gctx.Err()
default:
return nil
}
bytes, err := readBlock(reader)
block := block.New(cfg.ProtocolConfiguration.Magic, cfg.ProtocolConfiguration.StateRootInHeader)
newReader := io.NewBinReaderFromBuf(bytes)
block.DecodeBinary(newReader)
if err != nil {
return cli.NewExitError(err, 1)
}
if block.Index == 0 && i == 0 && skip == 0 {
genesis, err := chain.GetBlock(block.Hash())
if err == nil && genesis.Index == 0 {
log.Info("skipped genesis block", zap.String("hash", block.Hash().StringLE()))
}
if dumpDir != "" {
f = func(b *block.Block) error {
select {
case <-gctx.Done():
return gctx.Err()
default:
}
} else {
err = chain.AddBlock(block)
if err != nil {
return cli.NewExitError(fmt.Errorf("failed to add block %d: %w", i, err), 1)
}
}
if dumpDir != "" {
batch := chain.LastBatch()
// The genesis block may already be persisted, so LastBatch() will return nil.
if batch == nil && block.Index == 0 {
continue
if batch == nil && b.Index == 0 {
return nil
}
dump.add(block.Index, batch)
lastIndex = block.Index
if block.Index%1000 == 0 {
if err := dump.tryPersist(dumpDir, block.Index); err != nil {
return cli.NewExitError(fmt.Errorf("can't dump storage to file: %w", err), 1)
dump.add(b.Index, batch)
lastIndex = b.Index
if b.Index%1000 == 0 {
if err := dump.tryPersist(dumpDir, b.Index); err != nil {
return fmt.Errorf("can't dump storage to file: %w", err)
}
}
return nil
}
}
err = chaindump.Restore(chain, reader, skip, count, f)
if err != nil {
return cli.NewExitError(err, 1)
}
return nil
}

View file

@ -11,6 +11,7 @@ import (
"github.com/nspcc-dev/neo-go/internal/testchain"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/chaindump"
"github.com/nspcc-dev/neo-go/pkg/core/fee"
"github.com/nspcc-dev/neo-go/pkg/core/interop/interopnames"
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
@ -1032,3 +1033,63 @@ func TestSubscriptions(t *testing.T) {
_, err = bc.genBlocks(2 * chBufSize)
require.NoError(t, err)
}
func testDumpAndRestore(t *testing.T, stateRootInHeader bool) {
bc := newTestChainWithStateRoot(t, stateRootInHeader)
defer bc.Close()
initBasicChain(t, bc)
require.True(t, bc.BlockHeight() > 5) // ensure that test is valid
w := io.NewBufBinWriter()
require.NoError(t, chaindump.Dump(bc, w.BinWriter, 0, bc.BlockHeight()+1))
require.NoError(t, w.Err)
buf := w.Bytes()
t.Run("invalid start", func(t *testing.T) {
bc2 := newTestChainWithStateRoot(t, stateRootInHeader)
defer bc2.Close()
r := io.NewBinReaderFromBuf(buf)
require.Error(t, chaindump.Restore(bc2, r, 2, 1, nil))
})
t.Run("good", func(t *testing.T) {
bc2 := newTestChainWithStateRoot(t, stateRootInHeader)
defer bc2.Close()
r := io.NewBinReaderFromBuf(buf)
require.NoError(t, chaindump.Restore(bc2, r, 0, 2, nil))
require.Equal(t, uint32(1), bc2.BlockHeight())
r = io.NewBinReaderFromBuf(buf) // new reader because start is relative to dump
require.NoError(t, chaindump.Restore(bc2, r, 2, 1, nil))
t.Run("check handler", func(t *testing.T) {
lastIndex := uint32(0)
errStopped := errors.New("stopped")
f := func(b *block.Block) error {
lastIndex = b.Index
if b.Index >= bc.BlockHeight()-1 {
return errStopped
}
return nil
}
require.NoError(t, chaindump.Restore(bc2, r, 0, 1, f))
require.Equal(t, bc2.BlockHeight(), lastIndex)
r = io.NewBinReaderFromBuf(buf)
err := chaindump.Restore(bc2, r, 4, bc.BlockHeight()-bc2.BlockHeight(), f)
require.True(t, errors.Is(err, errStopped))
require.Equal(t, bc.BlockHeight()-1, lastIndex)
})
})
}
func TestDumpAndRestore(t *testing.T) {
t.Run("no state root", func(t *testing.T) {
testDumpAndRestore(t, false)
})
t.Run("with state root", func(t *testing.T) {
testDumpAndRestore(t, true)
})
}

View file

@ -0,0 +1,77 @@
package chaindump
import (
"fmt"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/io"
)
// Dump writes count blocks from start to the provided writer.
// Note: header needs to be written separately by client.
func Dump(bc blockchainer.Blockchainer, w *io.BinWriter, start, count uint32) error {
for i := start; i < start+count; i++ {
bh := bc.GetHeaderHash(int(i))
b, err := bc.GetBlock(bh)
if err != nil {
return err
}
buf := io.NewBufBinWriter()
b.EncodeBinary(buf.BinWriter)
bytes := buf.Bytes()
w.WriteU32LE(uint32(len(bytes)))
w.WriteBytes(bytes)
if w.Err != nil {
return w.Err
}
}
return nil
}
// Restore restores blocks from provided reader.
// f is called after addition of every block.
func Restore(bc blockchainer.Blockchainer, r *io.BinReader, skip, count uint32, f func(b *block.Block) error) error {
readBlock := func(r *io.BinReader) ([]byte, error) {
var size = r.ReadU32LE()
buf := make([]byte, size)
r.ReadBytes(buf)
return buf, r.Err
}
i := uint32(0)
for ; i < skip; i++ {
_, err := readBlock(r)
if err != nil {
return err
}
}
magic := bc.GetConfig().Magic
stateRootInHeader := bc.GetConfig().StateRootInHeader
for ; i < skip+count; i++ {
buf, err := readBlock(r)
if err != nil {
return err
}
b := block.New(magic, stateRootInHeader)
r := io.NewBinReaderFromBuf(buf)
b.DecodeBinary(r)
if r.Err != nil {
return r.Err
}
if b.Index != 0 || i != 0 || skip != 0 {
err = bc.AddBlock(b)
if err != nil {
return fmt.Errorf("failed to add block %d: %w", i, err)
}
}
if f != nil {
if err := f(b); err != nil {
return err
}
}
}
return nil
}

View file

@ -16,6 +16,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/compiler"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/chaindump"
"github.com/nspcc-dev/neo-go/pkg/core/fee"
"github.com/nspcc-dev/neo-go/pkg/core/interop/interopnames"
"github.com/nspcc-dev/neo-go/pkg/core/native"
@ -171,10 +172,49 @@ func newDumbBlock() *block.Block {
func TestCreateBasicChain(t *testing.T) {
const saveChain = false
const prefix = "../rpc/server/testdata/"
// To make enough GAS.
const numOfEmptyBlocks = 200
bc := newTestChain(t)
defer bc.Close()
initBasicChain(t, bc)
if saveChain {
outStream, err := os.Create(prefix + "testblocks.acc")
require.NoError(t, err)
defer outStream.Close()
writer := io.NewBinWriterFromIO(outStream)
writer.WriteU32LE(bc.BlockHeight())
err = chaindump.Dump(bc, writer, 1, bc.BlockHeight())
require.NoError(t, err)
}
priv0 := testchain.PrivateKeyByID(0)
priv1 := testchain.PrivateKeyByID(1)
priv0ScriptHash := priv0.GetScriptHash()
acc0, err := wallet.NewAccountFromWIF(priv0.WIF())
require.NoError(t, err)
// Prepare some transaction for future submission.
txSendRaw := newNEP17Transfer(bc.contracts.NEO.Hash, priv0ScriptHash, priv1.GetScriptHash(), int64(util.Fixed8FromInt64(1000)))
txSendRaw.ValidUntilBlock = transaction.MaxValidUntilBlockIncrement
txSendRaw.Nonce = 0x1234
txSendRaw.Signers = []transaction.Signer{{
Account: priv0ScriptHash,
Scopes: transaction.CalledByEntry,
AllowedContracts: nil,
AllowedGroups: nil,
}}
require.NoError(t, addNetworkFee(bc, txSendRaw, acc0))
require.NoError(t, acc0.SignTx(txSendRaw))
bw := io.NewBufBinWriter()
txSendRaw.EncodeBinary(bw.BinWriter)
t.Logf("sendrawtransaction: %s", hex.EncodeToString(bw.Bytes()))
}
func initBasicChain(t *testing.T, bc *Blockchain) {
const prefix = "../rpc/server/testdata/"
// Increase in case if you need more blocks
const validUntilBlock = numOfEmptyBlocks + 1000
const validUntilBlock = 1200
// To be incremented after each created transaction to keep chain constant.
var testNonce uint32 = 1
@ -186,8 +226,6 @@ func TestCreateBasicChain(t *testing.T) {
}
const neoAmount = 99999000
bc := newTestChain(t)
defer bc.Close()
gasHash := bc.contracts.GAS.Hash
neoHash := bc.contracts.NEO.Hash
@ -340,44 +378,6 @@ func TestCreateBasicChain(t *testing.T) {
require.NoError(t, acc0.SignTx(txDeploy2))
b = bc.newBlock(txDeploy2)
require.NoError(t, bc.AddBlock(b))
if saveChain {
outStream, err := os.Create(prefix + "testblocks.acc")
require.NoError(t, err)
defer outStream.Close()
writer := io.NewBinWriterFromIO(outStream)
count := bc.BlockHeight() + 1
writer.WriteU32LE(count - 1)
for i := 1; i < int(count); i++ {
bh := bc.GetHeaderHash(i)
b, err := bc.GetBlock(bh)
require.NoError(t, err)
bytes, err := testserdes.EncodeBinary(b)
require.NoError(t, err)
writer.WriteU32LE(uint32(len(bytes)))
writer.WriteBytes(bytes)
require.NoError(t, writer.Err)
}
}
// Prepare some transaction for future submission.
txSendRaw := newNEP17Transfer(neoHash, priv0ScriptHash, priv1.GetScriptHash(), int64(util.Fixed8FromInt64(1000)))
txSendRaw.ValidUntilBlock = validUntilBlock
txSendRaw.Nonce = getNextNonce()
txSendRaw.Signers = []transaction.Signer{{
Account: priv0ScriptHash,
Scopes: transaction.CalledByEntry,
AllowedContracts: nil,
AllowedGroups: nil,
}}
require.NoError(t, addNetworkFee(bc, txSendRaw, acc0))
require.NoError(t, acc0.SignTx(txSendRaw))
bw = io.NewBufBinWriter()
txSendRaw.EncodeBinary(bw.BinWriter)
t.Logf("sendrawtransaction: %s", hex.EncodeToString(bw.Bytes()))
}
func newNEP17Transfer(sc, from, to util.Uint160, amount int64) *transaction.Transaction {