Merge pull request #440 from nspcc-dev/db-dump-restore

DB dump and restore, fixes #436.
This commit is contained in:
Roman Khimov 2019-10-21 15:42:05 +03:00 committed by GitHub
commit f1750d117b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 238 additions and 57 deletions

View file

@ -17,12 +17,10 @@ func main() {
ctl.Version = config.Version ctl.Version = config.Version
ctl.Usage = "Official Go client for Neo" ctl.Usage = "Official Go client for Neo"
ctl.Commands = []cli.Command{ ctl.Commands = append(ctl.Commands, server.NewCommands()...)
server.NewCommand(), ctl.Commands = append(ctl.Commands, smartcontract.NewCommands()...)
smartcontract.NewCommand(), ctl.Commands = append(ctl.Commands, wallet.NewCommands()...)
wallet.NewCommand(), ctl.Commands = append(ctl.Commands, vm.NewCommands()...)
vm.NewCommand(),
}
if err := ctl.Run(os.Args); err != nil { if err := ctl.Run(os.Args); err != nil {
panic(err) panic(err)

View file

@ -9,6 +9,7 @@ import (
"github.com/CityOfZion/neo-go/config" "github.com/CityOfZion/neo-go/config"
"github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/core/storage" "github.com/CityOfZion/neo-go/pkg/core/storage"
"github.com/CityOfZion/neo-go/pkg/io"
"github.com/CityOfZion/neo-go/pkg/network" "github.com/CityOfZion/neo-go/pkg/network"
"github.com/CityOfZion/neo-go/pkg/rpc" "github.com/CityOfZion/neo-go/pkg/rpc"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -16,18 +17,63 @@ import (
"github.com/urfave/cli" "github.com/urfave/cli"
) )
// NewCommand creates a new Node command. // NewCommands returns 'node' command.
func NewCommand() cli.Command { func NewCommands() []cli.Command {
return cli.Command{ var cfgFlags = []cli.Flag{
Name: "node",
Usage: "start a NEO node",
Action: startServer,
Flags: []cli.Flag{
cli.StringFlag{Name: "config-path"}, cli.StringFlag{Name: "config-path"},
cli.BoolFlag{Name: "privnet, p"}, cli.BoolFlag{Name: "privnet, p"},
cli.BoolFlag{Name: "mainnet, m"}, cli.BoolFlag{Name: "mainnet, m"},
cli.BoolFlag{Name: "testnet, t"}, cli.BoolFlag{Name: "testnet, t"},
cli.BoolFlag{Name: "debug, d"}, cli.BoolFlag{Name: "debug, d"},
}
var cfgWithCountFlags = make([]cli.Flag, len(cfgFlags))
copy(cfgWithCountFlags, cfgFlags)
cfgWithCountFlags = append(cfgWithCountFlags,
cli.UintFlag{
Name: "count, c",
Usage: "number of blocks to be processed (default or 0: all chain)",
},
cli.UintFlag{
Name: "skip, s",
Usage: "number of blocks to skip (default: 0)",
},
)
var cfgCountOutFlags = make([]cli.Flag, len(cfgWithCountFlags))
copy(cfgCountOutFlags, cfgWithCountFlags)
cfgCountOutFlags = append(cfgCountOutFlags, cli.StringFlag{
Name: "out, o",
Usage: "Output file (stdout if not given)",
})
var cfgCountInFlags = make([]cli.Flag, len(cfgWithCountFlags))
copy(cfgCountInFlags, cfgWithCountFlags)
cfgCountInFlags = append(cfgCountInFlags, cli.StringFlag{
Name: "in, i",
Usage: "Input file (stdin if not given)",
})
return []cli.Command{
{
Name: "node",
Usage: "start a NEO node",
Action: startServer,
Flags: cfgFlags,
},
{
Name: "db",
Usage: "database manipulations",
Subcommands: []cli.Command{
{
Name: "dump",
Usage: "dump blocks (starting with block #1) to the file",
Action: dumpDB,
Flags: cfgCountOutFlags,
},
{
Name: "restore",
Usage: "restore blocks from the file",
Action: restoreDB,
Flags: cfgCountInFlags,
},
},
}, },
} }
} }
@ -43,26 +89,154 @@ func newGraceContext() context.Context {
return ctx return ctx
} }
func startServer(ctx *cli.Context) error { // getConfigFromContext looks at path and mode flags in the given config and
net := config.ModePrivNet // returns appropriate config.
func getConfigFromContext(ctx *cli.Context) (config.Config, error) {
var net = config.ModePrivNet
if ctx.Bool("testnet") { if ctx.Bool("testnet") {
net = config.ModeTestNet net = config.ModeTestNet
} }
if ctx.Bool("mainnet") { if ctx.Bool("mainnet") {
net = config.ModeMainNet net = config.ModeMainNet
} }
grace, cancel := context.WithCancel(newGraceContext())
defer cancel()
configPath := "./config" configPath := "./config"
if argCp := ctx.String("config-path"); argCp != "" { if argCp := ctx.String("config-path"); argCp != "" {
configPath = argCp configPath = argCp
} }
cfg, err := config.Load(configPath, net) return config.Load(configPath, net)
}
// handleLoggingParams enables debugging output is that's requested by the user.
func handleLoggingParams(ctx *cli.Context) {
if ctx.Bool("debug") {
log.SetLevel(log.DebugLevel)
}
}
func getCountAndSkipFromContext(ctx *cli.Context) (uint32, uint32) {
count := uint32(ctx.Uint("count"))
skip := uint32(ctx.Uint("skip"))
return count, skip
}
func dumpDB(ctx *cli.Context) error {
cfg, err := getConfigFromContext(ctx)
if err != nil { if err != nil {
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
} }
handleLoggingParams(ctx)
count, skip := getCountAndSkipFromContext(ctx)
var outStream = os.Stdout
if out := ctx.String("out"); out != "" {
outStream, err = os.Create(out)
if err != nil {
return cli.NewExitError(err, 1)
}
}
defer outStream.Close()
writer := io.NewBinWriterFromIO(outStream)
grace, cancel := context.WithCancel(newGraceContext())
defer cancel()
chain, err := initBlockChain(cfg)
if err != nil {
return cli.NewExitError(err, 1)
}
go chain.Run(grace)
chainHeight := chain.BlockHeight()
if skip+count > chainHeight {
return cli.NewExitError(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", chainHeight, count, skip), 1)
}
if count == 0 {
count = chainHeight - skip
}
writer.WriteLE(count)
for i := skip + 1; i <= count; i++ {
bh := chain.GetHeaderHash(int(i))
b, err := chain.GetBlock(bh)
if err != nil {
return cli.NewExitError(fmt.Errorf("failed to get block %d: %s", i, err), 1)
}
b.EncodeBinary(writer)
if writer.Err != nil {
return cli.NewExitError(err, 1)
}
}
return nil
}
func restoreDB(ctx *cli.Context) error {
cfg, err := getConfigFromContext(ctx)
if err != nil {
return err
}
handleLoggingParams(ctx)
count, skip := getCountAndSkipFromContext(ctx)
var inStream = os.Stdin
if in := ctx.String("in"); in != "" {
inStream, err = os.Open(in)
if err != nil {
return cli.NewExitError(err, 1)
}
}
defer inStream.Close()
reader := io.NewBinReaderFromIO(inStream)
grace, cancel := context.WithCancel(newGraceContext())
defer cancel()
chain, err := initBlockChain(cfg)
if err != nil {
return err
}
go chain.Run(grace)
var allBlocks uint32
reader.ReadLE(&allBlocks)
if reader.Err != nil {
return cli.NewExitError(err, 1)
}
if skip+count > allBlocks {
return cli.NewExitError(fmt.Errorf("input file has only %d blocks, can't read %d starting from %d", allBlocks, count, skip), 1)
}
if count == 0 {
count = allBlocks
}
i := uint32(0)
for ; i < skip; i++ {
b := &core.Block{}
b.DecodeBinary(reader)
if reader.Err != nil {
return cli.NewExitError(err, 1)
}
}
for ; i < count; i++ {
b := &core.Block{}
b.DecodeBinary(reader)
if reader.Err != nil {
return cli.NewExitError(err, 1)
}
err := chain.AddBlock(b)
if err != nil {
return cli.NewExitError(fmt.Errorf("failed to add block %d: %s", i, err), 1)
}
}
return nil
}
func startServer(ctx *cli.Context) error {
cfg, err := getConfigFromContext(ctx)
if err != nil {
return err
}
handleLoggingParams(ctx)
grace, cancel := context.WithCancel(newGraceContext())
defer cancel()
serverConfig := network.NewServerConfig(cfg) serverConfig := network.NewServerConfig(cfg)
@ -71,10 +245,6 @@ func startServer(ctx *cli.Context) error {
return err return err
} }
if ctx.Bool("debug") {
log.SetLevel(log.DebugLevel)
}
server := network.NewServer(serverConfig, chain) server := network.NewServer(serverConfig, chain)
rpcServer := rpc.NewServer(chain, cfg.ApplicationConfiguration.RPCPort, server) rpcServer := rpc.NewServer(chain, cfg.ApplicationConfiguration.RPCPort, server)
errChan := make(chan error) errChan := make(chan error)

View file

@ -35,9 +35,9 @@ func Main(op string, args []interface{}) {
}` }`
) )
// NewCommand returns a new contract command. // NewCommands returns 'contract' command.
func NewCommand() cli.Command { func NewCommands() []cli.Command {
return cli.Command{ return []cli.Command{{
Name: "contract", Name: "contract",
Usage: "compile - debug - deploy smart contracts", Usage: "compile - debug - deploy smart contracts",
Subcommands: []cli.Command{ Subcommands: []cli.Command{
@ -98,7 +98,7 @@ func NewCommand() cli.Command {
}, },
}, },
}, },
} }}
} }
// initSmartContract initializes a given directory with some boiler plate code. // initSmartContract initializes a given directory with some boiler plate code.

