forked from TrueCloudLab/neoneo-go
78bd01db57
Diff dumps provided by NGD contain index of the first block. Also do proper error handling.
460 lines
12 KiB
Go
460 lines
12 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"os/signal"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/config"
|
|
"github.com/nspcc-dev/neo-go/pkg/core"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/storage"
|
|
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
|
"github.com/nspcc-dev/neo-go/pkg/network"
|
|
"github.com/nspcc-dev/neo-go/pkg/network/metrics"
|
|
"github.com/nspcc-dev/neo-go/pkg/rpc/server"
|
|
"github.com/pkg/errors"
|
|
"github.com/urfave/cli"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
)
|
|
|
|
// NewCommands returns 'node' command.
|
|
func NewCommands() []cli.Command {
|
|
var cfgFlags = []cli.Flag{
|
|
cli.StringFlag{Name: "config-path"},
|
|
cli.BoolFlag{Name: "privnet, p"},
|
|
cli.BoolFlag{Name: "mainnet, m"},
|
|
cli.BoolFlag{Name: "testnet, t"},
|
|
cli.BoolFlag{Name: "debug, d"},
|
|
}
|
|
var cfgWithCountFlags = make([]cli.Flag, len(cfgFlags))
|
|
copy(cfgWithCountFlags, cfgFlags)
|
|
cfgWithCountFlags = append(cfgWithCountFlags,
|
|
cli.UintFlag{
|
|
Name: "count, c",
|
|
Usage: "number of blocks to be processed (default or 0: all chain)",
|
|
},
|
|
cli.UintFlag{
|
|
Name: "start, s",
|
|
Usage: "block number to start from (default: 0)",
|
|
},
|
|
)
|
|
var cfgCountOutFlags = make([]cli.Flag, len(cfgWithCountFlags))
|
|
copy(cfgCountOutFlags, cfgWithCountFlags)
|
|
cfgCountOutFlags = append(cfgCountOutFlags,
|
|
cli.StringFlag{
|
|
Name: "out, o",
|
|
Usage: "Output file (stdout if not given)",
|
|
},
|
|
)
|
|
var cfgCountInFlags = make([]cli.Flag, len(cfgWithCountFlags))
|
|
copy(cfgCountInFlags, cfgWithCountFlags)
|
|
cfgCountInFlags = append(cfgCountInFlags,
|
|
cli.StringFlag{
|
|
Name: "in, i",
|
|
Usage: "Input file (stdin if not given)",
|
|
},
|
|
cli.StringFlag{
|
|
Name: "dump",
|
|
Usage: "directory for storing JSON dumps",
|
|
},
|
|
cli.BoolFlag{
|
|
Name: "diff, k",
|
|
Usage: "Use if DB is restore from diff and not full dump",
|
|
},
|
|
)
|
|
return []cli.Command{
|
|
{
|
|
Name: "node",
|
|
Usage: "start a NEO node",
|
|
Action: startServer,
|
|
Flags: cfgFlags,
|
|
},
|
|
{
|
|
Name: "db",
|
|
Usage: "database manipulations",
|
|
Subcommands: []cli.Command{
|
|
{
|
|
Name: "dump",
|
|
Usage: "dump blocks (starting with block #1) to the file",
|
|
UsageText: "When --start option is provided format is different because " +
|
|
"index of the first block is written first.",
|
|
Action: dumpDB,
|
|
Flags: cfgCountOutFlags,
|
|
},
|
|
{
|
|
Name: "restore",
|
|
Usage: "restore blocks from the file",
|
|
Action: restoreDB,
|
|
Flags: cfgCountInFlags,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// newGraceContext returns a context derived from context.Background that is
// cancelled as soon as the process receives os.Interrupt.
func newGraceContext() context.Context {
	// Buffered so that a signal delivered before the goroutine is scheduled
	// is not dropped by signal.Notify.
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)

	graceCtx, stopGrace := context.WithCancel(context.Background())
	go func() {
		// Wait for the first interrupt, then cancel the context.
		<-interrupts
		stopGrace()
	}()
	return graceCtx
}
|
|
|
|
// getConfigFromContext looks at path and mode flags in the given config and
|
|
// returns appropriate config.
|
|
func getConfigFromContext(ctx *cli.Context) (config.Config, error) {
|
|
var net = config.ModePrivNet
|
|
if ctx.Bool("testnet") {
|
|
net = config.ModeTestNet
|
|
}
|
|
if ctx.Bool("mainnet") {
|
|
net = config.ModeMainNet
|
|
}
|
|
configPath := "./config"
|
|
if argCp := ctx.String("config-path"); argCp != "" {
|
|
configPath = argCp
|
|
}
|
|
return config.Load(configPath, net)
|
|
}
|
|
|
|
// handleLoggingParams reads logging parameters.
|
|
// If user selected debug level -- function enables it.
|
|
// If logPath is configured -- function creates dir and file for logging.
|
|
func handleLoggingParams(ctx *cli.Context, cfg config.ApplicationConfiguration) (*zap.Logger, error) {
|
|
level := zapcore.InfoLevel
|
|
if ctx.Bool("debug") {
|
|
level = zapcore.DebugLevel
|
|
}
|
|
|
|
cc := zap.NewProductionConfig()
|
|
cc.DisableCaller = true
|
|
cc.DisableStacktrace = true
|
|
cc.EncoderConfig.EncodeDuration = zapcore.StringDurationEncoder
|
|
cc.EncoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
|
|
cc.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
|
cc.Encoding = "console"
|
|
cc.Level = zap.NewAtomicLevelAt(level)
|
|
cc.Sampling = nil
|
|
|
|
if logPath := cfg.LogPath; logPath != "" {
|
|
if err := io.MakeDirForFile(logPath, "logger"); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cc.OutputPaths = []string{logPath}
|
|
}
|
|
|
|
return cc.Build()
|
|
}
|
|
|
|
func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *metrics.Service, *metrics.Service, error) {
|
|
chain, err := initBlockChain(cfg, log)
|
|
if err != nil {
|
|
return nil, nil, nil, cli.NewExitError(err, 1)
|
|
}
|
|
configureAddresses(cfg.ApplicationConfiguration)
|
|
prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus, log)
|
|
pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof, log)
|
|
|
|
go chain.Run()
|
|
go prometheus.Start()
|
|
go pprof.Start()
|
|
|
|
return chain, prometheus, pprof, nil
|
|
}
|
|
|
|
func dumpDB(ctx *cli.Context) error {
|
|
cfg, err := getConfigFromContext(ctx)
|
|
if err != nil {
|
|
return cli.NewExitError(err, 1)
|
|
}
|
|
log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration)
|
|
if err != nil {
|
|
return cli.NewExitError(err, 1)
|
|
}
|
|
count := uint32(ctx.Uint("count"))
|
|
start := uint32(ctx.Uint("start"))
|
|
|
|
var outStream = os.Stdout
|
|
if out := ctx.String("out"); out != "" {
|
|
outStream, err = os.Create(out)
|
|
if err != nil {
|
|
return cli.NewExitError(err, 1)
|
|
}
|
|
}
|
|
defer outStream.Close()
|
|
writer := io.NewBinWriterFromIO(outStream)
|
|
|
|
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
chainCount := chain.BlockHeight() + 1
|
|
if start+count > chainCount {
|
|
return cli.NewExitError(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", chainCount-1, count, start), 1)
|
|
}
|
|
if count == 0 {
|
|
count = chainCount - start
|
|
}
|
|
if start != 0 {
|
|
writer.WriteU32LE(start)
|
|
}
|
|
writer.WriteU32LE(count)
|
|
for i := start; i < start+count; i++ {
|
|
bh := chain.GetHeaderHash(int(i))
|
|
b, err := chain.GetBlock(bh)
|
|
if err != nil {
|
|
return cli.NewExitError(fmt.Errorf("failed to get block %d: %s", i, err), 1)
|
|
}
|
|
buf := io.NewBufBinWriter()
|
|
b.EncodeBinary(buf.BinWriter)
|
|
bytes := buf.Bytes()
|
|
writer.WriteU32LE(uint32(len(bytes)))
|
|
writer.WriteBytes(bytes)
|
|
if writer.Err != nil {
|
|
return cli.NewExitError(err, 1)
|
|
}
|
|
}
|
|
pprof.ShutDown()
|
|
prometheus.ShutDown()
|
|
chain.Close()
|
|
return nil
|
|
}
|
|
|
|
func restoreDB(ctx *cli.Context) error {
|
|
cfg, err := getConfigFromContext(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration)
|
|
if err != nil {
|
|
return cli.NewExitError(err, 1)
|
|
}
|
|
count := uint32(ctx.Uint("count"))
|
|
start := uint32(ctx.Uint("start"))
|
|
|
|
var inStream = os.Stdin
|
|
if in := ctx.String("in"); in != "" {
|
|
inStream, err = os.Open(in)
|
|
if err != nil {
|
|
return cli.NewExitError(err, 1)
|
|
}
|
|
}
|
|
defer inStream.Close()
|
|
reader := io.NewBinReaderFromIO(inStream)
|
|
|
|
dumpDir := ctx.String("dump")
|
|
if dumpDir != "" {
|
|
cfg.ProtocolConfiguration.SaveStorageBatch = true
|
|
}
|
|
|
|
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer chain.Close()
|
|
defer prometheus.ShutDown()
|
|
defer pprof.ShutDown()
|
|
|
|
dumpStart := uint32(0)
|
|
dumpSize := reader.ReadU32LE()
|
|
if ctx.Bool("diff") {
|
|
// in diff first uint32 is the index of the first block
|
|
dumpStart = dumpSize
|
|
dumpSize = reader.ReadU32LE()
|
|
}
|
|
if reader.Err != nil {
|
|
return cli.NewExitError(err, 1)
|
|
}
|
|
if start < dumpStart {
|
|
return cli.NewExitError(fmt.Errorf("input file start from %d block, can't import %d", dumpStart, start), 1)
|
|
}
|
|
|
|
lastBlock := dumpStart + dumpSize
|
|
if start+count > lastBlock {
|
|
return cli.NewExitError(fmt.Errorf("input file has blocks up until %d, can't read %d starting from %d", lastBlock, count, start), 1)
|
|
}
|
|
if count == 0 {
|
|
count = lastBlock - start
|
|
}
|
|
i := dumpStart
|
|
for ; i < start; i++ {
|
|
_, err := readBlock(reader)
|
|
if err != nil {
|
|
return cli.NewExitError(err, 1)
|
|
}
|
|
}
|
|
|
|
gctx := newGraceContext()
|
|
var lastIndex uint32
|
|
dump := newDump()
|
|
defer func() {
|
|
_ = dump.tryPersist(dumpDir, lastIndex)
|
|
}()
|
|
|
|
for ; i < start+count; i++ {
|
|
select {
|
|
case <-gctx.Done():
|
|
return cli.NewExitError("cancelled", 1)
|
|
default:
|
|
}
|
|
bytes, err := readBlock(reader)
|
|
if err != nil {
|
|
return cli.NewExitError(err, 1)
|
|
}
|
|
block := &block.Block{}
|
|
newReader := io.NewBinReaderFromBuf(bytes)
|
|
block.DecodeBinary(newReader)
|
|
if newReader.Err != nil {
|
|
return cli.NewExitError(newReader.Err, 1)
|
|
}
|
|
if block.Index == 0 && i == 0 && start == 0 {
|
|
genesis, err := chain.GetBlock(block.Hash())
|
|
if err == nil && genesis.Index == 0 {
|
|
log.Info("skipped genesis block", zap.String("hash", block.Hash().StringLE()))
|
|
continue
|
|
}
|
|
}
|
|
err = chain.AddBlock(block)
|
|
if err != nil {
|
|
return cli.NewExitError(fmt.Errorf("failed to add block %d: %s", i, err), 1)
|
|
}
|
|
|
|
if dumpDir != "" {
|
|
batch := chain.LastBatch()
|
|
dump.add(block.Index, batch)
|
|
lastIndex = block.Index
|
|
if block.Index%1000 == 0 {
|
|
if err := dump.tryPersist(dumpDir, block.Index); err != nil {
|
|
return cli.NewExitError(fmt.Errorf("can't dump storage to file: %v", err), 1)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// readBlock performs reading of block size and then bytes with the length equal to that size.
|
|
func readBlock(reader *io.BinReader) ([]byte, error) {
|
|
var size = reader.ReadU32LE()
|
|
bytes := make([]byte, size)
|
|
reader.ReadBytes(bytes)
|
|
if reader.Err != nil {
|
|
return nil, reader.Err
|
|
}
|
|
return bytes, nil
|
|
}
|
|
|
|
func startServer(ctx *cli.Context) error {
|
|
cfg, err := getConfigFromContext(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
grace, cancel := context.WithCancel(newGraceContext())
|
|
defer cancel()
|
|
|
|
serverConfig := network.NewServerConfig(cfg)
|
|
|
|
chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
serv, err := network.NewServer(serverConfig, chain, log)
|
|
if err != nil {
|
|
return cli.NewExitError(fmt.Errorf("failed to create network server: %v", err), 1)
|
|
}
|
|
rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, log)
|
|
errChan := make(chan error)
|
|
|
|
go serv.Start(errChan)
|
|
go rpcServer.Start(errChan)
|
|
|
|
fmt.Println(logo())
|
|
fmt.Println(serv.UserAgent)
|
|
fmt.Println()
|
|
|
|
var shutdownErr error
|
|
Main:
|
|
for {
|
|
select {
|
|
case err := <-errChan:
|
|
shutdownErr = errors.Wrap(err, "Error encountered by server")
|
|
cancel()
|
|
|
|
case <-grace.Done():
|
|
serv.Shutdown()
|
|
if serverErr := rpcServer.Shutdown(); serverErr != nil {
|
|
shutdownErr = errors.Wrap(serverErr, "Error encountered whilst shutting down server")
|
|
}
|
|
prometheus.ShutDown()
|
|
pprof.ShutDown()
|
|
chain.Close()
|
|
break Main
|
|
}
|
|
}
|
|
|
|
if shutdownErr != nil {
|
|
return cli.NewExitError(shutdownErr, 1)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// configureAddresses sets up addresses for RPC, Prometheus and Pprof depending from the provided config.
|
|
// In case RPC or Prometheus or Pprof Address provided each of them will use it.
|
|
// In case global Address (of the node) provided and RPC/Prometheus/Pprof don't have configured addresses they will
|
|
// use global one. So Node and RPC and Prometheus and Pprof will run on one address.
|
|
func configureAddresses(cfg config.ApplicationConfiguration) {
|
|
if cfg.Address != "" {
|
|
if cfg.RPC.Address == "" {
|
|
cfg.RPC.Address = cfg.Address
|
|
}
|
|
if cfg.Prometheus.Address == "" {
|
|
cfg.Prometheus.Address = cfg.Address
|
|
}
|
|
if cfg.Pprof.Address == "" {
|
|
cfg.Pprof.Address = cfg.Address
|
|
}
|
|
}
|
|
}
|
|
|
|
// initBlockChain initializes BlockChain with preselected DB.
|
|
func initBlockChain(cfg config.Config, log *zap.Logger) (*core.Blockchain, error) {
|
|
store, err := storage.NewStore(cfg.ApplicationConfiguration.DBConfiguration)
|
|
if err != nil {
|
|
return nil, cli.NewExitError(fmt.Errorf("could not initialize storage: %s", err), 1)
|
|
}
|
|
|
|
chain, err := core.NewBlockchain(store, cfg.ProtocolConfiguration, log)
|
|
if err != nil {
|
|
return nil, cli.NewExitError(fmt.Errorf("could not initialize blockchain: %s", err), 1)
|
|
}
|
|
if cfg.ProtocolConfiguration.AddressVersion != 0 {
|
|
address.Prefix = cfg.ProtocolConfiguration.AddressVersion
|
|
}
|
|
return chain, nil
|
|
}
|
|
|
|
// logo returns the multi-line ASCII-art "NEO-GO" banner printed at node
// start-up. The string begins and ends with a newline. The spacing inside
// each row is significant: it keeps the letter strokes aligned in columns.
func logo() string {
	return `
    _   ____________        __________
   / | / / ____/ __ \      / ____/ __ \
  /  |/ / __/ / / / /_____/ / __/ / / /
 / /|  / /___/ /_/ /_____/ /_/ / /_/ /
/_/ |_/_____/\____/      \____/\____/
`
}
|