From 9c79684516ab2e86b0e9a5378c3ac18aee0ef2ad Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 9 Jan 2020 10:38:09 +0300 Subject: [PATCH 1/8] storage: panic on error in boltdb.Seek Error in Seek means something is terribly wrong (e.g. db was not opened) and error drop is not the right thing to do, because caller will continue working with the wrong view. --- pkg/core/storage/boltdb_store.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index c0c1e5f9e..4d1d10ed3 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -7,7 +7,6 @@ import ( "github.com/CityOfZion/neo-go/pkg/io" "github.com/etcd-io/bbolt" - log "github.com/sirupsen/logrus" "github.com/syndtr/goleveldb/leveldb/util" ) @@ -109,7 +108,7 @@ func (s *BoltDBStore) Seek(key []byte, f func(k, v []byte)) { return nil }) if err != nil { - log.Error("error while executing seek in boltDB") + panic(err) } } From aecdf470e7adbc981382e524d60b82cfb9bd82b3 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 30 Dec 2019 10:43:05 +0300 Subject: [PATCH 2/8] cli,pkg: use zap.Logger --- cli/server/server.go | 60 ++++++++++++++---------- integration/performance_test.go | 6 ++- pkg/consensus/consensus.go | 13 +++--- pkg/consensus/consensus_test.go | 4 +- pkg/consensus/logger.go | 19 -------- pkg/core/blockchain.go | 70 +++++++++++++++------------- pkg/core/helper_test.go | 3 +- pkg/core/interop_neo_test.go | 15 +++--- pkg/core/interop_system.go | 6 ++- pkg/core/interops.go | 6 ++- pkg/core/interops_test.go | 3 +- pkg/network/blockqueue.go | 19 +++++--- pkg/network/blockqueue_test.go | 3 +- pkg/network/helper_test.go | 4 +- pkg/network/metrics/metrics.go | 19 +++----- pkg/network/metrics/pprof.go | 15 ++++-- pkg/network/metrics/prometheus.go | 14 ++++-- pkg/network/payload/headers.go | 2 - pkg/network/server.go | 76 +++++++++++++++---------------- pkg/network/server_config.go | 4 +- pkg/network/server_test.go | 8 ++-- pkg/network/tcp_transport.go | 14 +++--- pkg/rpc/server_helper_test.go | 5 +- 23 files changed, 208 insertions(+), 180 deletions(-) delete mode 100644 pkg/consensus/logger.go diff --git a/cli/server/server.go b/cli/server/server.go index 6ecdfdda7..a1e194c81 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -15,8 +15,9 @@ import ( "github.com/CityOfZion/neo-go/pkg/network/metrics" "github.com/CityOfZion/neo-go/pkg/rpc" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" "github.com/urfave/cli" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // NewCommands returns 'node' command. @@ -119,32 +120,40 @@ func getConfigFromContext(ctx *cli.Context) (config.Config, error) { // handleLoggingParams reads logging parameters. // If user selected debug level -- function enables it. // If logPath is configured -- function creates dir and file for logging. -func handleLoggingParams(ctx *cli.Context, cfg config.ApplicationConfiguration) error { +func handleLoggingParams(ctx *cli.Context, cfg config.ApplicationConfiguration) (*zap.Logger, error) { + level := zapcore.InfoLevel if ctx.Bool("debug") { - log.SetLevel(log.DebugLevel) + level = zapcore.DebugLevel } + cc := zap.NewProductionConfig() + cc.DisableCaller = true + cc.DisableStacktrace = true + cc.EncoderConfig.EncodeDuration = zapcore.StringDurationEncoder + cc.EncoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder + cc.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + cc.Encoding = "console" + cc.Level = zap.NewAtomicLevelAt(level) + if logPath := cfg.LogPath; logPath != "" { if err := io.MakeDirForFile(logPath, "logger"); err != nil { - return err + return nil, err } - f, err := os.Create(logPath) - if err != nil { - return err - } - log.SetOutput(f) + + cc.OutputPaths = []string{logPath} } - return nil + + return cc.Build() } -func initBCWithMetrics(cfg config.Config) (*core.Blockchain, *metrics.Service, *metrics.Service, error) { - chain, err := initBlockChain(cfg) +func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *metrics.Service, *metrics.Service, error) { + chain, err := initBlockChain(cfg, log) if err != nil { return nil, nil, nil, cli.NewExitError(err, 1) } configureAddresses(cfg.ApplicationConfiguration) - prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus) - pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof) + prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus, log) + pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof, log) go chain.Run() go prometheus.Start() @@ -158,7 +167,8 @@ func dumpDB(ctx *cli.Context) error { if err != nil { return cli.NewExitError(err, 1) } - if err := handleLoggingParams(ctx, cfg.ApplicationConfiguration); err != nil { + log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration) + if err != nil { return cli.NewExitError(err, 1) } count := uint32(ctx.Uint("count")) @@ -174,7 +184,7 @@ func dumpDB(ctx *cli.Context) error { defer outStream.Close() writer := io.NewBinWriterFromIO(outStream) - chain, prometheus, pprof, err := initBCWithMetrics(cfg) + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) if err != nil { return err } @@ -213,7 +223,8 @@ func restoreDB(ctx *cli.Context) error { if err != nil { return err } - if err := handleLoggingParams(ctx, cfg.ApplicationConfiguration); err != nil { + log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration) + if err != nil { return cli.NewExitError(err, 1) } count := uint32(ctx.Uint("count")) @@ -229,7 +240,7 @@ func restoreDB(ctx *cli.Context) error { defer inStream.Close() reader := io.NewBinReaderFromIO(inStream) - chain, prometheus, pprof, err := initBCWithMetrics(cfg) + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) if err != nil { return err } @@ -265,7 +276,7 @@ func restoreDB(ctx *cli.Context) error { if block.Index == 0 && i == 0 && skip == 0 { genesis, err := chain.GetBlock(block.Hash()) if err == nil && genesis.Index == 0 { - log.Info("skipped genesis block ", block.Hash().StringLE()) + log.Info("skipped genesis block", zap.String("hash", block.Hash().StringLE())) continue } } @@ -293,7 +304,8 @@ func startServer(ctx *cli.Context) error { if err != nil { return err } - if err := handleLoggingParams(ctx, cfg.ApplicationConfiguration); err != nil { + log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration) + if err != nil { return err } @@ -302,12 +314,12 @@ func startServer(ctx *cli.Context) error { serverConfig := network.NewServerConfig(cfg) - chain, prometheus, pprof, err := initBCWithMetrics(cfg) + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) if err != nil { return err } - server := network.NewServer(serverConfig, chain) + server := network.NewServer(serverConfig, chain, log) rpcServer := rpc.NewServer(chain, cfg.ApplicationConfiguration.RPC, server) errChan := make(chan error) @@ -364,13 +376,13 @@ func configureAddresses(cfg config.ApplicationConfiguration) { } // initBlockChain initializes BlockChain with preselected DB. -func initBlockChain(cfg config.Config) (*core.Blockchain, error) { +func initBlockChain(cfg config.Config, log *zap.Logger) (*core.Blockchain, error) { store, err := storage.NewStore(cfg.ApplicationConfiguration.DBConfiguration) if err != nil { return nil, cli.NewExitError(fmt.Errorf("could not initialize storage: %s", err), 1) } - chain, err := core.NewBlockchain(store, cfg.ProtocolConfiguration) + chain, err := core.NewBlockchain(store, cfg.ProtocolConfiguration, log) if err != nil { return nil, cli.NewExitError(fmt.Errorf("could not initialize blockchain: %s", err), 1) } diff --git a/integration/performance_test.go b/integration/performance_test.go index d81c42e58..738f660b5 100644 --- a/integration/performance_test.go +++ b/integration/performance_test.go @@ -13,6 +13,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/network" "github.com/CityOfZion/neo-go/pkg/rpc" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) // Benchmark test to measure number of processed TX. @@ -23,14 +24,15 @@ func BenchmarkTXPerformanceTest(t *testing.B) { cfg, err := config.Load(configPath, net) require.NoError(t, err, "could not load config") + logger := zaptest.NewLogger(t) memoryStore := storage.NewMemoryStore() - chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration) + chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration, logger) require.NoError(t, err, "could not create chain") go chain.Run() serverConfig := network.NewServerConfig(cfg) - server := network.NewServer(serverConfig, chain) + server := network.NewServer(serverConfig, chain, logger) data := prepareData(t) t.ResetTimer() diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 8faf2c080..054a8761f 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -60,6 +60,8 @@ type service struct { // Config is a configuration for consensus services. type Config struct { + // Logger is a logger instance. + Logger *zap.Logger // Broadcast is a callback which is called to notify server // about new consensus payload to sent. Broadcast func(p *Payload) @@ -79,19 +81,18 @@ type Config struct { // NewService returns new consensus.Service instance. func NewService(cfg Config) (Service, error) { - log, err := getLogger() - if err != nil { - return nil, err - } - if cfg.TimePerBlock <= 0 { cfg.TimePerBlock = defaultTimePerBlock } + if cfg.Logger == nil { + return nil, errors.New("empty logger") + } + srv := &service{ Config: cfg, - log: log.Sugar(), + log: cfg.Logger.Sugar(), cache: newFIFOCache(cacheMaxCapacity), txx: newFIFOCache(cacheMaxCapacity), messages: make(chan Payload, 100), diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 3d86e969a..66909fcf2 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -10,6 +10,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" "github.com/nspcc-dev/dbft/block" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) func TestNewService(t *testing.T) { @@ -128,6 +129,7 @@ func shouldNotReceive(t *testing.T, ch chan Payload) { func newTestService(t *testing.T) *service { srv, err := NewService(Config{ + Logger: zaptest.NewLogger(t), Broadcast: func(*Payload) {}, Chain: newTestChain(t), RequestTx: func(...util.Uint256) {}, @@ -177,7 +179,7 @@ func newTestChain(t *testing.T) *core.Blockchain { unitTestNetCfg, err := config.Load("../../config", config.ModeUnitTestNet) require.NoError(t, err) - chain, err := core.NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration) + chain, err := core.NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration, zaptest.NewLogger(t)) require.NoError(t, err) go chain.Run() diff --git a/pkg/consensus/logger.go b/pkg/consensus/logger.go deleted file mode 100644 index 49dba8e69..000000000 --- a/pkg/consensus/logger.go +++ /dev/null @@ -1,19 +0,0 @@ -package consensus - -import ( - "go.uber.org/zap" -) - -func getLogger() (*zap.Logger, error) { - cc := zap.NewDevelopmentConfig() - cc.DisableCaller = true - cc.DisableStacktrace = true - cc.Encoding = "console" - - log, err := cc.Build() - if err != nil { - return nil, err - } - - return log.With(zap.String("module", "dbft")), nil -} diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index a44a03386..4f8250aa7 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -21,7 +21,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/vm" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) // Tuning parameters. @@ -80,13 +80,19 @@ type Blockchain struct { // cache for block verification keys. keyCache map[util.Uint160]map[string]*keys.PublicKey + + log *zap.Logger } type headersOpFunc func(headerList *HeaderHashList) // NewBlockchain returns a new blockchain object the will use the // given Store as its underlying storage. -func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockchain, error) { +func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.Logger) (*Blockchain, error) { + if log == nil { + return nil, errors.New("empty logger") + } + bc := &Blockchain{ config: cfg, dao: newDao(s), @@ -96,6 +102,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha runToExitCh: make(chan struct{}), memPool: NewMemPool(50000), keyCache: make(map[util.Uint160]map[string]*keys.PublicKey), + log: log, } if err := bc.init(); err != nil { @@ -109,7 +116,7 @@ func (bc *Blockchain) init() error { // If we could not find the version in the Store, we know that there is nothing stored. ver, err := bc.dao.GetVersion() if err != nil { - log.Infof("no storage version found! creating genesis block") + bc.log.Info("no storage version found! creating genesis block") if err = bc.dao.PutVersion(version); err != nil { return err } @@ -131,7 +138,7 @@ func (bc *Blockchain) init() error { // At this point there was no version found in the storage which // implies a creating fresh storage with the version specified // and the genesis block as first block. - log.Infof("restoring blockchain with version: %s", version) + bc.log.Info("restoring blockchain", zap.String("version", version)) bHeight, err := bc.dao.GetCurrentBlockHeight() if err != nil { @@ -200,10 +207,10 @@ func (bc *Blockchain) Run() { defer func() { persistTimer.Stop() if err := bc.persist(); err != nil { - log.Warnf("failed to persist: %s", err) + bc.log.Warn("failed to persist", zap.Error(err)) } if err := bc.dao.store.Close(); err != nil { - log.Warnf("failed to close db: %s", err) + bc.log.Warn("failed to close db", zap.Error(err)) } close(bc.runToExitCh) }() @@ -218,7 +225,7 @@ func (bc *Blockchain) Run() { go func() { err := bc.persist() if err != nil { - log.Warnf("failed to persist blockchain: %s", err) + bc.log.Warn("failed to persist blockchain", zap.Error(err)) } }() persistTimer.Reset(persistInterval) @@ -302,11 +309,10 @@ func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) { if err = bc.dao.store.PutBatch(batch); err != nil { return } - log.WithFields(log.Fields{ - "headerIndex": headerList.Len() - 1, - "blockHeight": bc.BlockHeight(), - "took": time.Since(start), - }).Debug("done processing headers") + bc.log.Debug("done processing headers", + zap.Int("headerIndex", headerList.Len()-1), + zap.Uint32("blockHeight", bc.BlockHeight()), + zap.Duration("took", time.Since(start))) } } <-bc.headersOpDone @@ -502,7 +508,7 @@ func (bc *Blockchain) storeBlock(block *Block) error { return err } case *transaction.InvocationTX: - systemInterop := newInteropContext(trigger.Application, bc, cache.store, block, tx) + systemInterop := newInteropContext(trigger.Application, bc, cache.store, block, tx, bc.log) v := bc.spawnVMWithInterops(systemInterop) v.SetCheckedHash(tx.VerificationHash().BytesBE()) v.LoadScript(t.Script) @@ -537,11 +543,10 @@ func (bc *Blockchain) storeBlock(block *Block) error { _, _, _, _ = op, from, to, amount } } else { - log.WithFields(log.Fields{ - "tx": tx.Hash().StringLE(), - "block": block.Index, - "err": err, - }).Warn("contract invocation failed") + bc.log.Warn("contract invocation failed", + zap.String("tx", tx.Hash().StringLE()), + zap.Uint32("block", block.Index), + zap.Error(err)) } aer := &state.AppExecResult{ TxHash: tx.Hash(), @@ -710,13 +715,12 @@ func (bc *Blockchain) persist() error { if err != nil { return err } - log.WithFields(log.Fields{ - "persistedBlocks": diff, - "persistedKeys": persisted, - "headerHeight": storedHeaderHeight, - "blockHeight": bHeight, - "took": time.Since(start), - }).Info("blockchain persist completed") + bc.log.Info("blockchain persist completed", + zap.Uint32("persistedBlocks", diff), + zap.Int("persistedKeys", persisted), + zap.Uint32("headerHeight", storedHeaderHeight), + zap.Uint32("blockHeight", bHeight), + zap.Duration("took", time.Since(start))) // update monitoring metrics. updatePersistedHeightMetric(bHeight) @@ -849,7 +853,9 @@ func (bc *Blockchain) HeaderHeight() uint32 { func (bc *Blockchain) GetAssetState(assetID util.Uint256) *state.Asset { asset, err := bc.dao.GetAssetState(assetID) if asset == nil && err != storage.ErrKeyNotFound { - log.Warnf("failed to get asset state %s : %s", assetID, err) + bc.log.Warn("failed to get asset state", + zap.Stringer("asset", assetID), + zap.Error(err)) } return asset } @@ -858,7 +864,7 @@ func (bc *Blockchain) GetAssetState(assetID util.Uint256) *state.Asset { func (bc *Blockchain) GetContractState(hash util.Uint160) *state.Contract { contract, err := bc.dao.GetContractState(hash) if contract == nil && err != storage.ErrKeyNotFound { - log.Warnf("failed to get contract state: %s", err) + bc.log.Warn("failed to get contract state", zap.Error(err)) } return contract } @@ -867,7 +873,7 @@ func (bc *Blockchain) GetContractState(hash util.Uint160) *state.Contract { func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *state.Account { as, err := bc.dao.GetAccountState(scriptHash) if as == nil && err != storage.ErrKeyNotFound { - log.Warnf("failed to get account state: %s", err) + bc.log.Warn("failed to get account state", zap.Error(err)) } return as } @@ -876,7 +882,7 @@ func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *state.Account { func (bc *Blockchain) GetUnspentCoinState(hash util.Uint256) *UnspentCoinState { ucs, err := bc.dao.GetUnspentCoinState(hash) if ucs == nil && err != storage.ErrKeyNotFound { - log.Warnf("failed to get unspent coin state: %s", err) + bc.log.Warn("failed to get unspent coin state", zap.Error(err)) } return ucs } @@ -1367,7 +1373,7 @@ func (bc *Blockchain) spawnVMWithInterops(interopCtx *interopContext) *vm.VM { // GetTestVM returns a VM and a Store setup for a test run of some sort of code. func (bc *Blockchain) GetTestVM() (*vm.VM, storage.Store) { tmpStore := storage.NewMemCachedStore(bc.dao.store) - systemInterop := newInteropContext(trigger.Application, bc, tmpStore, nil, nil) + systemInterop := newInteropContext(trigger.Application, bc, tmpStore, nil, nil, bc.log) vm := bc.spawnVMWithInterops(systemInterop) return vm, tmpStore } @@ -1446,7 +1452,7 @@ func (bc *Blockchain) verifyTxWitnesses(t *transaction.Transaction, block *Block } sort.Slice(hashes, func(i, j int) bool { return hashes[i].Less(hashes[j]) }) sort.Slice(witnesses, func(i, j int) bool { return witnesses[i].ScriptHash().Less(witnesses[j].ScriptHash()) }) - interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, block, t) + interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, block, t, bc.log) for i := 0; i < len(hashes); i++ { err := bc.verifyHashAgainstScript(hashes[i], &witnesses[i], t.VerificationHash(), interopCtx, false) if err != nil { @@ -1466,7 +1472,7 @@ func (bc *Blockchain) verifyBlockWitnesses(block *Block, prevHeader *Header) err } else { hash = prevHeader.NextConsensus } - interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, nil, nil) + interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, nil, nil, bc.log) return bc.verifyHashAgainstScript(hash, &block.Script, block.VerificationHash(), interopCtx, true) } diff --git a/pkg/core/helper_test.go b/pkg/core/helper_test.go index 132dc5087..f750af78f 100644 --- a/pkg/core/helper_test.go +++ b/pkg/core/helper_test.go @@ -17,6 +17,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/vm/opcode" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) var newBlockPrevHash util.Uint256 @@ -37,7 +38,7 @@ func newTestChain(t *testing.T) *Blockchain { if err != nil { t.Fatal(err) } - chain, err := NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration) + chain, err := NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration, zaptest.NewLogger(t)) if err != nil { t.Fatal(err) } diff --git a/pkg/core/interop_neo_test.go b/pkg/core/interop_neo_test.go index 9498d414c..5793d0598 100644 --- a/pkg/core/interop_neo_test.go +++ b/pkg/core/interop_neo_test.go @@ -15,6 +15,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/vm" "github.com/CityOfZion/neo-go/pkg/vm/opcode" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) /* Missing tests: @@ -112,7 +113,7 @@ func TestHeaderGetVersion(t *testing.T) { func TestHeaderGetVersion_Negative(t *testing.T) { v := vm.New() block := newDumbBlock() - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil) + context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil, zaptest.NewLogger(t)) v.Estack().PushVal(vm.NewBoolItem(false)) err := context.headerGetVersion(v) @@ -197,7 +198,7 @@ func TestWitnessGetVerificationScript(t *testing.T) { script := []byte{byte(opcode.PUSHM1), byte(opcode.RET)} witness := transaction.Witness{InvocationScript: nil, VerificationScript: script} - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) + context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t)) v.Estack().PushVal(vm.NewInteropItem(&witness)) err := context.witnessGetVerificationScript(v) require.NoError(t, err) @@ -418,7 +419,7 @@ func TestAssetGetPrecision(t *testing.T) { func createVMAndPushBlock(t *testing.T) (*vm.VM, *Block, *interopContext) { v := vm.New() block := newDumbBlock() - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil) + context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil, zaptest.NewLogger(t)) v.Estack().PushVal(vm.NewInteropItem(block)) return v, block, context } @@ -447,7 +448,7 @@ func createVMAndAssetState(t *testing.T) (*vm.VM, *state.Asset, *interopContext) IsFrozen: false, } - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) + context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t)) return v, assetState, context } @@ -465,7 +466,7 @@ func createVMAndContractState(t *testing.T) (*vm.VM, *state.Contract, *interopCo Description: random.String(10), } - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) + context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t)) return v, contractState, context } @@ -479,7 +480,7 @@ func createVMAndAccState(t *testing.T) (*vm.VM, *state.Account, *interopContext) accountState.Votes = []*keys.PublicKey{key} require.NoError(t, err) - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) + context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t)) return v, accountState, context } @@ -509,6 +510,6 @@ func createVMAndTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopCont tx.Attributes = attributes tx.Inputs = inputs tx.Outputs = outputs - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, tx) + context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, tx, zaptest.NewLogger(t)) return v, tx, context } diff --git a/pkg/core/interop_system.go b/pkg/core/interop_system.go index 91f919d32..b96b5a0ec 100644 --- a/pkg/core/interop_system.go +++ b/pkg/core/interop_system.go @@ -13,7 +13,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/vm" gherr "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const ( @@ -350,7 +350,9 @@ func (ic *interopContext) runtimeNotify(v *vm.VM) error { // runtimeLog logs the message passed. func (ic *interopContext) runtimeLog(v *vm.VM) error { msg := fmt.Sprintf("%q", v.Estack().Pop().Bytes()) - log.Infof("script %s logs: %s", getContextScriptHash(v, 0), msg) + ic.log.Info("runtime log", + zap.Stringer("script", getContextScriptHash(v, 0)), + zap.String("logs", msg)) return nil } diff --git a/pkg/core/interops.go b/pkg/core/interops.go index 3e9b32941..bf33b347d 100644 --- a/pkg/core/interops.go +++ b/pkg/core/interops.go @@ -14,6 +14,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/core/storage" "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/vm" + "go.uber.org/zap" ) type interopContext struct { @@ -23,12 +24,13 @@ type interopContext struct { tx *transaction.Transaction dao *cachedDao notifications []state.NotificationEvent + log *zap.Logger } -func newInteropContext(trigger byte, bc Blockchainer, s storage.Store, block *Block, tx *transaction.Transaction) *interopContext { +func newInteropContext(trigger byte, bc Blockchainer, s storage.Store, block *Block, tx *transaction.Transaction, log *zap.Logger) *interopContext { dao := newCachedDao(s) nes := make([]state.NotificationEvent, 0) - return &interopContext{bc, trigger, block, tx, dao, nes} + return &interopContext{bc, trigger, block, tx, dao, nes, log} } // interopedFunction binds function name, id with the function itself and price, diff --git a/pkg/core/interops_test.go b/pkg/core/interops_test.go index 490a8402b..e6376b3ab 100644 --- a/pkg/core/interops_test.go +++ b/pkg/core/interops_test.go @@ -9,12 +9,13 @@ import ( "github.com/CityOfZion/neo-go/pkg/smartcontract/trigger" "github.com/CityOfZion/neo-go/pkg/vm" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) func testNonInterop(t *testing.T, value interface{}, f func(*interopContext, *vm.VM) error) { v := vm.New() v.Estack().PushVal(value) - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) + context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t)) require.Error(t, f(context, v)) } diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index 175aa7e7d..bbd9f176a 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -3,17 +3,23 @@ package network import ( "github.com/CityOfZion/neo-go/pkg/core" "github.com/Workiva/go-datastructures/queue" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) type blockQueue struct { + log *zap.Logger queue *queue.PriorityQueue checkBlocks chan struct{} chain core.Blockchainer } -func newBlockQueue(capacity int, bc core.Blockchainer) *blockQueue { +func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQueue { + if log == nil { + return nil + } + return &blockQueue{ + log: log, queue: queue.NewPriorityQueue(capacity, false), checkBlocks: make(chan struct{}, 1), chain: bc, @@ -38,11 +44,10 @@ func (bq *blockQueue) run() { if minblock.Index == bq.chain.BlockHeight()+1 { err := bq.chain.AddBlock(minblock) if err != nil { - log.WithFields(log.Fields{ - "error": err.Error(), - "blockHeight": bq.chain.BlockHeight(), - "nextIndex": minblock.Index, - }).Warn("blockQueue: failed adding block into the blockchain") + bq.log.Warn("blockQueue: failed adding block into the blockchain", + zap.String("error", err.Error()), + zap.Uint32("blockHeight", bq.chain.BlockHeight()), + zap.Uint32("nextIndex", minblock.Index)) } } } else { diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go index da4124c06..f30b47a0a 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/blockqueue_test.go @@ -6,12 +6,13 @@ import ( "github.com/CityOfZion/neo-go/pkg/core" "github.com/stretchr/testify/assert" + "go.uber.org/zap/zaptest" ) func TestBlockQueue(t *testing.T) { chain := &testChain{} // notice, it's not yet running - bq := newBlockQueue(0, chain) + bq := newBlockQueue(0, chain, zaptest.NewLogger(t)) blocks := make([]*core.Block, 11) for i := 1; i < 11; i++ { blocks[i] = &core.Block{BlockBase: core.BlockBase{Index: uint32(i)}} diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index da2039650..5fdc86e5f 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -16,6 +16,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/vm" + "go.uber.org/zap/zaptest" ) type testChain struct { @@ -203,7 +204,7 @@ func (p *localPeer) Handshaked() bool { return p.handshaked } -func newTestServer() *Server { +func newTestServer(t *testing.T) *Server { return &Server{ ServerConfig: ServerConfig{}, chain: &testChain{}, @@ -214,6 +215,7 @@ func newTestServer() *Server { register: make(chan Peer), unregister: make(chan peerDrop), peers: make(map[Peer]bool), + log: zaptest.NewLogger(t), } } diff --git a/pkg/network/metrics/metrics.go b/pkg/network/metrics/metrics.go index ae528990e..15a711618 100644 --- a/pkg/network/metrics/metrics.go +++ b/pkg/network/metrics/metrics.go @@ -4,13 +4,14 @@ import ( "context" "net/http" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) // Service serves metrics. type Service struct { *http.Server config Config + log *zap.Logger serviceType string } @@ -24,27 +25,21 @@ type Config struct { // Start runs http service with exposed endpoint on configured port. func (ms *Service) Start() { if ms.config.Enabled { - log.WithFields(log.Fields{ - "endpoint": ms.Addr, - "service": ms.serviceType, - }).Info("service running") + ms.log.Info("service is running", zap.String("endpoint", ms.Addr)) err := ms.ListenAndServe() if err != nil && err != http.ErrServerClosed { - log.Warnf("%s service couldn't start on configured port", ms.serviceType) + ms.log.Warn("service couldn't start on configured port") } } else { - log.Infof("%s service hasn't started since it's disabled", ms.serviceType) + ms.log.Info("service hasn't started since it's disabled") } } // ShutDown stops service. func (ms *Service) ShutDown() { - log.WithFields(log.Fields{ - "endpoint": ms.Addr, - "service": ms.serviceType, - }).Info("shutting down service") + ms.log.Info("shutting down service", zap.String("endpoint", ms.Addr)) err := ms.Shutdown(context.Background()) if err != nil { - log.Fatalf("can't shut down %s service", ms.serviceType) + ms.log.Panic("can't shut down service") } } diff --git a/pkg/network/metrics/pprof.go b/pkg/network/metrics/pprof.go index f41c560e6..13b853476 100644 --- a/pkg/network/metrics/pprof.go +++ b/pkg/network/metrics/pprof.go @@ -3,13 +3,19 @@ package metrics import ( "net/http" "net/http/pprof" + + "go.uber.org/zap" ) // PprofService https://golang.org/pkg/net/http/pprof/. type PprofService Service // NewPprofService created new service for gathering pprof metrics. -func NewPprofService(cfg Config) *Service { +func NewPprofService(cfg Config, log *zap.Logger) *Service { + if log == nil { + return nil + } + handler := http.NewServeMux() handler.HandleFunc("/debug/pprof/", pprof.Index) handler.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) @@ -18,9 +24,12 @@ func NewPprofService(cfg Config) *Service { handler.HandleFunc("/debug/pprof/trace", pprof.Trace) return &Service{ - &http.Server{ + Server: &http.Server{ Addr: cfg.Address + ":" + cfg.Port, Handler: handler, - }, cfg, "Pprof", + }, + config: cfg, + serviceType: "Pprof", + log: log.With(zap.String("service", "Pprof")), } } diff --git a/pkg/network/metrics/prometheus.go b/pkg/network/metrics/prometheus.go index 25cf80281..9bfe54205 100644 --- a/pkg/network/metrics/prometheus.go +++ b/pkg/network/metrics/prometheus.go @@ -4,17 +4,25 @@ import ( "net/http" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" ) // PrometheusService https://prometheus.io/docs/guides/go-application. type PrometheusService Service // NewPrometheusService creates new service for gathering prometheus metrics. -func NewPrometheusService(cfg Config) *Service { +func NewPrometheusService(cfg Config, log *zap.Logger) *Service { + if log == nil { + return nil + } + return &Service{ - &http.Server{ + Server: &http.Server{ Addr: cfg.Address + ":" + cfg.Port, Handler: promhttp.Handler(), - }, cfg, "Prometheus", + }, + config: cfg, + serviceType: "Prometheus", + log: log.With(zap.String("service", "Prometheus")), } } diff --git a/pkg/network/payload/headers.go b/pkg/network/payload/headers.go index 935bd05de..33f7b8e53 100644 --- a/pkg/network/payload/headers.go +++ b/pkg/network/payload/headers.go @@ -3,7 +3,6 @@ package payload import ( "github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/io" - log "github.com/sirupsen/logrus" ) // Headers payload. @@ -22,7 +21,6 @@ func (p *Headers) DecodeBinary(br *io.BinReader) { // C# node does it silently if lenHeaders > MaxHeadersAllowed { - log.Warnf("received %d headers, capping to %d", lenHeaders, MaxHeadersAllowed) lenHeaders = MaxHeadersAllowed } diff --git a/pkg/network/server.go b/pkg/network/server.go index a9229d790..a8812b474 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -15,8 +15,8 @@ import ( "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/util" - log "github.com/sirupsen/logrus" "go.uber.org/atomic" + "go.uber.org/zap" ) const ( @@ -65,6 +65,8 @@ type ( quit chan struct{} connected *atomic.Bool + + log *zap.Logger } peerDrop struct { @@ -80,11 +82,15 @@ func randomID() uint32 { } // NewServer returns a new Server, initialized with the given configuration. -func NewServer(config ServerConfig, chain core.Blockchainer) *Server { +func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) *Server { + if log == nil { + return nil + } + s := &Server{ ServerConfig: config, chain: chain, - bQueue: newBlockQueue(maxBlockBatch, chain), + bQueue: newBlockQueue(maxBlockBatch, chain, log), id: randomID(), quit: make(chan struct{}), addrReq: make(chan *Message, config.MinPeers), @@ -92,9 +98,11 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server { unregister: make(chan peerDrop), peers: make(map[Peer]bool), connected: atomic.NewBool(false), + log: log, } srv, err := consensus.NewService(consensus.Config{ + Logger: log, Broadcast: s.handleNewPayload, RelayBlock: s.relayBlock, Chain: chain, @@ -108,30 +116,27 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server { s.consensus = srv if s.MinPeers <= 0 { - log.WithFields(log.Fields{ - "MinPeers configured": s.MinPeers, - "MinPeers actual": defaultMinPeers, - }).Info("bad MinPeers configured, using the default value") + s.log.Info("bad MinPeers configured, using the default value", + zap.Int("configured", s.MinPeers), + zap.Int("actual", defaultMinPeers)) s.MinPeers = defaultMinPeers } if s.MaxPeers <= 0 { - log.WithFields(log.Fields{ - "MaxPeers configured": s.MaxPeers, - "MaxPeers actual": defaultMaxPeers, - }).Info("bad MaxPeers configured, using the default value") + s.log.Info("bad MaxPeers configured, using the default value", + zap.Int("configured", s.MaxPeers), + zap.Int("actual", defaultMaxPeers)) s.MaxPeers = defaultMaxPeers } if s.AttemptConnPeers <= 0 { - log.WithFields(log.Fields{ - "AttemptConnPeers configured": s.AttemptConnPeers, - "AttemptConnPeers actual": defaultAttemptConnPeers, - }).Info("bad AttemptConnPeers configured, using the default value") + s.log.Info("bad AttemptConnPeers configured, using the default value", + zap.Int("configured", s.AttemptConnPeers), + zap.Int("actual", defaultAttemptConnPeers)) s.AttemptConnPeers = defaultAttemptConnPeers } - s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", config.Address, config.Port)) + s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", config.Address, config.Port), s.log) s.discovery = NewDefaultDiscovery( s.DialTimeout, s.transport, @@ -147,10 +152,9 @@ func (s *Server) ID() uint32 { // Start will start the server and its underlying transport. func (s *Server) Start(errChan chan error) { - log.WithFields(log.Fields{ - "blockHeight": s.chain.BlockHeight(), - "headerHeight": s.chain.HeaderHeight(), - }).Info("node started") + s.log.Info("node started", + zap.Uint32("blockHeight", s.chain.BlockHeight()), + zap.Uint32("headerHeight", s.chain.HeaderHeight())) s.discovery.BackFill(s.Seeds...) @@ -162,9 +166,7 @@ func (s *Server) Start(errChan chan error) { // Shutdown disconnects all peers and stops listening. func (s *Server) Shutdown() { - log.WithFields(log.Fields{ - "peers": s.PeerCount(), - }).Info("shutting down server") + s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.bQueue.discard() close(s.quit) } @@ -205,9 +207,7 @@ func (s *Server) run() { s.lock.Lock() s.peers[p] = true s.lock.Unlock() - log.WithFields(log.Fields{ - "addr": p.RemoteAddr(), - }).Info("new peer connected") + s.log.Info("new peer connected", zap.Stringer("addr", p.RemoteAddr())) peerCount := s.PeerCount() if peerCount > s.MaxPeers { s.lock.RLock() @@ -225,11 +225,10 @@ func (s *Server) run() { if s.peers[drop.peer] { delete(s.peers, drop.peer) s.lock.Unlock() - log.WithFields(log.Fields{ - "addr": drop.peer.RemoteAddr(), - "reason": drop.reason, - "peerCount": s.PeerCount(), - }).Warn("peer disconnected") + s.log.Warn("peer disconnected", + zap.Stringer("addr", drop.peer.RemoteAddr()), + zap.String("reason", drop.reason.Error()), + zap.Int("peerCount", s.PeerCount())) addr := drop.peer.PeerAddr().String() if drop.reason == errIdenticalID { s.discovery.RegisterBadAddr(addr) @@ -254,7 +253,7 @@ func (s *Server) tryStartConsensus() { } if s.HandshakedPeersCount() >= s.MinPeers { - log.Info("minimum amount of peers were connected to") + s.log.Info("minimum amount of peers were connected to") if s.connected.CAS(false, true) { s.consensus.Start() } @@ -304,12 +303,11 @@ func (s *Server) HandshakedPeersCount() int { func (s *Server) startProtocol(p Peer) { var err error - log.WithFields(log.Fields{ - "addr": p.RemoteAddr(), - "userAgent": string(p.Version().UserAgent), - "startHeight": p.Version().StartHeight, - "id": p.Version().Nonce, - }).Info("started protocol") + s.log.Info("started protocol", + zap.Stringer("addr", p.RemoteAddr()), + zap.ByteString("userAgent", p.Version().UserAgent), + zap.Uint32("startHeight", p.Version().StartHeight), + zap.Uint32("id", p.Version().Nonce)) s.discovery.RegisterGoodAddr(p.PeerAddr().String()) if s.chain.HeaderHeight() < p.Version().StartHeight { @@ -386,7 +384,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // This method could best be called in a separate routine. func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) { if err := s.chain.AddHeaders(headers.Hdrs...); err != nil { - log.Warnf("failed processing headers: %s", err) + s.log.Warn("failed processing headers", zap.Error(err)) return } // The peer will respond with a maximum of 2000 headers in one batch. diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index b51daf0e1..524557d3e 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -4,7 +4,7 @@ import ( "time" "github.com/CityOfZion/neo-go/config" - log "github.com/sirupsen/logrus" + "go.uber.org/zap/zapcore" ) type ( @@ -53,7 +53,7 @@ type ( ProtoTickInterval time.Duration // Level of the internal logger. - LogLevel log.Level + LogLevel zapcore.Level // Wallet is a wallet configuration. Wallet *config.WalletConfig diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index bcde70b8c..39f2caedc 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -11,7 +11,7 @@ import ( func TestSendVersion(t *testing.T) { var ( - s = newTestServer() + s = newTestServer(t) p = newLocalPeer(t) ) s.Port = 3000 @@ -37,7 +37,7 @@ func TestSendVersion(t *testing.T) { // Server should reply with a verack after receiving a valid version. func TestVerackAfterHandleVersionCmd(t *testing.T) { var ( - s = newTestServer() + s = newTestServer(t) p = newLocalPeer(t) ) na, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:3000") @@ -58,7 +58,7 @@ func TestVerackAfterHandleVersionCmd(t *testing.T) { // invalid version and disconnects the peer. func TestServerNotSendsVerack(t *testing.T) { var ( - s = newTestServer() + s = newTestServer(t) p = newLocalPeer(t) p2 = newLocalPeer(t) ) @@ -91,7 +91,7 @@ func TestServerNotSendsVerack(t *testing.T) { func TestRequestHeaders(t *testing.T) { var ( - s = newTestServer() + s = newTestServer(t) p = newLocalPeer(t) ) p.messageHandler = func(t *testing.T, msg *Message) { diff --git a/pkg/network/tcp_transport.go b/pkg/network/tcp_transport.go index 654515758..f41015f53 100644 --- a/pkg/network/tcp_transport.go +++ b/pkg/network/tcp_transport.go @@ -6,11 +6,12 @@ import ( "time" "github.com/CityOfZion/neo-go/pkg/io" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) // TCPTransport allows network communication over TCP. type TCPTransport struct { + log *zap.Logger server *Server listener net.Listener bindAddr string @@ -20,8 +21,9 @@ var reClosedNetwork = regexp.MustCompile(".* use of closed network connection") // NewTCPTransport returns a new TCPTransport that will listen for // new incoming peer connections. -func NewTCPTransport(s *Server, bindAddr string) *TCPTransport { +func NewTCPTransport(s *Server, bindAddr string, log *zap.Logger) *TCPTransport { return &TCPTransport{ + log: log, server: s, bindAddr: bindAddr, } @@ -41,7 +43,7 @@ func (t *TCPTransport) Dial(addr string, timeout time.Duration) error { func (t *TCPTransport) Accept() { l, err := net.Listen("tcp", t.bindAddr) if err != nil { - log.Fatalf("TCP listen error %s", err) + t.log.Panic("TCP listen error", zap.Error(err)) return } @@ -50,7 +52,7 @@ func (t *TCPTransport) Accept() { for { conn, err := l.Accept() if err != nil { - log.Warnf("TCP accept error: %s", err) + t.log.Warn("TCP accept error", zap.Error(err)) if t.isCloseError(err) { break } @@ -80,9 +82,7 @@ func (t *TCPTransport) handleConn(conn net.Conn) { // When a new peer is connected we send out our version immediately. if err := t.server.sendVersion(p); err != nil { - log.WithFields(log.Fields{ - "addr": p.RemoteAddr(), - }).Error(err) + t.log.Error("error on sendVersion", zap.Stringer("addr", p.RemoteAddr()), zap.Error(err)) } r := io.NewBinReaderFromIO(p.conn) for { diff --git a/pkg/rpc/server_helper_test.go b/pkg/rpc/server_helper_test.go index c1091d8d2..437c39b8b 100644 --- a/pkg/rpc/server_helper_test.go +++ b/pkg/rpc/server_helper_test.go @@ -13,6 +13,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/rpc/result" "github.com/CityOfZion/neo-go/pkg/rpc/wrappers" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) // ErrorResponse struct represents JSON-RPC error. @@ -180,7 +181,7 @@ func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFu require.NoError(t, err, "could not load config") memoryStore := storage.NewMemoryStore() - chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration) + chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration, zaptest.NewLogger(t)) require.NoError(t, err, "could not create chain") go chain.Run() @@ -198,7 +199,7 @@ func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFu } serverConfig := network.NewServerConfig(cfg) - server := network.NewServer(serverConfig, chain) + server := network.NewServer(serverConfig, chain, zaptest.NewLogger(t)) rpcServer := NewServer(chain, cfg.ApplicationConfiguration.RPC, server) handler := http.HandlerFunc(rpcServer.requestHandler) From 289cb1c1d9aa7a05f5f6dac1ec59591775170875 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 30 Dec 2019 11:44:52 +0300 Subject: [PATCH 3/8] rpc: use zap.Logger --- cli/server/server.go | 2 +- go.mod | 1 - pkg/rpc/request.go | 39 +++++++++++++++++------------------ pkg/rpc/server.go | 34 ++++++++++++++---------------- pkg/rpc/server_helper_test.go | 7 ++++--- 5 files changed, 40 insertions(+), 43 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index a1e194c81..87cc30a02 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -320,7 +320,7 @@ func startServer(ctx *cli.Context) error { } server := network.NewServer(serverConfig, chain, log) - rpcServer := rpc.NewServer(chain, cfg.ApplicationConfiguration.RPC, server) + rpcServer := rpc.NewServer(chain, cfg.ApplicationConfiguration.RPC, server, log) errChan := make(chan error) go server.Start(errChan) diff --git a/go.mod b/go.mod index 6121dd82e..bcd5cb4c6 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/nspcc-dev/rfc6979 v0.1.0 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.2.1 - github.com/sirupsen/logrus v1.4.2 github.com/stretchr/testify v1.4.0 github.com/syndtr/goleveldb v0.0.0-20180307113352-169b1b37be73 github.com/urfave/cli v1.20.0 diff --git a/pkg/rpc/request.go b/pkg/rpc/request.go index ddd2bb816..af8daf6c0 100644 --- a/pkg/rpc/request.go +++ b/pkg/rpc/request.go @@ -6,7 +6,7 @@ import ( "net/http" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const ( @@ -73,7 +73,7 @@ func (r *Request) Params() (*Params, error) { } // WriteErrorResponse writes an error response to the ResponseWriter. -func (r Request) WriteErrorResponse(w http.ResponseWriter, err error) { +func (s *Server) WriteErrorResponse(r *Request, w http.ResponseWriter, err error) { jsonErr, ok := err.(*Error) if !ok { jsonErr = NewInternalServerError("Internal server error", err) @@ -85,32 +85,34 @@ func (r Request) WriteErrorResponse(w http.ResponseWriter, err error) { ID: r.RawID, } - logFields := log.Fields{ - "err": jsonErr.Cause, - "method": r.Method, - } - params, err := r.Params() - if err == nil { - logFields["params"] = *params + logFields := []zap.Field{ + zap.Error(jsonErr.Cause), + zap.String("method", r.Method), } - log.WithFields(logFields).Error("Error encountered with rpc request") + params, err := r.Params() + if err == nil { + logFields = append(logFields, zap.Any("params", params)) + } + + s.log.Error("Error encountered with rpc request", logFields...) + w.WriteHeader(jsonErr.HTTPCode) - r.writeServerResponse(w, response) + s.writeServerResponse(r, w, response) } // WriteResponse encodes the response and writes it to the ResponseWriter. -func (r Request) WriteResponse(w http.ResponseWriter, result interface{}) { +func (s *Server) WriteResponse(r *Request, w http.ResponseWriter, result interface{}) { response := Response{ JSONRPC: r.JSONRPC, Result: result, ID: r.RawID, } - r.writeServerResponse(w, response) + s.writeServerResponse(r, w, response) } -func (r Request) writeServerResponse(w http.ResponseWriter, response Response) { +func (s *Server) writeServerResponse(r *Request, w http.ResponseWriter, response Response) { w.Header().Set("Content-Type", "application/json; charset=utf-8") if r.enableCORSWorkaround { w.Header().Set("Access-Control-Allow-Origin", "*") @@ -120,12 +122,9 @@ func (r Request) writeServerResponse(w http.ResponseWriter, response Response) { encoder := json.NewEncoder(w) err := encoder.Encode(response) - logFields := log.Fields{ - "err": err, - "method": r.Method, - } - if err != nil { - log.WithFields(logFields).Error("Error encountered while encoding response") + s.log.Error("Error encountered while encoding response", + zap.String("err", err.Error()), + zap.String("method", r.Method)) } } diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go index e9d30c8a0..79af22f8d 100644 --- a/pkg/rpc/server.go +++ b/pkg/rpc/server.go @@ -16,7 +16,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/rpc/wrappers" "github.com/CityOfZion/neo-go/pkg/util" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) type ( @@ -26,6 +26,7 @@ type ( chain core.Blockchainer config config.RPCConfig coreServer *network.Server + log *zap.Logger } ) @@ -34,7 +35,7 @@ var invalidBlockHeightError = func(index int, height int) error { } // NewServer creates a new Server struct. -func NewServer(chain core.Blockchainer, conf config.RPCConfig, coreServer *network.Server) Server { +func NewServer(chain core.Blockchainer, conf config.RPCConfig, coreServer *network.Server, log *zap.Logger) Server { httpServer := &http.Server{ Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10), } @@ -44,6 +45,7 @@ func NewServer(chain core.Blockchainer, conf config.RPCConfig, coreServer *netwo chain: chain, config: conf, coreServer: coreServer, + log: log, } } @@ -51,13 +53,11 @@ func NewServer(chain core.Blockchainer, conf config.RPCConfig, coreServer *netwo // listening on the configured port. func (s *Server) Start(errChan chan error) { if !s.config.Enabled { - log.Info("RPC server is not enabled") + s.log.Info("RPC server is not enabled") return } s.Handler = http.HandlerFunc(s.requestHandler) - log.WithFields(log.Fields{ - "endpoint": s.Addr, - }).Info("starting rpc-server") + s.log.Info("starting rpc-server", zap.String("endpoint", s.Addr)) errChan <- s.ListenAndServe() } @@ -65,9 +65,7 @@ func (s *Server) Start(errChan chan error) { // Shutdown overrides the http.Server Shutdown // method. func (s *Server) Shutdown() error { - log.WithFields(log.Fields{ - "endpoint": s.Addr, - }).Info("shutting down rpc-server") + s.log.Info("shutting down rpc-server", zap.String("endpoint", s.Addr)) return s.Server.Shutdown(context.Background()) } @@ -75,7 +73,8 @@ func (s *Server) requestHandler(w http.ResponseWriter, httpRequest *http.Request req := NewRequest(s.config.EnableCORSWorkaround) if httpRequest.Method != "POST" { - req.WriteErrorResponse( + s.WriteErrorResponse( + req, w, NewInvalidParamsError( fmt.Sprintf("Invalid method '%s', please retry with 'POST'", httpRequest.Method), nil, @@ -86,13 +85,13 @@ func (s *Server) requestHandler(w http.ResponseWriter, httpRequest *http.Request err := req.DecodeData(httpRequest.Body) if err != nil { - req.WriteErrorResponse(w, NewParseError("Problem parsing JSON-RPC request body", err)) + s.WriteErrorResponse(req, w, NewParseError("Problem parsing JSON-RPC request body", err)) return } reqParams, err := req.Params() if err != nil { - req.WriteErrorResponse(w, NewInvalidParamsError("Problem parsing request parameters", err)) + s.WriteErrorResponse(req, w, NewInvalidParamsError("Problem parsing request parameters", err)) return } @@ -100,10 +99,9 @@ func (s *Server) requestHandler(w http.ResponseWriter, httpRequest *http.Request } func (s *Server) methodHandler(w http.ResponseWriter, req *Request, reqParams Params) { - log.WithFields(log.Fields{ - "method": req.Method, - "params": fmt.Sprintf("%v", reqParams), - }).Info("processing rpc request") + s.log.Info("processing rpc request", + zap.String("method", req.Method), + zap.String("params", fmt.Sprintf("%v", reqParams))) var ( results interface{} @@ -268,11 +266,11 @@ Methods: } if resultsErr != nil { - req.WriteErrorResponse(w, resultsErr) + s.WriteErrorResponse(req, w, resultsErr) return } - req.WriteResponse(w, results) + s.WriteResponse(req, w, results) } func (s *Server) getrawtransaction(reqParams Params) (interface{}, error) { diff --git a/pkg/rpc/server_helper_test.go b/pkg/rpc/server_helper_test.go index 437c39b8b..5b0590f69 100644 --- a/pkg/rpc/server_helper_test.go +++ b/pkg/rpc/server_helper_test.go @@ -181,7 +181,8 @@ func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFu require.NoError(t, err, "could not load config") memoryStore := storage.NewMemoryStore() - chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration, zaptest.NewLogger(t)) + logger := zaptest.NewLogger(t) + chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration, logger) require.NoError(t, err, "could not create chain") go chain.Run() @@ -199,8 +200,8 @@ func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFu } serverConfig := network.NewServerConfig(cfg) - server := network.NewServer(serverConfig, chain, zaptest.NewLogger(t)) - rpcServer := NewServer(chain, cfg.ApplicationConfiguration.RPC, server) + server := network.NewServer(serverConfig, chain, logger) + rpcServer := NewServer(chain, cfg.ApplicationConfiguration.RPC, server, logger) handler := http.HandlerFunc(rpcServer.requestHandler) return chain, handler From 45a4524054b1c70892bb69d30d21e74ae962d40d Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 30 Dec 2019 13:42:33 +0300 Subject: [PATCH 4/8] rpc: remove EnableCORSWorkaround from Request --- pkg/rpc/request.go | 16 +++++++--------- pkg/rpc/server.go | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/pkg/rpc/request.go b/pkg/rpc/request.go index af8daf6c0..5a3c21b9a 100644 --- a/pkg/rpc/request.go +++ b/pkg/rpc/request.go @@ -17,11 +17,10 @@ type ( // Request represents a standard JSON-RPC 2.0 // request: http://www.jsonrpc.org/specification#request_object. Request struct { - JSONRPC string `json:"jsonrpc"` - Method string `json:"method"` - RawParams json.RawMessage `json:"params,omitempty"` - RawID json.RawMessage `json:"id,omitempty"` - enableCORSWorkaround bool + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + RawParams json.RawMessage `json:"params,omitempty"` + RawID json.RawMessage `json:"id,omitempty"` } // Response represents a standard JSON-RPC 2.0 @@ -35,10 +34,9 @@ type ( ) // NewRequest creates a new Request struct. -func NewRequest(corsWorkaround bool) *Request { +func NewRequest() *Request { return &Request{ - JSONRPC: jsonRPCVersion, - enableCORSWorkaround: corsWorkaround, + JSONRPC: jsonRPCVersion, } } @@ -114,7 +112,7 @@ func (s *Server) WriteResponse(r *Request, w http.ResponseWriter, result interfa func (s *Server) writeServerResponse(r *Request, w http.ResponseWriter, response Response) { w.Header().Set("Content-Type", "application/json; charset=utf-8") - if r.enableCORSWorkaround { + if s.config.EnableCORSWorkaround { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Access-Control-Allow-Headers, Authorization, X-Requested-With") } diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go index 79af22f8d..83c46e52d 100644 --- a/pkg/rpc/server.go +++ b/pkg/rpc/server.go @@ -70,7 +70,7 @@ func (s *Server) Shutdown() error { } func (s *Server) requestHandler(w http.ResponseWriter, httpRequest *http.Request) { - req := NewRequest(s.config.EnableCORSWorkaround) + req := NewRequest() if httpRequest.Method != "POST" { s.WriteErrorResponse( From 637c99eda7d006aa8ca70286d1c222cd9a58990b Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 30 Dec 2019 14:01:49 +0300 Subject: [PATCH 5/8] core: simplify newInteropContext --- pkg/core/blockchain.go | 12 ++++++++---- pkg/core/interop_neo_test.go | 15 +++++++-------- pkg/core/interops_test.go | 3 +-- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 4f8250aa7..69cb4f9b8 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -508,7 +508,7 @@ func (bc *Blockchain) storeBlock(block *Block) error { return err } case *transaction.InvocationTX: - systemInterop := newInteropContext(trigger.Application, bc, cache.store, block, tx, bc.log) + systemInterop := bc.newInteropContext(trigger.Application, cache.store, block, tx) v := bc.spawnVMWithInterops(systemInterop) v.SetCheckedHash(tx.VerificationHash().BytesBE()) v.LoadScript(t.Script) @@ -1373,7 +1373,7 @@ func (bc *Blockchain) spawnVMWithInterops(interopCtx *interopContext) *vm.VM { // GetTestVM returns a VM and a Store setup for a test run of some sort of code. func (bc *Blockchain) GetTestVM() (*vm.VM, storage.Store) { tmpStore := storage.NewMemCachedStore(bc.dao.store) - systemInterop := newInteropContext(trigger.Application, bc, tmpStore, nil, nil, bc.log) + systemInterop := bc.newInteropContext(trigger.Application, tmpStore, nil, nil) vm := bc.spawnVMWithInterops(systemInterop) return vm, tmpStore } @@ -1452,7 +1452,7 @@ func (bc *Blockchain) verifyTxWitnesses(t *transaction.Transaction, block *Block } sort.Slice(hashes, func(i, j int) bool { return hashes[i].Less(hashes[j]) }) sort.Slice(witnesses, func(i, j int) bool { return witnesses[i].ScriptHash().Less(witnesses[j].ScriptHash()) }) - interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, block, t, bc.log) + interopCtx := bc.newInteropContext(trigger.Verification, bc.dao.store, block, t) for i := 0; i < len(hashes); i++ { err := bc.verifyHashAgainstScript(hashes[i], &witnesses[i], t.VerificationHash(), interopCtx, false) if err != nil { @@ -1472,7 +1472,7 @@ func (bc *Blockchain) verifyBlockWitnesses(block *Block, prevHeader *Header) err } else { hash = prevHeader.NextConsensus } - interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, nil, nil, bc.log) + interopCtx := bc.newInteropContext(trigger.Verification, bc.dao.store, nil, nil) return bc.verifyHashAgainstScript(hash, &block.Script, block.VerificationHash(), interopCtx, true) } @@ -1486,3 +1486,7 @@ func hashAndIndexToBytes(h util.Uint256, index uint32) []byte { func (bc *Blockchain) secondsPerBlock() int { return bc.config.SecondsPerBlock } + +func (bc *Blockchain) newInteropContext(trigger byte, s storage.Store, block *Block, tx *transaction.Transaction) *interopContext { + return newInteropContext(trigger, bc, s, block, tx, bc.log) +} diff --git a/pkg/core/interop_neo_test.go b/pkg/core/interop_neo_test.go index 5793d0598..21a07769c 100644 --- a/pkg/core/interop_neo_test.go +++ b/pkg/core/interop_neo_test.go @@ -15,7 +15,6 @@ import ( "github.com/CityOfZion/neo-go/pkg/vm" "github.com/CityOfZion/neo-go/pkg/vm/opcode" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" ) /* Missing tests: @@ -113,7 +112,7 @@ func TestHeaderGetVersion(t *testing.T) { func TestHeaderGetVersion_Negative(t *testing.T) { v := vm.New() block := newDumbBlock() - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil, zaptest.NewLogger(t)) + context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), block, nil) v.Estack().PushVal(vm.NewBoolItem(false)) err := context.headerGetVersion(v) @@ -198,7 +197,7 @@ func TestWitnessGetVerificationScript(t *testing.T) { script := []byte{byte(opcode.PUSHM1), byte(opcode.RET)} witness := transaction.Witness{InvocationScript: nil, VerificationScript: script} - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t)) + context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) v.Estack().PushVal(vm.NewInteropItem(&witness)) err := context.witnessGetVerificationScript(v) require.NoError(t, err) @@ -419,7 +418,7 @@ func TestAssetGetPrecision(t *testing.T) { func createVMAndPushBlock(t *testing.T) (*vm.VM, *Block, *interopContext) { v := vm.New() block := newDumbBlock() - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil, zaptest.NewLogger(t)) + context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), block, nil) v.Estack().PushVal(vm.NewInteropItem(block)) return v, block, context } @@ -448,7 +447,7 @@ func createVMAndAssetState(t *testing.T) (*vm.VM, *state.Asset, *interopContext) IsFrozen: false, } - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t)) + context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) return v, assetState, context } @@ -466,7 +465,7 @@ func createVMAndContractState(t *testing.T) (*vm.VM, *state.Contract, *interopCo Description: random.String(10), } - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t)) + context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) return v, contractState, context } @@ -480,7 +479,7 @@ func createVMAndAccState(t *testing.T) (*vm.VM, *state.Account, *interopContext) accountState.Votes = []*keys.PublicKey{key} require.NoError(t, err) - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t)) + context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) return v, accountState, context } @@ -510,6 +509,6 @@ func createVMAndTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopCont tx.Attributes = attributes tx.Inputs = inputs tx.Outputs = outputs - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, tx, zaptest.NewLogger(t)) + context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, tx) return v, tx, context } diff --git a/pkg/core/interops_test.go b/pkg/core/interops_test.go index e6376b3ab..8f9adc35a 100644 --- a/pkg/core/interops_test.go +++ b/pkg/core/interops_test.go @@ -9,13 +9,12 @@ import ( "github.com/CityOfZion/neo-go/pkg/smartcontract/trigger" "github.com/CityOfZion/neo-go/pkg/vm" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" ) func testNonInterop(t *testing.T, value interface{}, f func(*interopContext, *vm.VM) error) { v := vm.New() v.Estack().PushVal(value) - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t)) + context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) require.Error(t, f(context, v)) } From 9b8b77c9ea9163ee0a37bbe494540804e6badbdf Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 30 Dec 2019 15:38:23 +0300 Subject: [PATCH 6/8] network: return error if header message is too big Big messages can still be processed but only first 2000 headers will be used. --- pkg/network/message.go | 8 ++-- pkg/network/payload/headers.go | 12 ++++- pkg/network/payload/headers_test.go | 72 ++++++++++++++++------------- pkg/network/tcp_transport.go | 8 +++- 4 files changed, 62 insertions(+), 38 deletions(-) diff --git a/pkg/network/message.go b/pkg/network/message.go index 16802a2ba..087724967 100644 --- a/pkg/network/message.go +++ b/pkg/network/message.go @@ -201,13 +201,11 @@ func (m *Message) decodePayload(br *io.BinReader) error { return fmt.Errorf("can't decode command %s", cmdByteArrayToString(m.Command)) } p.DecodeBinary(r) - if r.Err != nil { - return r.Err + if r.Err == nil || r.Err == payload.ErrTooManyHeaders { + m.Payload = p } - m.Payload = p - - return nil + return r.Err } // Encode encodes a Message to any given BinWriter. diff --git a/pkg/network/payload/headers.go b/pkg/network/payload/headers.go index 33f7b8e53..57160c190 100644 --- a/pkg/network/payload/headers.go +++ b/pkg/network/payload/headers.go @@ -3,6 +3,7 @@ package payload import ( "github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/io" + "github.com/pkg/errors" ) // Headers payload. @@ -15,12 +16,17 @@ const ( MaxHeadersAllowed = 2000 ) +// ErrTooManyHeaders is an error returned when too many headers were received. +var ErrTooManyHeaders = errors.Errorf("too many headers were received (max: %d)", MaxHeadersAllowed) + // DecodeBinary implements Serializable interface. func (p *Headers) DecodeBinary(br *io.BinReader) { lenHeaders := br.ReadVarUint() + var limitExceeded bool + // C# node does it silently - if lenHeaders > MaxHeadersAllowed { + if limitExceeded = lenHeaders > MaxHeadersAllowed; limitExceeded { lenHeaders = MaxHeadersAllowed } @@ -31,6 +37,10 @@ func (p *Headers) DecodeBinary(br *io.BinReader) { header.DecodeBinary(br) p.Hdrs[i] = header } + + if br.Err == nil && limitExceeded { + br.Err = ErrTooManyHeaders + } } // EncodeBinary implements Serializable interface. diff --git a/pkg/network/payload/headers_test.go b/pkg/network/payload/headers_test.go index 44c2b001f..22762cc0e 100644 --- a/pkg/network/payload/headers_test.go +++ b/pkg/network/payload/headers_test.go @@ -11,36 +11,39 @@ import ( ) func TestHeadersEncodeDecode(t *testing.T) { - headers := &Headers{[]*core.Header{ - { - BlockBase: core.BlockBase{ - Version: 0, - Index: 1, - Script: transaction.Witness{ - InvocationScript: []byte{0x0}, - VerificationScript: []byte{0x1}, - }, - }}, - { - BlockBase: core.BlockBase{ - Version: 0, - Index: 2, - Script: transaction.Witness{ - InvocationScript: []byte{0x0}, - VerificationScript: []byte{0x1}, - }, - }}, - { - BlockBase: core.BlockBase{ - Version: 0, - Index: 3, - Script: transaction.Witness{ - InvocationScript: []byte{0x0}, - VerificationScript: []byte{0x1}, - }, - }}, - }} + t.Run("normal case", func(t *testing.T) { + headers := newTestHeaders(3) + testHeadersEncodeDecode(t, headers, 3, false) + }) + + t.Run("more than max", func(t *testing.T) { + const sent = MaxHeadersAllowed + 1 + headers := newTestHeaders(sent) + + testHeadersEncodeDecode(t, headers, MaxHeadersAllowed, true) + }) +} + +func newTestHeaders(n int) *Headers { + headers := &Headers{Hdrs: make([]*core.Header, n)} + + for i := range headers.Hdrs { + headers.Hdrs[i] = &core.Header{ + BlockBase: core.BlockBase{ + Index: uint32(i + 1), + Script: transaction.Witness{ + InvocationScript: []byte{0x0}, + VerificationScript: []byte{0x1}, + }, + }, + } + } + + return headers +} + +func testHeadersEncodeDecode(t *testing.T, headers *Headers, expected int, limit bool) { buf := io.NewBufBinWriter() headers.EncodeBinary(buf.BinWriter) assert.Nil(t, buf.Err) @@ -49,9 +52,16 @@ func TestHeadersEncodeDecode(t *testing.T) { r := io.NewBinReaderFromBuf(b) headersDecode := &Headers{} headersDecode.DecodeBinary(r) - assert.Nil(t, r.Err) - for i := 0; i < len(headers.Hdrs); i++ { + var err error + if limit { + err = ErrTooManyHeaders + } + + assert.Equal(t, err, r.Err) + assert.Equal(t, expected, len(headersDecode.Hdrs)) + + for i := 0; i < len(headersDecode.Hdrs); i++ { assert.Equal(t, headers.Hdrs[i].Version, headersDecode.Hdrs[i].Version) assert.Equal(t, headers.Hdrs[i].Index, headersDecode.Hdrs[i].Index) assert.Equal(t, headers.Hdrs[i].Script, headersDecode.Hdrs[i].Script) diff --git a/pkg/network/tcp_transport.go b/pkg/network/tcp_transport.go index f41015f53..2055529d6 100644 --- a/pkg/network/tcp_transport.go +++ b/pkg/network/tcp_transport.go @@ -6,6 +6,7 @@ import ( "time" "github.com/CityOfZion/neo-go/pkg/io" + "github.com/CityOfZion/neo-go/pkg/network/payload" "go.uber.org/zap" ) @@ -87,7 +88,12 @@ func (t *TCPTransport) handleConn(conn net.Conn) { r := io.NewBinReaderFromIO(p.conn) for { msg := &Message{} - if err = msg.Decode(r); err != nil { + err := msg.Decode(r) + + if err == payload.ErrTooManyHeaders { + t.log.Warn("not all headers were processed") + r.Err = nil + } else if err != nil { break } if err = t.server.handleMessage(p, msg); err != nil { From dc65684708eff3e4625b2928bdb65c4022529375 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 9 Jan 2020 17:46:08 +0300 Subject: [PATCH 7/8] consensus: replace SugaredLogger with Logger SugaredLogger was used to make output look more similar to logrus. This is no longer needed. --- pkg/consensus/consensus.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 054a8761f..16a255bb6 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -46,7 +46,7 @@ type Service interface { type service struct { Config - log *zap.SugaredLogger + log *zap.Logger // cache is a fifo cache which stores recent payloads. cache *relayCache // txx is a fifo cache which stores miner transactions. @@ -92,7 +92,7 @@ func NewService(cfg Config) (Service, error) { srv := &service{ Config: cfg, - log: cfg.Logger.Sugar(), + log: cfg.Logger, cache: newFIFOCache(cacheMaxCapacity), txx: newFIFOCache(cacheMaxCapacity), messages: make(chan Payload, 100), @@ -107,7 +107,7 @@ func NewService(cfg Config) (Service, error) { priv, pub := getKeyPair(cfg.Wallet) srv.dbft = dbft.New( - dbft.WithLogger(srv.log.Desugar()), + dbft.WithLogger(srv.log), dbft.WithSecondsPerBlock(cfg.TimePerBlock), dbft.WithKeyPair(priv, pub), dbft.WithTxPerBlock(10000), @@ -154,10 +154,12 @@ func (s *service) eventLoop() { for { select { case hv := <-s.dbft.Timer.C(): - s.log.Debugf("timer fired (%d,%d)", hv.Height, hv.View) + s.log.Debug("timer fired", + zap.Uint32("height", hv.Height), + zap.Uint("view", uint(hv.View))) s.dbft.OnTimeout(hv) case msg := <-s.messages: - s.log.Debugf("received message from %d", msg.validatorIndex) + s.log.Debug("received message", zap.Uint16("from", msg.validatorIndex)) s.dbft.OnReceive(&msg) case tx := <-s.transactions: s.dbft.OnTransaction(tx) @@ -240,7 +242,7 @@ func (s *service) broadcast(p payload.ConsensusPayload) { } if err := p.(*Payload).Sign(s.dbft.Priv.(*privateKey)); err != nil { - s.log.Warnf("can't sign consensus payload: %v", err) + s.log.Warn("can't sign consensus payload", zap.Error(err)) } s.cache.Add(p) @@ -279,7 +281,7 @@ func (s *service) processBlock(b block.Block) { bb.Script = *(s.getBlockWitness(bb)) if err := s.Chain.AddBlock(bb); err != nil { - s.log.Warnf("error on add block: %v", err) + s.log.Warn("error on add block", zap.Error(err)) } else { s.Config.RelayBlock(bb) } @@ -299,7 +301,7 @@ func (s *service) getBlockWitness(b *core.Block) *transaction.Witness { m := s.dbft.Context.M() verif, err := smartcontract.CreateMultiSigRedeemScript(m, pubs) if err != nil { - s.log.Warnf("can't create multisig redeem script: %v", err) + s.log.Warn("can't create multisig redeem script", zap.Error(err)) return nil } From 9dc5571327df9fcbda75d996b0aa18bb7fe7c118 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 10 Jan 2020 11:47:55 +0300 Subject: [PATCH 8/8] core,rpc: close Blockchain in tests If blockchain is not closed, logging in defer can occur after test has finished, which will lead to a panic with "Log in goroutine after Test* has completed". --- pkg/core/interop_neo_test.go | 143 ++++++++++++++++++++++------------- pkg/core/interops_test.go | 4 +- pkg/rpc/server_test.go | 2 + 3 files changed, 97 insertions(+), 52 deletions(-) diff --git a/pkg/core/interop_neo_test.go b/pkg/core/interop_neo_test.go index 21a07769c..39c43a714 100644 --- a/pkg/core/interop_neo_test.go +++ b/pkg/core/interop_neo_test.go @@ -35,7 +35,8 @@ import ( */ func TestStorageFind(t *testing.T) { - v, contractState, context := createVMAndContractState(t) + v, contractState, context, chain := createVMAndContractState(t) + defer chain.Close() skeys := [][]byte{{0x01, 0x02}, {0x02, 0x01}} items := []*state.StorageItem{ @@ -101,7 +102,8 @@ func TestStorageFind(t *testing.T) { } func TestHeaderGetVersion(t *testing.T) { - v, block, context := createVMAndPushBlock(t) + v, block, context, chain := createVMAndPushBlock(t) + defer chain.Close() err := context.headerGetVersion(v) require.NoError(t, err) @@ -112,7 +114,9 @@ func TestHeaderGetVersion(t *testing.T) { func TestHeaderGetVersion_Negative(t *testing.T) { v := vm.New() block := newDumbBlock() - context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), block, nil) + chain := newTestChain(t) + defer chain.Close() + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), block, nil) v.Estack().PushVal(vm.NewBoolItem(false)) err := context.headerGetVersion(v) @@ -120,7 +124,8 @@ func TestHeaderGetVersion_Negative(t *testing.T) { } func TestHeaderGetConsensusData(t *testing.T) { - v, block, context := createVMAndPushBlock(t) + v, block, context, chain := createVMAndPushBlock(t) + defer chain.Close() err := context.headerGetConsensusData(v) require.NoError(t, err) @@ -129,7 +134,8 @@ func TestHeaderGetConsensusData(t *testing.T) { } func TestHeaderGetMerkleRoot(t *testing.T) { - v, block, context := createVMAndPushBlock(t) + v, block, context, chain := createVMAndPushBlock(t) + defer chain.Close() err := context.headerGetMerkleRoot(v) require.NoError(t, err) @@ -138,7 +144,8 @@ func TestHeaderGetMerkleRoot(t *testing.T) { } func TestHeaderGetNextConsensus(t *testing.T) { - v, block, context := createVMAndPushBlock(t) + v, block, context, chain := createVMAndPushBlock(t) + defer chain.Close() err := context.headerGetNextConsensus(v) require.NoError(t, err) @@ -147,7 +154,8 @@ func TestHeaderGetNextConsensus(t *testing.T) { } func TestTxGetAttributes(t *testing.T) { - v, tx, context := createVMAndPushTX(t) + v, tx, context, chain := createVMAndPushTX(t) + defer chain.Close() err := context.txGetAttributes(v) require.NoError(t, err) @@ -156,7 +164,8 @@ func TestTxGetAttributes(t *testing.T) { } func TestTxGetInputs(t *testing.T) { - v, tx, context := createVMAndPushTX(t) + v, tx, context, chain := createVMAndPushTX(t) + defer chain.Close() err := context.txGetInputs(v) require.NoError(t, err) @@ -165,7 +174,8 @@ func TestTxGetInputs(t *testing.T) { } func TestTxGetOutputs(t *testing.T) { - v, tx, context := createVMAndPushTX(t) + v, tx, context, chain := createVMAndPushTX(t) + defer chain.Close() err := context.txGetOutputs(v) require.NoError(t, err) @@ -174,7 +184,8 @@ func TestTxGetOutputs(t *testing.T) { } func TestTxGetType(t *testing.T) { - v, tx, context := createVMAndPushTX(t) + v, tx, context, chain := createVMAndPushTX(t) + defer chain.Close() err := context.txGetType(v) require.NoError(t, err) @@ -183,7 +194,8 @@ func TestTxGetType(t *testing.T) { } func TestInvocationTxGetScript(t *testing.T) { - v, tx, context := createVMAndPushTX(t) + v, tx, context, chain := createVMAndPushTX(t) + defer chain.Close() err := context.invocationTxGetScript(v) require.NoError(t, err) @@ -197,7 +209,10 @@ func TestWitnessGetVerificationScript(t *testing.T) { script := []byte{byte(opcode.PUSHM1), byte(opcode.RET)} witness := transaction.Witness{InvocationScript: nil, VerificationScript: script} - context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) + chain := newTestChain(t) + defer chain.Close() + + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) v.Estack().PushVal(vm.NewInteropItem(&witness)) err := context.witnessGetVerificationScript(v) require.NoError(t, err) @@ -206,7 +221,8 @@ func TestWitnessGetVerificationScript(t *testing.T) { } func TestPopInputFromVM(t *testing.T) { - v, tx, _ := createVMAndTX(t) + v, tx, _, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Inputs[0])) input, err := popInputFromVM(v) @@ -215,7 +231,8 @@ func TestPopInputFromVM(t *testing.T) { } func TestInputGetHash(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Inputs[0])) err := context.inputGetHash(v) @@ -225,7 +242,8 @@ func TestInputGetHash(t *testing.T) { } func TestInputGetIndex(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Inputs[0])) err := context.inputGetIndex(v) @@ -235,7 +253,8 @@ func TestInputGetIndex(t *testing.T) { } func TestPopOutputFromVM(t *testing.T) { - v, tx, _ := createVMAndTX(t) + v, tx, _, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Outputs[0])) output, err := popOutputFromVM(v) @@ -244,7 +263,8 @@ func TestPopOutputFromVM(t *testing.T) { } func TestOutputGetAssetID(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Outputs[0])) err := context.outputGetAssetID(v) @@ -254,7 +274,8 @@ func TestOutputGetAssetID(t *testing.T) { } func TestOutputGetScriptHash(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Outputs[0])) err := context.outputGetScriptHash(v) @@ -264,7 +285,8 @@ func TestOutputGetScriptHash(t *testing.T) { } func TestOutputGetValue(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Outputs[0])) err := context.outputGetValue(v) @@ -274,7 +296,8 @@ func TestOutputGetValue(t *testing.T) { } func TestAttrGetData(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Attributes[0])) err := context.attrGetData(v) @@ -284,7 +307,8 @@ func TestAttrGetData(t *testing.T) { } func TestAttrGetUsage(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Attributes[0])) err := context.attrGetUsage(v) @@ -294,7 +318,8 @@ func TestAttrGetUsage(t *testing.T) { } func TestAccountGetScriptHash(t *testing.T) { - v, accState, context := createVMAndAccState(t) + v, accState, context, chain := createVMAndAccState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(accState)) err := context.accountGetScriptHash(v) @@ -304,7 +329,8 @@ func TestAccountGetScriptHash(t *testing.T) { } func TestAccountGetVotes(t *testing.T) { - v, accState, context := createVMAndAccState(t) + v, accState, context, chain := createVMAndAccState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(accState)) err := context.accountGetVotes(v) @@ -314,7 +340,8 @@ func TestAccountGetVotes(t *testing.T) { } func TestContractGetScript(t *testing.T) { - v, contractState, context := createVMAndContractState(t) + v, contractState, context, chain := createVMAndContractState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(contractState)) err := context.contractGetScript(v) @@ -324,7 +351,8 @@ func TestContractGetScript(t *testing.T) { } func TestContractIsPayable(t *testing.T) { - v, contractState, context := createVMAndContractState(t) + v, contractState, context, chain := createVMAndContractState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(contractState)) err := context.contractIsPayable(v) @@ -334,7 +362,8 @@ func TestContractIsPayable(t *testing.T) { } func TestAssetGetAdmin(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetAdmin(v) @@ -344,7 +373,8 @@ func TestAssetGetAdmin(t *testing.T) { } func TestAssetGetAmount(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetAmount(v) @@ -354,7 +384,8 @@ func TestAssetGetAmount(t *testing.T) { } func TestAssetGetAssetID(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetAssetID(v) @@ -364,7 +395,8 @@ func TestAssetGetAssetID(t *testing.T) { } func TestAssetGetAssetType(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetAssetType(v) @@ -374,7 +406,8 @@ func TestAssetGetAssetType(t *testing.T) { } func TestAssetGetAvailable(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetAvailable(v) @@ -384,7 +417,8 @@ func TestAssetGetAvailable(t *testing.T) { } func TestAssetGetIssuer(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetIssuer(v) @@ -394,7 +428,8 @@ func TestAssetGetIssuer(t *testing.T) { } func TestAssetGetOwner(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetOwner(v) @@ -404,7 +439,8 @@ func TestAssetGetOwner(t *testing.T) { } func TestAssetGetPrecision(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetPrecision(v) @@ -415,21 +451,22 @@ func TestAssetGetPrecision(t *testing.T) { // Helper functions to create VM, InteropContext, TX, Account, Contract, Asset. -func createVMAndPushBlock(t *testing.T) (*vm.VM, *Block, *interopContext) { +func createVMAndPushBlock(t *testing.T) (*vm.VM, *Block, *interopContext, *Blockchain) { v := vm.New() block := newDumbBlock() - context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), block, nil) + chain := newTestChain(t) + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), block, nil) v.Estack().PushVal(vm.NewInteropItem(block)) - return v, block, context + return v, block, context, chain } -func createVMAndPushTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopContext) { - v, tx, context := createVMAndTX(t) +func createVMAndPushTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopContext, *Blockchain) { + v, tx, context, chain := createVMAndTX(t) v.Estack().PushVal(vm.NewInteropItem(tx)) - return v, tx, context + return v, tx, context, chain } -func createVMAndAssetState(t *testing.T) (*vm.VM, *state.Asset, *interopContext) { +func createVMAndAssetState(t *testing.T) (*vm.VM, *state.Asset, *interopContext, *Blockchain) { v := vm.New() assetState := &state.Asset{ ID: util.Uint256{}, @@ -447,11 +484,12 @@ func createVMAndAssetState(t *testing.T) (*vm.VM, *state.Asset, *interopContext) IsFrozen: false, } - context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) - return v, assetState, context + chain := newTestChain(t) + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) + return v, assetState, context, chain } -func createVMAndContractState(t *testing.T) (*vm.VM, *state.Contract, *interopContext) { +func createVMAndContractState(t *testing.T) (*vm.VM, *state.Contract, *interopContext, *Blockchain) { v := vm.New() contractState := &state.Contract{ Script: []byte("testscript"), @@ -465,11 +503,12 @@ func createVMAndContractState(t *testing.T) (*vm.VM, *state.Contract, *interopCo Description: random.String(10), } - context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) - return v, contractState, context + chain := newTestChain(t) + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) + return v, contractState, context, chain } -func createVMAndAccState(t *testing.T) (*vm.VM, *state.Account, *interopContext) { +func createVMAndAccState(t *testing.T) (*vm.VM, *state.Account, *interopContext, *Blockchain) { v := vm.New() rawHash := "4d3b96ae1bcc5a585e075e3b81920210dec16302" hash, err := util.Uint160DecodeStringBE(rawHash) @@ -479,11 +518,12 @@ func createVMAndAccState(t *testing.T) (*vm.VM, *state.Account, *interopContext) accountState.Votes = []*keys.PublicKey{key} require.NoError(t, err) - context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) - return v, accountState, context + chain := newTestChain(t) + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) + return v, accountState, context, chain } -func createVMAndTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopContext) { +func createVMAndTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopContext, *Blockchain) { v := vm.New() script := []byte{byte(opcode.PUSH1), byte(opcode.RET)} tx := transaction.NewInvocationTX(script, 0) @@ -509,6 +549,7 @@ func createVMAndTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopCont tx.Attributes = attributes tx.Inputs = inputs tx.Outputs = outputs - context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, tx) - return v, tx, context + chain := newTestChain(t) + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, tx) + return v, tx, context, chain } diff --git a/pkg/core/interops_test.go b/pkg/core/interops_test.go index 8f9adc35a..ab45d6dbd 100644 --- a/pkg/core/interops_test.go +++ b/pkg/core/interops_test.go @@ -14,7 +14,9 @@ import ( func testNonInterop(t *testing.T, value interface{}, f func(*interopContext, *vm.VM) error) { v := vm.New() v.Estack().PushVal(value) - context := newTestChain(t).newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) + chain := newTestChain(t) + defer chain.Close() + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) require.Error(t, f(context, v)) } diff --git a/pkg/rpc/server_test.go b/pkg/rpc/server_test.go index 4b86ac271..7656ebeff 100644 --- a/pkg/rpc/server_test.go +++ b/pkg/rpc/server_test.go @@ -417,6 +417,8 @@ var rpcTestCases = map[string][]rpcTestCase{ func TestRPC(t *testing.T) { chain, handler := initServerWithInMemoryChain(t) + defer chain.Close() + e := &executor{chain: chain, handler: handler} for method, cases := range rpcTestCases { t.Run(method, func(t *testing.T) {