View file

@ -9,9 +9,9 @@ import (
"github.com/urfave/cli" "github.com/urfave/cli"
) )
// NewCommand creates a new VM command. // NewCommands returns 'vm' command.
func NewCommand() cli.Command { func NewCommands() []cli.Command {
return cli.Command{ return []cli.Command{{
Name: "vm", Name: "vm",
Usage: "start the virtual machine", Usage: "start the virtual machine",
Action: startVMPrompt, Action: startVMPrompt,
@ -31,7 +31,7 @@ func NewCommand() cli.Command {
}, },
}, },
}, },
} }}
} }
func startVMPrompt(ctx *cli.Context) error { func startVMPrompt(ctx *cli.Context) error {

View file

@ -16,9 +16,9 @@ var (
errPhraseMismatch = errors.New("the entered pass-phrases do not match. Maybe you have misspelled them") errPhraseMismatch = errors.New("the entered pass-phrases do not match. Maybe you have misspelled them")
) )
// NewCommand creates a new Wallet command. // NewCommands returns 'wallet' command.
func NewCommand() cli.Command { func NewCommands() []cli.Command {
return cli.Command{ return []cli.Command{{
Name: "wallet", Name: "wallet",
Usage: "create, open and manage a NEO wallet", Usage: "create, open and manage a NEO wallet",
Subcommands: []cli.Command{ Subcommands: []cli.Command{
@ -49,7 +49,7 @@ func NewCommand() cli.Command {
}, },
}, },
}, },
} }}
} }
func openWallet(ctx *cli.Context) error { func openWallet(ctx *cli.Context) error {

View file

@ -137,7 +137,16 @@ func (bc *Blockchain) init() error {
// that with stored blocks. // that with stored blocks.
if currHeaderHeight > bc.storedHeaderCount { if currHeaderHeight > bc.storedHeaderCount {
hash := currHeaderHash hash := currHeaderHash
targetHash := bc.headerList.Get(bc.headerList.Len() - 1) var targetHash util.Uint256
if bc.headerList.Len() > 0 {
targetHash = bc.headerList.Get(bc.headerList.Len() - 1)
} else {
genesisBlock, err := createGenesisBlock(bc.config)
if err != nil {
return err
}
targetHash = genesisBlock.Hash()
}
headers := make([]*Header, 0) headers := make([]*Header, 0)
for hash != targetHash { for hash != targetHash {
@ -166,7 +175,7 @@ func (bc *Blockchain) Run(ctx context.Context) {
persistTimer := time.NewTimer(persistInterval) persistTimer := time.NewTimer(persistInterval)
defer func() { defer func() {
persistTimer.Stop() persistTimer.Stop()
if err := bc.persist(ctx); err != nil { if err := bc.persist(); err != nil {
log.Warnf("failed to persist: %s", err) log.Warnf("failed to persist: %s", err)
} }
if err := bc.store.Close(); err != nil { if err := bc.store.Close(); err != nil {
@ -182,7 +191,7 @@ func (bc *Blockchain) Run(ctx context.Context) {
bc.headersOpDone <- struct{}{} bc.headersOpDone <- struct{}{}
case <-persistTimer.C: case <-persistTimer.C:
go func() { go func() {
err := bc.persist(ctx) err := bc.persist()
if err != nil { if err != nil {
log.Warnf("failed to persist blockchain: %s", err) log.Warnf("failed to persist blockchain: %s", err)
} }
@ -461,7 +470,7 @@ func (bc *Blockchain) storeBlock(block *Block) error {
} }
// persist flushes current in-memory store contents to the persistent storage. // persist flushes current in-memory store contents to the persistent storage.
func (bc *Blockchain) persist(ctx context.Context) error { func (bc *Blockchain) persist() error {
var ( var (
start = time.Now() start = time.Now()
persisted int persisted int
@ -479,13 +488,16 @@ func (bc *Blockchain) persist(ctx context.Context) error {
oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight) oldHeight := atomic.SwapUint32(&bc.persistedHeight, bHeight)
diff := bHeight - oldHeight diff := bHeight - oldHeight
storedHeaderHeight, _, err := storage.CurrentHeaderHeight(bc.store)
if err != nil {
return err
}
if persisted > 0 { if persisted > 0 {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"persistedBlocks": diff, "persistedBlocks": diff,
"persistedKeys": persisted, "persistedKeys": persisted,
"headerHeight": bc.HeaderHeight(), "headerHeight": storedHeaderHeight,
"blockHeight": bc.BlockHeight(), "blockHeight": bHeight,
"persistedHeight": atomic.LoadUint32(&bc.persistedHeight),
"took": time.Since(start), "took": time.Since(start),
}).Info("blockchain persist completed") }).Info("blockchain persist completed")
} }

View file

@ -1,7 +1,6 @@
package core package core
import ( import (
"context"
"testing" "testing"
"github.com/CityOfZion/neo-go/pkg/core/storage" "github.com/CityOfZion/neo-go/pkg/core/storage"
@ -53,7 +52,7 @@ func TestAddBlock(t *testing.T) {
assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash()) assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
// This one tests persisting blocks, so it does need to persist() // This one tests persisting blocks, so it does need to persist()
require.NoError(t, bc.persist(context.Background())) require.NoError(t, bc.persist())
for _, block := range blocks { for _, block := range blocks {
key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesReverse()) key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesReverse())
@ -82,7 +81,7 @@ func TestGetHeader(t *testing.T) {
b2 := newBlock(2) b2 := newBlock(2)
_, err = bc.GetHeader(b2.Hash()) _, err = bc.GetHeader(b2.Hash())
assert.Error(t, err) assert.Error(t, err)
assert.NoError(t, bc.persist(context.Background())) assert.NoError(t, bc.persist())
} }
} }
@ -106,7 +105,7 @@ func TestGetBlock(t *testing.T) {
assert.Equal(t, blocks[i].Index, block.Index) assert.Equal(t, blocks[i].Index, block.Index)
assert.Equal(t, blocks[i].Hash(), block.Hash()) assert.Equal(t, blocks[i].Hash(), block.Hash())
} }
assert.NoError(t, bc.persist(context.Background())) assert.NoError(t, bc.persist())
} }
} }
@ -127,7 +126,7 @@ func TestHasBlock(t *testing.T) {
} }
newBlock := newBlock(51) newBlock := newBlock(51)
assert.False(t, bc.HasBlock(newBlock.Hash())) assert.False(t, bc.HasBlock(newBlock.Hash()))
assert.NoError(t, bc.persist(context.Background())) assert.NoError(t, bc.persist())
} }
} }
@ -135,10 +134,12 @@ func TestGetTransaction(t *testing.T) {
b1 := getDecodedBlock(t, 1) b1 := getDecodedBlock(t, 1)
block := getDecodedBlock(t, 2) block := getDecodedBlock(t, 2)
bc := newTestChain(t) bc := newTestChain(t)
// Turn verification off, because these blocks are really from some other chain
// and can't be verified, but we don't care about that in this test.
bc.config.VerifyBlocks = false
// These are from some kind of different chain, so can't be added via AddBlock(). assert.Nil(t, bc.AddBlock(b1))
assert.Nil(t, bc.storeBlock(b1)) assert.Nil(t, bc.AddBlock(block))
assert.Nil(t, bc.storeBlock(block))
// Test unpersisted and persisted access // Test unpersisted and persisted access
for j := 0; j < 2; j++ { for j := 0; j < 2; j++ {
@ -151,6 +152,6 @@ func TestGetTransaction(t *testing.T) {
assert.Equal(t, 1, io.GetVarSize(tx.Inputs)) assert.Equal(t, 1, io.GetVarSize(tx.Inputs))
assert.Equal(t, 1, io.GetVarSize(tx.Outputs)) assert.Equal(t, 1, io.GetVarSize(tx.Outputs))
assert.Equal(t, 1, io.GetVarSize(tx.Scripts)) assert.Equal(t, 1, io.GetVarSize(tx.Scripts))
assert.NoError(t, bc.persist(context.Background())) assert.NoError(t, bc.persist())
} }
} }