diff --git a/cli/server/server.go b/cli/server/server.go index 6ecdfdda7..87cc30a02 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -15,8 +15,9 @@ import ( "github.com/CityOfZion/neo-go/pkg/network/metrics" "github.com/CityOfZion/neo-go/pkg/rpc" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" "github.com/urfave/cli" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // NewCommands returns 'node' command. @@ -119,32 +120,40 @@ func getConfigFromContext(ctx *cli.Context) (config.Config, error) { // handleLoggingParams reads logging parameters. // If user selected debug level -- function enables it. // If logPath is configured -- function creates dir and file for logging. -func handleLoggingParams(ctx *cli.Context, cfg config.ApplicationConfiguration) error { +func handleLoggingParams(ctx *cli.Context, cfg config.ApplicationConfiguration) (*zap.Logger, error) { + level := zapcore.InfoLevel if ctx.Bool("debug") { - log.SetLevel(log.DebugLevel) + level = zapcore.DebugLevel } + cc := zap.NewProductionConfig() + cc.DisableCaller = true + cc.DisableStacktrace = true + cc.EncoderConfig.EncodeDuration = zapcore.StringDurationEncoder + cc.EncoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder + cc.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + cc.Encoding = "console" + cc.Level = zap.NewAtomicLevelAt(level) + if logPath := cfg.LogPath; logPath != "" { if err := io.MakeDirForFile(logPath, "logger"); err != nil { - return err + return nil, err } - f, err := os.Create(logPath) - if err != nil { - return err - } - log.SetOutput(f) + + cc.OutputPaths = []string{logPath} } - return nil + + return cc.Build() } -func initBCWithMetrics(cfg config.Config) (*core.Blockchain, *metrics.Service, *metrics.Service, error) { - chain, err := initBlockChain(cfg) +func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *metrics.Service, *metrics.Service, error) { + chain, err := initBlockChain(cfg, log) if err != nil { return nil, nil, nil, cli.NewExitError(err, 1) } configureAddresses(cfg.ApplicationConfiguration) - prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus) - pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof) + prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus, log) + pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof, log) go chain.Run() go prometheus.Start() @@ -158,7 +167,8 @@ func dumpDB(ctx *cli.Context) error { if err != nil { return cli.NewExitError(err, 1) } - if err := handleLoggingParams(ctx, cfg.ApplicationConfiguration); err != nil { + log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration) + if err != nil { return cli.NewExitError(err, 1) } count := uint32(ctx.Uint("count")) @@ -174,7 +184,7 @@ func dumpDB(ctx *cli.Context) error { defer outStream.Close() writer := io.NewBinWriterFromIO(outStream) - chain, prometheus, pprof, err := initBCWithMetrics(cfg) + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) if err != nil { return err } @@ -213,7 +223,8 @@ func restoreDB(ctx *cli.Context) error { if err != nil { return err } - if err := handleLoggingParams(ctx, cfg.ApplicationConfiguration); err != nil { + log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration) + if err != nil { return cli.NewExitError(err, 1) } count := uint32(ctx.Uint("count")) @@ -229,7 +240,7 @@ func restoreDB(ctx *cli.Context) error { defer inStream.Close() reader := io.NewBinReaderFromIO(inStream) - chain, prometheus, pprof, err := initBCWithMetrics(cfg) + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) if err != nil { return err } @@ -265,7 +276,7 @@ func restoreDB(ctx *cli.Context) error { if block.Index == 0 && i == 0 && skip == 0 { genesis, err := chain.GetBlock(block.Hash()) if err == nil && genesis.Index == 0 { - log.Info("skipped genesis block ", block.Hash().StringLE()) + log.Info("skipped genesis block", zap.String("hash", block.Hash().StringLE())) continue } } @@ -293,7 +304,8 @@ func startServer(ctx *cli.Context) error { if err != nil { return err } - if err := handleLoggingParams(ctx, cfg.ApplicationConfiguration); err != nil { + log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration) + if err != nil { return err } @@ -302,13 +314,13 @@ func startServer(ctx *cli.Context) error { serverConfig := network.NewServerConfig(cfg) - chain, prometheus, pprof, err := initBCWithMetrics(cfg) + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) if err != nil { return err } - server := network.NewServer(serverConfig, chain) - rpcServer := rpc.NewServer(chain, cfg.ApplicationConfiguration.RPC, server) + server := network.NewServer(serverConfig, chain, log) + rpcServer := rpc.NewServer(chain, cfg.ApplicationConfiguration.RPC, server, log) errChan := make(chan error) go server.Start(errChan) @@ -364,13 +376,13 @@ func configureAddresses(cfg config.ApplicationConfiguration) { } // initBlockChain initializes BlockChain with preselected DB. -func initBlockChain(cfg config.Config) (*core.Blockchain, error) { +func initBlockChain(cfg config.Config, log *zap.Logger) (*core.Blockchain, error) { store, err := storage.NewStore(cfg.ApplicationConfiguration.DBConfiguration) if err != nil { return nil, cli.NewExitError(fmt.Errorf("could not initialize storage: %s", err), 1) } - chain, err := core.NewBlockchain(store, cfg.ProtocolConfiguration) + chain, err := core.NewBlockchain(store, cfg.ProtocolConfiguration, log) if err != nil { return nil, cli.NewExitError(fmt.Errorf("could not initialize blockchain: %s", err), 1) } diff --git a/go.mod b/go.mod index 6121dd82e..bcd5cb4c6 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/nspcc-dev/rfc6979 v0.1.0 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.2.1 - github.com/sirupsen/logrus v1.4.2 github.com/stretchr/testify v1.4.0 github.com/syndtr/goleveldb v0.0.0-20180307113352-169b1b37be73 github.com/urfave/cli v1.20.0 diff --git a/integration/performance_test.go b/integration/performance_test.go index d81c42e58..738f660b5 100644 --- a/integration/performance_test.go +++ b/integration/performance_test.go @@ -13,6 +13,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/network" "github.com/CityOfZion/neo-go/pkg/rpc" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) // Benchmark test to measure number of processed TX. @@ -23,14 +24,15 @@ func BenchmarkTXPerformanceTest(t *testing.B) { cfg, err := config.Load(configPath, net) require.NoError(t, err, "could not load config") + logger := zaptest.NewLogger(t) memoryStore := storage.NewMemoryStore() - chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration) + chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration, logger) require.NoError(t, err, "could not create chain") go chain.Run() serverConfig := network.NewServerConfig(cfg) - server := network.NewServer(serverConfig, chain) + server := network.NewServer(serverConfig, chain, logger) data := prepareData(t) t.ResetTimer() diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 5f8999644..38326766e 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -45,7 +45,7 @@ type Service interface { type service struct { Config - log *zap.SugaredLogger + log *zap.Logger // cache is a fifo cache which stores recent payloads. cache *relayCache // txx is a fifo cache which stores miner transactions. @@ -59,6 +59,8 @@ type service struct { // Config is a configuration for consensus services. type Config struct { + // Logger is a logger instance. + Logger *zap.Logger // Broadcast is a callback which is called to notify server // about new consensus payload to sent. Broadcast func(p *Payload) @@ -78,19 +80,18 @@ type Config struct { // NewService returns new consensus.Service instance. func NewService(cfg Config) (Service, error) { - log, err := getLogger() - if err != nil { - return nil, err - } - if cfg.TimePerBlock <= 0 { cfg.TimePerBlock = defaultTimePerBlock } + if cfg.Logger == nil { + return nil, errors.New("empty logger") + } + srv := &service{ Config: cfg, - log: log.Sugar(), + log: cfg.Logger, cache: newFIFOCache(cacheMaxCapacity), txx: newFIFOCache(cacheMaxCapacity), messages: make(chan Payload, 100), @@ -105,7 +106,7 @@ func NewService(cfg Config) (Service, error) { priv, pub := getKeyPair(cfg.Wallet) srv.dbft = dbft.New( - dbft.WithLogger(srv.log.Desugar()), + dbft.WithLogger(srv.log), dbft.WithSecondsPerBlock(cfg.TimePerBlock), dbft.WithKeyPair(priv, pub), dbft.WithTxPerBlock(10000), @@ -152,10 +153,12 @@ func (s *service) eventLoop() { for { select { case hv := <-s.dbft.Timer.C(): - s.log.Debugf("timer fired (%d,%d)", hv.Height, hv.View) + s.log.Debug("timer fired", + zap.Uint32("height", hv.Height), + zap.Uint("view", uint(hv.View))) s.dbft.OnTimeout(hv) case msg := <-s.messages: - s.log.Debugf("received message from %d", msg.validatorIndex) + s.log.Debug("received message", zap.Uint16("from", msg.validatorIndex)) s.dbft.OnReceive(&msg) case tx := <-s.transactions: s.dbft.OnTransaction(tx) @@ -234,7 +237,7 @@ func (s *service) broadcast(p payload.ConsensusPayload) { } if err := p.(*Payload).Sign(s.dbft.Priv.(*privateKey)); err != nil { - s.log.Warnf("can't sign consensus payload: %v", err) + s.log.Warn("can't sign consensus payload", zap.Error(err)) } s.cache.Add(p) @@ -273,7 +276,7 @@ func (s *service) processBlock(b block.Block) { bb.Script = *(s.getBlockWitness(bb)) if err := s.Chain.AddBlock(bb); err != nil { - s.log.Warnf("error on add block: %v", err) + s.log.Warn("error on add block", zap.Error(err)) } else { s.Config.RelayBlock(bb) } @@ -293,7 +296,7 @@ func (s *service) getBlockWitness(b *core.Block) *transaction.Witness { m := s.dbft.Context.M() verif, err := smartcontract.CreateMultiSigRedeemScript(m, pubs) if err != nil { - s.log.Warnf("can't create multisig redeem script: %v", err) + s.log.Warn("can't create multisig redeem script", zap.Error(err)) return nil } diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 3d86e969a..66909fcf2 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -10,6 +10,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" "github.com/nspcc-dev/dbft/block" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) func TestNewService(t *testing.T) { @@ -128,6 +129,7 @@ func shouldNotReceive(t *testing.T, ch chan Payload) { func newTestService(t *testing.T) *service { srv, err := NewService(Config{ + Logger: zaptest.NewLogger(t), Broadcast: func(*Payload) {}, Chain: newTestChain(t), RequestTx: func(...util.Uint256) {}, @@ -177,7 +179,7 @@ func newTestChain(t *testing.T) *core.Blockchain { unitTestNetCfg, err := config.Load("../../config", config.ModeUnitTestNet) require.NoError(t, err) - chain, err := core.NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration) + chain, err := core.NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration, zaptest.NewLogger(t)) require.NoError(t, err) go chain.Run() diff --git a/pkg/consensus/logger.go b/pkg/consensus/logger.go deleted file mode 100644 index 49dba8e69..000000000 --- a/pkg/consensus/logger.go +++ /dev/null @@ -1,19 +0,0 @@ -package consensus - -import ( - "go.uber.org/zap" -) - -func getLogger() (*zap.Logger, error) { - cc := zap.NewDevelopmentConfig() - cc.DisableCaller = true - cc.DisableStacktrace = true - cc.Encoding = "console" - - log, err := cc.Build() - if err != nil { - return nil, err - } - - return log.With(zap.String("module", "dbft")), nil -} diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index a44a03386..69cb4f9b8 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -21,7 +21,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/vm" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) // Tuning parameters. @@ -80,13 +80,19 @@ type Blockchain struct { // cache for block verification keys. keyCache map[util.Uint160]map[string]*keys.PublicKey + + log *zap.Logger } type headersOpFunc func(headerList *HeaderHashList) // NewBlockchain returns a new blockchain object the will use the // given Store as its underlying storage. -func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockchain, error) { +func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.Logger) (*Blockchain, error) { + if log == nil { + return nil, errors.New("empty logger") + } + bc := &Blockchain{ config: cfg, dao: newDao(s), @@ -96,6 +102,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha runToExitCh: make(chan struct{}), memPool: NewMemPool(50000), keyCache: make(map[util.Uint160]map[string]*keys.PublicKey), + log: log, } if err := bc.init(); err != nil { @@ -109,7 +116,7 @@ func (bc *Blockchain) init() error { // If we could not find the version in the Store, we know that there is nothing stored. ver, err := bc.dao.GetVersion() if err != nil { - log.Infof("no storage version found! creating genesis block") + bc.log.Info("no storage version found! creating genesis block") if err = bc.dao.PutVersion(version); err != nil { return err } @@ -131,7 +138,7 @@ func (bc *Blockchain) init() error { // At this point there was no version found in the storage which // implies a creating fresh storage with the version specified // and the genesis block as first block. - log.Infof("restoring blockchain with version: %s", version) + bc.log.Info("restoring blockchain", zap.String("version", version)) bHeight, err := bc.dao.GetCurrentBlockHeight() if err != nil { @@ -200,10 +207,10 @@ func (bc *Blockchain) Run() { defer func() { persistTimer.Stop() if err := bc.persist(); err != nil { - log.Warnf("failed to persist: %s", err) + bc.log.Warn("failed to persist", zap.Error(err)) } if err := bc.dao.store.Close(); err != nil { - log.Warnf("failed to close db: %s", err) + bc.log.Warn("failed to close db", zap.Error(err)) } close(bc.runToExitCh) }() @@ -218,7 +225,7 @@ func (bc *Blockchain) Run() { go func() { err := bc.persist() if err != nil { - log.Warnf("failed to persist blockchain: %s", err) + bc.log.Warn("failed to persist blockchain", zap.Error(err)) } }() persistTimer.Reset(persistInterval) @@ -302,11 +309,10 @@ func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) { if err = bc.dao.store.PutBatch(batch); err != nil { return } - log.WithFields(log.Fields{ - "headerIndex": headerList.Len() - 1, - "blockHeight": bc.BlockHeight(), - "took": time.Since(start), - }).Debug("done processing headers") + bc.log.Debug("done processing headers", + zap.Int("headerIndex", headerList.Len()-1), + zap.Uint32("blockHeight", bc.BlockHeight()), + zap.Duration("took", time.Since(start))) } } <-bc.headersOpDone @@ -502,7 +508,7 @@ func (bc *Blockchain) storeBlock(block *Block) error { return err } case *transaction.InvocationTX: - systemInterop := newInteropContext(trigger.Application, bc, cache.store, block, tx) + systemInterop := bc.newInteropContext(trigger.Application, cache.store, block, tx) v := bc.spawnVMWithInterops(systemInterop) v.SetCheckedHash(tx.VerificationHash().BytesBE()) v.LoadScript(t.Script) @@ -537,11 +543,10 @@ func (bc *Blockchain) storeBlock(block *Block) error { _, _, _, _ = op, from, to, amount } } else { - log.WithFields(log.Fields{ - "tx": tx.Hash().StringLE(), - "block": block.Index, - "err": err, - }).Warn("contract invocation failed") + bc.log.Warn("contract invocation failed", + zap.String("tx", tx.Hash().StringLE()), + zap.Uint32("block", block.Index), + zap.Error(err)) } aer := &state.AppExecResult{ TxHash: tx.Hash(), @@ -710,13 +715,12 @@ func (bc *Blockchain) persist() error { if err != nil { return err } - log.WithFields(log.Fields{ - "persistedBlocks": diff, - "persistedKeys": persisted, - "headerHeight": storedHeaderHeight, - "blockHeight": bHeight, - "took": time.Since(start), - }).Info("blockchain persist completed") + bc.log.Info("blockchain persist completed", + zap.Uint32("persistedBlocks", diff), + zap.Int("persistedKeys", persisted), + zap.Uint32("headerHeight", storedHeaderHeight), + zap.Uint32("blockHeight", bHeight), + zap.Duration("took", time.Since(start))) // update monitoring metrics. updatePersistedHeightMetric(bHeight) @@ -849,7 +853,9 @@ func (bc *Blockchain) HeaderHeight() uint32 { func (bc *Blockchain) GetAssetState(assetID util.Uint256) *state.Asset { asset, err := bc.dao.GetAssetState(assetID) if asset == nil && err != storage.ErrKeyNotFound { - log.Warnf("failed to get asset state %s : %s", assetID, err) + bc.log.Warn("failed to get asset state", + zap.Stringer("asset", assetID), + zap.Error(err)) } return asset } @@ -858,7 +864,7 @@ func (bc *Blockchain) GetAssetState(assetID util.Uint256) *state.Asset { func (bc *Blockchain) GetContractState(hash util.Uint160) *state.Contract { contract, err := bc.dao.GetContractState(hash) if contract == nil && err != storage.ErrKeyNotFound { - log.Warnf("failed to get contract state: %s", err) + bc.log.Warn("failed to get contract state", zap.Error(err)) } return contract } @@ -867,7 +873,7 @@ func (bc *Blockchain) GetContractState(hash util.Uint160) *state.Contract { func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *state.Account { as, err := bc.dao.GetAccountState(scriptHash) if as == nil && err != storage.ErrKeyNotFound { - log.Warnf("failed to get account state: %s", err) + bc.log.Warn("failed to get account state", zap.Error(err)) } return as } @@ -876,7 +882,7 @@ func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *state.Account { func (bc *Blockchain) GetUnspentCoinState(hash util.Uint256) *UnspentCoinState { ucs, err := bc.dao.GetUnspentCoinState(hash) if ucs == nil && err != storage.ErrKeyNotFound { - log.Warnf("failed to get unspent coin state: %s", err) + bc.log.Warn("failed to get unspent coin state", zap.Error(err)) } return ucs } @@ -1367,7 +1373,7 @@ func (bc *Blockchain) spawnVMWithInterops(interopCtx *interopContext) *vm.VM { // GetTestVM returns a VM and a Store setup for a test run of some sort of code. func (bc *Blockchain) GetTestVM() (*vm.VM, storage.Store) { tmpStore := storage.NewMemCachedStore(bc.dao.store) - systemInterop := newInteropContext(trigger.Application, bc, tmpStore, nil, nil) + systemInterop := bc.newInteropContext(trigger.Application, tmpStore, nil, nil) vm := bc.spawnVMWithInterops(systemInterop) return vm, tmpStore } @@ -1446,7 +1452,7 @@ func (bc *Blockchain) verifyTxWitnesses(t *transaction.Transaction, block *Block } sort.Slice(hashes, func(i, j int) bool { return hashes[i].Less(hashes[j]) }) sort.Slice(witnesses, func(i, j int) bool { return witnesses[i].ScriptHash().Less(witnesses[j].ScriptHash()) }) - interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, block, t) + interopCtx := bc.newInteropContext(trigger.Verification, bc.dao.store, block, t) for i := 0; i < len(hashes); i++ { err := bc.verifyHashAgainstScript(hashes[i], &witnesses[i], t.VerificationHash(), interopCtx, false) if err != nil { @@ -1466,7 +1472,7 @@ func (bc *Blockchain) verifyBlockWitnesses(block *Block, prevHeader *Header) err } else { hash = prevHeader.NextConsensus } - interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, nil, nil) + interopCtx := bc.newInteropContext(trigger.Verification, bc.dao.store, nil, nil) return bc.verifyHashAgainstScript(hash, &block.Script, block.VerificationHash(), interopCtx, true) } @@ -1480,3 +1486,7 @@ func hashAndIndexToBytes(h util.Uint256, index uint32) []byte { func (bc *Blockchain) secondsPerBlock() int { return bc.config.SecondsPerBlock } + +func (bc *Blockchain) newInteropContext(trigger byte, s storage.Store, block *Block, tx *transaction.Transaction) *interopContext { + return newInteropContext(trigger, bc, s, block, tx, bc.log) +} diff --git a/pkg/core/helper_test.go b/pkg/core/helper_test.go index 132dc5087..f750af78f 100644 --- a/pkg/core/helper_test.go +++ b/pkg/core/helper_test.go @@ -17,6 +17,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/vm/opcode" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) var newBlockPrevHash util.Uint256 @@ -37,7 +38,7 @@ func newTestChain(t *testing.T) *Blockchain { if err != nil { t.Fatal(err) } - chain, err := NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration) + chain, err := NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration, zaptest.NewLogger(t)) if err != nil { t.Fatal(err) } diff --git a/pkg/core/interop_neo_test.go b/pkg/core/interop_neo_test.go index 9498d414c..39c43a714 100644 --- a/pkg/core/interop_neo_test.go +++ b/pkg/core/interop_neo_test.go @@ -35,7 +35,8 @@ import ( */ func TestStorageFind(t *testing.T) { - v, contractState, context := createVMAndContractState(t) + v, contractState, context, chain := createVMAndContractState(t) + defer chain.Close() skeys := [][]byte{{0x01, 0x02}, {0x02, 0x01}} items := []*state.StorageItem{ @@ -101,7 +102,8 @@ func TestStorageFind(t *testing.T) { } func TestHeaderGetVersion(t *testing.T) { - v, block, context := createVMAndPushBlock(t) + v, block, context, chain := createVMAndPushBlock(t) + defer chain.Close() err := context.headerGetVersion(v) require.NoError(t, err) @@ -112,7 +114,9 @@ func TestHeaderGetVersion(t *testing.T) { func TestHeaderGetVersion_Negative(t *testing.T) { v := vm.New() block := newDumbBlock() - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil) + chain := newTestChain(t) + defer chain.Close() + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), block, nil) v.Estack().PushVal(vm.NewBoolItem(false)) err := context.headerGetVersion(v) @@ -120,7 +124,8 @@ func TestHeaderGetVersion_Negative(t *testing.T) { } func TestHeaderGetConsensusData(t *testing.T) { - v, block, context := createVMAndPushBlock(t) + v, block, context, chain := createVMAndPushBlock(t) + defer chain.Close() err := context.headerGetConsensusData(v) require.NoError(t, err) @@ -129,7 +134,8 @@ func TestHeaderGetConsensusData(t *testing.T) { } func TestHeaderGetMerkleRoot(t *testing.T) { - v, block, context := createVMAndPushBlock(t) + v, block, context, chain := createVMAndPushBlock(t) + defer chain.Close() err := context.headerGetMerkleRoot(v) require.NoError(t, err) @@ -138,7 +144,8 @@ func TestHeaderGetMerkleRoot(t *testing.T) { } func TestHeaderGetNextConsensus(t *testing.T) { - v, block, context := createVMAndPushBlock(t) + v, block, context, chain := createVMAndPushBlock(t) + defer chain.Close() err := context.headerGetNextConsensus(v) require.NoError(t, err) @@ -147,7 +154,8 @@ func TestHeaderGetNextConsensus(t *testing.T) { } func TestTxGetAttributes(t *testing.T) { - v, tx, context := createVMAndPushTX(t) + v, tx, context, chain := createVMAndPushTX(t) + defer chain.Close() err := context.txGetAttributes(v) require.NoError(t, err) @@ -156,7 +164,8 @@ func TestTxGetAttributes(t *testing.T) { } func TestTxGetInputs(t *testing.T) { - v, tx, context := createVMAndPushTX(t) + v, tx, context, chain := createVMAndPushTX(t) + defer chain.Close() err := context.txGetInputs(v) require.NoError(t, err) @@ -165,7 +174,8 @@ func TestTxGetInputs(t *testing.T) { } func TestTxGetOutputs(t *testing.T) { - v, tx, context := createVMAndPushTX(t) + v, tx, context, chain := createVMAndPushTX(t) + defer chain.Close() err := context.txGetOutputs(v) require.NoError(t, err) @@ -174,7 +184,8 @@ func TestTxGetOutputs(t *testing.T) { } func TestTxGetType(t *testing.T) { - v, tx, context := createVMAndPushTX(t) + v, tx, context, chain := createVMAndPushTX(t) + defer chain.Close() err := context.txGetType(v) require.NoError(t, err) @@ -183,7 +194,8 @@ func TestTxGetType(t *testing.T) { } func TestInvocationTxGetScript(t *testing.T) { - v, tx, context := createVMAndPushTX(t) + v, tx, context, chain := createVMAndPushTX(t) + defer chain.Close() err := context.invocationTxGetScript(v) require.NoError(t, err) @@ -197,7 +209,10 @@ func TestWitnessGetVerificationScript(t *testing.T) { script := []byte{byte(opcode.PUSHM1), byte(opcode.RET)} witness := transaction.Witness{InvocationScript: nil, VerificationScript: script} - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) + chain := newTestChain(t) + defer chain.Close() + + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) v.Estack().PushVal(vm.NewInteropItem(&witness)) err := context.witnessGetVerificationScript(v) require.NoError(t, err) @@ -206,7 +221,8 @@ func TestWitnessGetVerificationScript(t *testing.T) { } func TestPopInputFromVM(t *testing.T) { - v, tx, _ := createVMAndTX(t) + v, tx, _, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Inputs[0])) input, err := popInputFromVM(v) @@ -215,7 +231,8 @@ func TestPopInputFromVM(t *testing.T) { } func TestInputGetHash(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Inputs[0])) err := context.inputGetHash(v) @@ -225,7 +242,8 @@ func TestInputGetHash(t *testing.T) { } func TestInputGetIndex(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Inputs[0])) err := context.inputGetIndex(v) @@ -235,7 +253,8 @@ func TestInputGetIndex(t *testing.T) { } func TestPopOutputFromVM(t *testing.T) { - v, tx, _ := createVMAndTX(t) + v, tx, _, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Outputs[0])) output, err := popOutputFromVM(v) @@ -244,7 +263,8 @@ func TestPopOutputFromVM(t *testing.T) { } func TestOutputGetAssetID(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Outputs[0])) err := context.outputGetAssetID(v) @@ -254,7 +274,8 @@ func TestOutputGetAssetID(t *testing.T) { } func TestOutputGetScriptHash(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Outputs[0])) err := context.outputGetScriptHash(v) @@ -264,7 +285,8 @@ func TestOutputGetScriptHash(t *testing.T) { } func TestOutputGetValue(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Outputs[0])) err := context.outputGetValue(v) @@ -274,7 +296,8 @@ func TestOutputGetValue(t *testing.T) { } func TestAttrGetData(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Attributes[0])) err := context.attrGetData(v) @@ -284,7 +307,8 @@ func TestAttrGetData(t *testing.T) { } func TestAttrGetUsage(t *testing.T) { - v, tx, context := createVMAndTX(t) + v, tx, context, chain := createVMAndTX(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(&tx.Attributes[0])) err := context.attrGetUsage(v) @@ -294,7 +318,8 @@ func TestAttrGetUsage(t *testing.T) { } func TestAccountGetScriptHash(t *testing.T) { - v, accState, context := createVMAndAccState(t) + v, accState, context, chain := createVMAndAccState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(accState)) err := context.accountGetScriptHash(v) @@ -304,7 +329,8 @@ func TestAccountGetScriptHash(t *testing.T) { } func TestAccountGetVotes(t *testing.T) { - v, accState, context := createVMAndAccState(t) + v, accState, context, chain := createVMAndAccState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(accState)) err := context.accountGetVotes(v) @@ -314,7 +340,8 @@ func TestAccountGetVotes(t *testing.T) { } func TestContractGetScript(t *testing.T) { - v, contractState, context := createVMAndContractState(t) + v, contractState, context, chain := createVMAndContractState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(contractState)) err := context.contractGetScript(v) @@ -324,7 +351,8 @@ func TestContractGetScript(t *testing.T) { } func TestContractIsPayable(t *testing.T) { - v, contractState, context := createVMAndContractState(t) + v, contractState, context, chain := createVMAndContractState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(contractState)) err := context.contractIsPayable(v) @@ -334,7 +362,8 @@ func TestContractIsPayable(t *testing.T) { } func TestAssetGetAdmin(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetAdmin(v) @@ -344,7 +373,8 @@ func TestAssetGetAdmin(t *testing.T) { } func TestAssetGetAmount(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetAmount(v) @@ -354,7 +384,8 @@ func TestAssetGetAmount(t *testing.T) { } func TestAssetGetAssetID(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetAssetID(v) @@ -364,7 +395,8 @@ func TestAssetGetAssetID(t *testing.T) { } func TestAssetGetAssetType(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetAssetType(v) @@ -374,7 +406,8 @@ func TestAssetGetAssetType(t *testing.T) { } func TestAssetGetAvailable(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetAvailable(v) @@ -384,7 +417,8 @@ func TestAssetGetAvailable(t *testing.T) { } func TestAssetGetIssuer(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetIssuer(v) @@ -394,7 +428,8 @@ func TestAssetGetIssuer(t *testing.T) { } func TestAssetGetOwner(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetOwner(v) @@ -404,7 +439,8 @@ func TestAssetGetOwner(t *testing.T) { } func TestAssetGetPrecision(t *testing.T) { - v, assetState, context := createVMAndAssetState(t) + v, assetState, context, chain := createVMAndAssetState(t) + defer chain.Close() v.Estack().PushVal(vm.NewInteropItem(assetState)) err := context.assetGetPrecision(v) @@ -415,21 +451,22 @@ func TestAssetGetPrecision(t *testing.T) { // Helper functions to create VM, InteropContext, TX, Account, Contract, Asset. -func createVMAndPushBlock(t *testing.T) (*vm.VM, *Block, *interopContext) { +func createVMAndPushBlock(t *testing.T) (*vm.VM, *Block, *interopContext, *Blockchain) { v := vm.New() block := newDumbBlock() - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil) + chain := newTestChain(t) + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), block, nil) v.Estack().PushVal(vm.NewInteropItem(block)) - return v, block, context + return v, block, context, chain } -func createVMAndPushTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopContext) { - v, tx, context := createVMAndTX(t) +func createVMAndPushTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopContext, *Blockchain) { + v, tx, context, chain := createVMAndTX(t) v.Estack().PushVal(vm.NewInteropItem(tx)) - return v, tx, context + return v, tx, context, chain } -func createVMAndAssetState(t *testing.T) (*vm.VM, *state.Asset, *interopContext) { +func createVMAndAssetState(t *testing.T) (*vm.VM, *state.Asset, *interopContext, *Blockchain) { v := vm.New() assetState := &state.Asset{ ID: util.Uint256{}, @@ -447,11 +484,12 @@ func createVMAndAssetState(t *testing.T) (*vm.VM, *state.Asset, *interopContext) IsFrozen: false, } - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) - return v, assetState, context + chain := newTestChain(t) + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) + return v, assetState, context, chain } -func createVMAndContractState(t *testing.T) (*vm.VM, *state.Contract, *interopContext) { +func createVMAndContractState(t *testing.T) (*vm.VM, *state.Contract, *interopContext, *Blockchain) { v := vm.New() contractState := &state.Contract{ Script: []byte("testscript"), @@ -465,11 +503,12 @@ func createVMAndContractState(t *testing.T) (*vm.VM, *state.Contract, *interopCo Description: random.String(10), } - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) - return v, contractState, context + chain := newTestChain(t) + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) + return v, contractState, context, chain } -func createVMAndAccState(t *testing.T) (*vm.VM, *state.Account, *interopContext) { +func createVMAndAccState(t *testing.T) (*vm.VM, *state.Account, *interopContext, *Blockchain) { v := vm.New() rawHash := "4d3b96ae1bcc5a585e075e3b81920210dec16302" hash, err := util.Uint160DecodeStringBE(rawHash) @@ -479,11 +518,12 @@ func createVMAndAccState(t *testing.T) (*vm.VM, *state.Account, *interopContext) accountState.Votes = []*keys.PublicKey{key} require.NoError(t, err) - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) - return v, accountState, context + chain := newTestChain(t) + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) + return v, accountState, context, chain } -func createVMAndTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopContext) { +func createVMAndTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopContext, *Blockchain) { v := vm.New() script := []byte{byte(opcode.PUSH1), byte(opcode.RET)} tx := transaction.NewInvocationTX(script, 0) @@ -509,6 +549,7 @@ func createVMAndTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopCont tx.Attributes = attributes tx.Inputs = inputs tx.Outputs = outputs - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, tx) - return v, tx, context + chain := newTestChain(t) + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, tx) + return v, tx, context, chain } diff --git a/pkg/core/interop_system.go b/pkg/core/interop_system.go index 91f919d32..b96b5a0ec 100644 --- a/pkg/core/interop_system.go +++ b/pkg/core/interop_system.go @@ -13,7 +13,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/vm" gherr "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const ( @@ -350,7 +350,9 @@ func (ic *interopContext) runtimeNotify(v *vm.VM) error { // runtimeLog logs the message passed. func (ic *interopContext) runtimeLog(v *vm.VM) error { msg := fmt.Sprintf("%q", v.Estack().Pop().Bytes()) - log.Infof("script %s logs: %s", getContextScriptHash(v, 0), msg) + ic.log.Info("runtime log", + zap.Stringer("script", getContextScriptHash(v, 0)), + zap.String("logs", msg)) return nil } diff --git a/pkg/core/interops.go b/pkg/core/interops.go index 3e9b32941..bf33b347d 100644 --- a/pkg/core/interops.go +++ b/pkg/core/interops.go @@ -14,6 +14,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/core/storage" "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/vm" + "go.uber.org/zap" ) type interopContext struct { @@ -23,12 +24,13 @@ type interopContext struct { tx *transaction.Transaction dao *cachedDao notifications []state.NotificationEvent + log *zap.Logger } -func newInteropContext(trigger byte, bc Blockchainer, s storage.Store, block *Block, tx *transaction.Transaction) *interopContext { +func newInteropContext(trigger byte, bc Blockchainer, s storage.Store, block *Block, tx *transaction.Transaction, log *zap.Logger) *interopContext { dao := newCachedDao(s) nes := make([]state.NotificationEvent, 0) - return &interopContext{bc, trigger, block, tx, dao, nes} + return &interopContext{bc, trigger, block, tx, dao, nes, log} } // interopedFunction binds function name, id with the function itself and price, diff --git a/pkg/core/interops_test.go b/pkg/core/interops_test.go index 490a8402b..ab45d6dbd 100644 --- a/pkg/core/interops_test.go +++ b/pkg/core/interops_test.go @@ -14,7 +14,9 @@ import ( func testNonInterop(t *testing.T, value interface{}, f func(*interopContext, *vm.VM) error) { v := vm.New() v.Estack().PushVal(value) - context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) + chain := newTestChain(t) + defer chain.Close() + context := chain.newInteropContext(trigger.Application, storage.NewMemoryStore(), nil, nil) require.Error(t, f(context, v)) } diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index c0c1e5f9e..4d1d10ed3 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -7,7 +7,6 @@ import ( "github.com/CityOfZion/neo-go/pkg/io" "github.com/etcd-io/bbolt" - log "github.com/sirupsen/logrus" "github.com/syndtr/goleveldb/leveldb/util" ) @@ -109,7 +108,7 @@ func (s *BoltDBStore) Seek(key []byte, f func(k, v []byte)) { return nil }) if err != nil { - log.Error("error while executing seek in boltDB") + panic(err) } } diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index 175aa7e7d..bbd9f176a 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -3,17 +3,23 @@ package network import ( "github.com/CityOfZion/neo-go/pkg/core" "github.com/Workiva/go-datastructures/queue" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) type blockQueue struct { + log *zap.Logger queue *queue.PriorityQueue checkBlocks chan struct{} chain core.Blockchainer } -func newBlockQueue(capacity int, bc core.Blockchainer) *blockQueue { +func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQueue { + if log == nil { + return nil + } + return &blockQueue{ + log: log, queue: queue.NewPriorityQueue(capacity, false), checkBlocks: make(chan struct{}, 1), chain: bc, @@ -38,11 +44,10 @@ func (bq *blockQueue) run() { if minblock.Index == bq.chain.BlockHeight()+1 { err := bq.chain.AddBlock(minblock) if err != nil { - log.WithFields(log.Fields{ - "error": err.Error(), - "blockHeight": bq.chain.BlockHeight(), - "nextIndex": minblock.Index, - }).Warn("blockQueue: failed adding block into the blockchain") + bq.log.Warn("blockQueue: failed adding block into the blockchain", + zap.String("error", err.Error()), + zap.Uint32("blockHeight", bq.chain.BlockHeight()), + zap.Uint32("nextIndex", minblock.Index)) } } } else { diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go index da4124c06..f30b47a0a 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/blockqueue_test.go @@ -6,12 +6,13 @@ import ( "github.com/CityOfZion/neo-go/pkg/core" "github.com/stretchr/testify/assert" + "go.uber.org/zap/zaptest" ) func TestBlockQueue(t *testing.T) { chain := &testChain{} // notice, it's not yet running - bq := newBlockQueue(0, chain) + bq := newBlockQueue(0, chain, zaptest.NewLogger(t)) blocks := make([]*core.Block, 11) for i := 1; i < 11; i++ { blocks[i] = &core.Block{BlockBase: core.BlockBase{Index: uint32(i)}} diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index da2039650..5fdc86e5f 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -16,6 +16,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/vm" + "go.uber.org/zap/zaptest" ) type testChain struct { @@ -203,7 +204,7 @@ func (p *localPeer) Handshaked() bool { return p.handshaked } -func newTestServer() *Server { +func newTestServer(t *testing.T) *Server { return &Server{ ServerConfig: ServerConfig{}, chain: &testChain{}, @@ -214,6 +215,7 @@ func newTestServer() *Server { register: make(chan Peer), unregister: make(chan peerDrop), peers: make(map[Peer]bool), + log: zaptest.NewLogger(t), } } diff --git a/pkg/network/message.go b/pkg/network/message.go index 16802a2ba..087724967 100644 --- a/pkg/network/message.go +++ b/pkg/network/message.go @@ -201,13 +201,11 @@ func (m *Message) decodePayload(br *io.BinReader) error { return fmt.Errorf("can't decode command %s", cmdByteArrayToString(m.Command)) } p.DecodeBinary(r) - if r.Err != nil { - return r.Err + if r.Err == nil || r.Err == payload.ErrTooManyHeaders { + m.Payload = p } - m.Payload = p - - return nil + return r.Err } // Encode encodes a Message to any given BinWriter. diff --git a/pkg/network/metrics/metrics.go b/pkg/network/metrics/metrics.go index ae528990e..15a711618 100644 --- a/pkg/network/metrics/metrics.go +++ b/pkg/network/metrics/metrics.go @@ -4,13 +4,14 @@ import ( "context" "net/http" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) // Service serves metrics. type Service struct { *http.Server config Config + log *zap.Logger serviceType string } @@ -24,27 +25,21 @@ type Config struct { // Start runs http service with exposed endpoint on configured port. func (ms *Service) Start() { if ms.config.Enabled { - log.WithFields(log.Fields{ - "endpoint": ms.Addr, - "service": ms.serviceType, - }).Info("service running") + ms.log.Info("service is running", zap.String("endpoint", ms.Addr)) err := ms.ListenAndServe() if err != nil && err != http.ErrServerClosed { - log.Warnf("%s service couldn't start on configured port", ms.serviceType) + ms.log.Warn("service couldn't start on configured port") } } else { - log.Infof("%s service hasn't started since it's disabled", ms.serviceType) + ms.log.Info("service hasn't started since it's disabled") } } // ShutDown stops service. func (ms *Service) ShutDown() { - log.WithFields(log.Fields{ - "endpoint": ms.Addr, - "service": ms.serviceType, - }).Info("shutting down service") + ms.log.Info("shutting down service", zap.String("endpoint", ms.Addr)) err := ms.Shutdown(context.Background()) if err != nil { - log.Fatalf("can't shut down %s service", ms.serviceType) + ms.log.Panic("can't shut down service") } } diff --git a/pkg/network/metrics/pprof.go b/pkg/network/metrics/pprof.go index f41c560e6..13b853476 100644 --- a/pkg/network/metrics/pprof.go +++ b/pkg/network/metrics/pprof.go @@ -3,13 +3,19 @@ package metrics import ( "net/http" "net/http/pprof" + + "go.uber.org/zap" ) // PprofService https://golang.org/pkg/net/http/pprof/. type PprofService Service // NewPprofService created new service for gathering pprof metrics. -func NewPprofService(cfg Config) *Service { +func NewPprofService(cfg Config, log *zap.Logger) *Service { + if log == nil { + return nil + } + handler := http.NewServeMux() handler.HandleFunc("/debug/pprof/", pprof.Index) handler.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) @@ -18,9 +24,12 @@ func NewPprofService(cfg Config) *Service { handler.HandleFunc("/debug/pprof/trace", pprof.Trace) return &Service{ - &http.Server{ + Server: &http.Server{ Addr: cfg.Address + ":" + cfg.Port, Handler: handler, - }, cfg, "Pprof", + }, + config: cfg, + serviceType: "Pprof", + log: log.With(zap.String("service", "Pprof")), } } diff --git a/pkg/network/metrics/prometheus.go b/pkg/network/metrics/prometheus.go index 25cf80281..9bfe54205 100644 --- a/pkg/network/metrics/prometheus.go +++ b/pkg/network/metrics/prometheus.go @@ -4,17 +4,25 @@ import ( "net/http" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" ) // PrometheusService https://prometheus.io/docs/guides/go-application. type PrometheusService Service // NewPrometheusService creates new service for gathering prometheus metrics. -func NewPrometheusService(cfg Config) *Service { +func NewPrometheusService(cfg Config, log *zap.Logger) *Service { + if log == nil { + return nil + } + return &Service{ - &http.Server{ + Server: &http.Server{ Addr: cfg.Address + ":" + cfg.Port, Handler: promhttp.Handler(), - }, cfg, "Prometheus", + }, + config: cfg, + serviceType: "Prometheus", + log: log.With(zap.String("service", "Prometheus")), } } diff --git a/pkg/network/payload/headers.go b/pkg/network/payload/headers.go index 935bd05de..57160c190 100644 --- a/pkg/network/payload/headers.go +++ b/pkg/network/payload/headers.go @@ -3,7 +3,7 @@ package payload import ( "github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/io" - log "github.com/sirupsen/logrus" + "github.com/pkg/errors" ) // Headers payload. @@ -16,13 +16,17 @@ const ( MaxHeadersAllowed = 2000 ) +// ErrTooManyHeaders is an error returned when too many headers were received. +var ErrTooManyHeaders = errors.Errorf("too many headers were received (max: %d)", MaxHeadersAllowed) + // DecodeBinary implements Serializable interface. func (p *Headers) DecodeBinary(br *io.BinReader) { lenHeaders := br.ReadVarUint() + var limitExceeded bool + // C# node does it silently - if lenHeaders > MaxHeadersAllowed { - log.Warnf("received %d headers, capping to %d", lenHeaders, MaxHeadersAllowed) + if limitExceeded = lenHeaders > MaxHeadersAllowed; limitExceeded { lenHeaders = MaxHeadersAllowed } @@ -33,6 +37,10 @@ func (p *Headers) DecodeBinary(br *io.BinReader) { header.DecodeBinary(br) p.Hdrs[i] = header } + + if br.Err == nil && limitExceeded { + br.Err = ErrTooManyHeaders + } } // EncodeBinary implements Serializable interface. diff --git a/pkg/network/payload/headers_test.go b/pkg/network/payload/headers_test.go index 44c2b001f..22762cc0e 100644 --- a/pkg/network/payload/headers_test.go +++ b/pkg/network/payload/headers_test.go @@ -11,36 +11,39 @@ import ( ) func TestHeadersEncodeDecode(t *testing.T) { - headers := &Headers{[]*core.Header{ - { - BlockBase: core.BlockBase{ - Version: 0, - Index: 1, - Script: transaction.Witness{ - InvocationScript: []byte{0x0}, - VerificationScript: []byte{0x1}, - }, - }}, - { - BlockBase: core.BlockBase{ - Version: 0, - Index: 2, - Script: transaction.Witness{ - InvocationScript: []byte{0x0}, - VerificationScript: []byte{0x1}, - }, - }}, - { - BlockBase: core.BlockBase{ - Version: 0, - Index: 3, - Script: transaction.Witness{ - InvocationScript: []byte{0x0}, - VerificationScript: []byte{0x1}, - }, - }}, - }} + t.Run("normal case", func(t *testing.T) { + headers := newTestHeaders(3) + testHeadersEncodeDecode(t, headers, 3, false) + }) + + t.Run("more than max", func(t *testing.T) { + const sent = MaxHeadersAllowed + 1 + headers := newTestHeaders(sent) + + testHeadersEncodeDecode(t, headers, MaxHeadersAllowed, true) + }) +} + +func newTestHeaders(n int) *Headers { + headers := &Headers{Hdrs: make([]*core.Header, n)} + + for i := range headers.Hdrs { + headers.Hdrs[i] = &core.Header{ + BlockBase: core.BlockBase{ + Index: uint32(i + 1), + Script: transaction.Witness{ + InvocationScript: []byte{0x0}, + VerificationScript: []byte{0x1}, + }, + }, + } + } + + return headers +} + +func testHeadersEncodeDecode(t *testing.T, headers *Headers, expected int, limit bool) { buf := io.NewBufBinWriter() headers.EncodeBinary(buf.BinWriter) assert.Nil(t, buf.Err) @@ -49,9 +52,16 @@ func TestHeadersEncodeDecode(t *testing.T) { r := io.NewBinReaderFromBuf(b) headersDecode := &Headers{} headersDecode.DecodeBinary(r) - assert.Nil(t, r.Err) - for i := 0; i < len(headers.Hdrs); i++ { + var err error + if limit { + err = ErrTooManyHeaders + } + + assert.Equal(t, err, r.Err) + assert.Equal(t, expected, len(headersDecode.Hdrs)) + + for i := 0; i < len(headersDecode.Hdrs); i++ { assert.Equal(t, headers.Hdrs[i].Version, headersDecode.Hdrs[i].Version) assert.Equal(t, headers.Hdrs[i].Index, headersDecode.Hdrs[i].Index) assert.Equal(t, headers.Hdrs[i].Script, headersDecode.Hdrs[i].Script) diff --git a/pkg/network/server.go b/pkg/network/server.go index a9229d790..a8812b474 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -15,8 +15,8 @@ import ( "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/util" - log "github.com/sirupsen/logrus" "go.uber.org/atomic" + "go.uber.org/zap" ) const ( @@ -65,6 +65,8 @@ type ( quit chan struct{} connected *atomic.Bool + + log *zap.Logger } peerDrop struct { @@ -80,11 +82,15 @@ func randomID() uint32 { } // NewServer returns a new Server, initialized with the given configuration. -func NewServer(config ServerConfig, chain core.Blockchainer) *Server { +func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) *Server { + if log == nil { + return nil + } + s := &Server{ ServerConfig: config, chain: chain, - bQueue: newBlockQueue(maxBlockBatch, chain), + bQueue: newBlockQueue(maxBlockBatch, chain, log), id: randomID(), quit: make(chan struct{}), addrReq: make(chan *Message, config.MinPeers), @@ -92,9 +98,11 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server { unregister: make(chan peerDrop), peers: make(map[Peer]bool), connected: atomic.NewBool(false), + log: log, } srv, err := consensus.NewService(consensus.Config{ + Logger: log, Broadcast: s.handleNewPayload, RelayBlock: s.relayBlock, Chain: chain, @@ -108,30 +116,27 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server { s.consensus = srv if s.MinPeers <= 0 { - log.WithFields(log.Fields{ - "MinPeers configured": s.MinPeers, - "MinPeers actual": defaultMinPeers, - }).Info("bad MinPeers configured, using the default value") + s.log.Info("bad MinPeers configured, using the default value", + zap.Int("configured", s.MinPeers), + zap.Int("actual", defaultMinPeers)) s.MinPeers = defaultMinPeers } if s.MaxPeers <= 0 { - log.WithFields(log.Fields{ - "MaxPeers configured": s.MaxPeers, - "MaxPeers actual": defaultMaxPeers, - }).Info("bad MaxPeers configured, using the default value") + s.log.Info("bad MaxPeers configured, using the default value", + zap.Int("configured", s.MaxPeers), + zap.Int("actual", defaultMaxPeers)) s.MaxPeers = defaultMaxPeers } if s.AttemptConnPeers <= 0 { - log.WithFields(log.Fields{ - "AttemptConnPeers configured": s.AttemptConnPeers, - "AttemptConnPeers actual": defaultAttemptConnPeers, - }).Info("bad AttemptConnPeers configured, using the default value") + s.log.Info("bad AttemptConnPeers configured, using the default value", + zap.Int("configured", s.AttemptConnPeers), + zap.Int("actual", defaultAttemptConnPeers)) s.AttemptConnPeers = defaultAttemptConnPeers } - s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", config.Address, config.Port)) + s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", config.Address, config.Port), s.log) s.discovery = NewDefaultDiscovery( s.DialTimeout, s.transport, @@ -147,10 +152,9 @@ func (s *Server) ID() uint32 { // Start will start the server and its underlying transport. func (s *Server) Start(errChan chan error) { - log.WithFields(log.Fields{ - "blockHeight": s.chain.BlockHeight(), - "headerHeight": s.chain.HeaderHeight(), - }).Info("node started") + s.log.Info("node started", + zap.Uint32("blockHeight", s.chain.BlockHeight()), + zap.Uint32("headerHeight", s.chain.HeaderHeight())) s.discovery.BackFill(s.Seeds...) @@ -162,9 +166,7 @@ func (s *Server) Start(errChan chan error) { // Shutdown disconnects all peers and stops listening. func (s *Server) Shutdown() { - log.WithFields(log.Fields{ - "peers": s.PeerCount(), - }).Info("shutting down server") + s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.bQueue.discard() close(s.quit) } @@ -205,9 +207,7 @@ func (s *Server) run() { s.lock.Lock() s.peers[p] = true s.lock.Unlock() - log.WithFields(log.Fields{ - "addr": p.RemoteAddr(), - }).Info("new peer connected") + s.log.Info("new peer connected", zap.Stringer("addr", p.RemoteAddr())) peerCount := s.PeerCount() if peerCount > s.MaxPeers { s.lock.RLock() @@ -225,11 +225,10 @@ func (s *Server) run() { if s.peers[drop.peer] { delete(s.peers, drop.peer) s.lock.Unlock() - log.WithFields(log.Fields{ - "addr": drop.peer.RemoteAddr(), - "reason": drop.reason, - "peerCount": s.PeerCount(), - }).Warn("peer disconnected") + s.log.Warn("peer disconnected", + zap.Stringer("addr", drop.peer.RemoteAddr()), + zap.String("reason", drop.reason.Error()), + zap.Int("peerCount", s.PeerCount())) addr := drop.peer.PeerAddr().String() if drop.reason == errIdenticalID { s.discovery.RegisterBadAddr(addr) @@ -254,7 +253,7 @@ func (s *Server) tryStartConsensus() { } if s.HandshakedPeersCount() >= s.MinPeers { - log.Info("minimum amount of peers were connected to") + s.log.Info("minimum amount of peers were connected to") if s.connected.CAS(false, true) { s.consensus.Start() } @@ -304,12 +303,11 @@ func (s *Server) HandshakedPeersCount() int { func (s *Server) startProtocol(p Peer) { var err error - log.WithFields(log.Fields{ - "addr": p.RemoteAddr(), - "userAgent": string(p.Version().UserAgent), - "startHeight": p.Version().StartHeight, - "id": p.Version().Nonce, - }).Info("started protocol") + s.log.Info("started protocol", + zap.Stringer("addr", p.RemoteAddr()), + zap.ByteString("userAgent", p.Version().UserAgent), + zap.Uint32("startHeight", p.Version().StartHeight), + zap.Uint32("id", p.Version().Nonce)) s.discovery.RegisterGoodAddr(p.PeerAddr().String()) if s.chain.HeaderHeight() < p.Version().StartHeight { @@ -386,7 +384,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // This method could best be called in a separate routine. func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) { if err := s.chain.AddHeaders(headers.Hdrs...); err != nil { - log.Warnf("failed processing headers: %s", err) + s.log.Warn("failed processing headers", zap.Error(err)) return } // The peer will respond with a maximum of 2000 headers in one batch. diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index b51daf0e1..524557d3e 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -4,7 +4,7 @@ import ( "time" "github.com/CityOfZion/neo-go/config" - log "github.com/sirupsen/logrus" + "go.uber.org/zap/zapcore" ) type ( @@ -53,7 +53,7 @@ type ( ProtoTickInterval time.Duration // Level of the internal logger. - LogLevel log.Level + LogLevel zapcore.Level // Wallet is a wallet configuration. Wallet *config.WalletConfig diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index bcde70b8c..39f2caedc 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -11,7 +11,7 @@ import ( func TestSendVersion(t *testing.T) { var ( - s = newTestServer() + s = newTestServer(t) p = newLocalPeer(t) ) s.Port = 3000 @@ -37,7 +37,7 @@ func TestSendVersion(t *testing.T) { // Server should reply with a verack after receiving a valid version. func TestVerackAfterHandleVersionCmd(t *testing.T) { var ( - s = newTestServer() + s = newTestServer(t) p = newLocalPeer(t) ) na, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:3000") @@ -58,7 +58,7 @@ func TestVerackAfterHandleVersionCmd(t *testing.T) { // invalid version and disconnects the peer. func TestServerNotSendsVerack(t *testing.T) { var ( - s = newTestServer() + s = newTestServer(t) p = newLocalPeer(t) p2 = newLocalPeer(t) ) @@ -91,7 +91,7 @@ func TestServerNotSendsVerack(t *testing.T) { func TestRequestHeaders(t *testing.T) { var ( - s = newTestServer() + s = newTestServer(t) p = newLocalPeer(t) ) p.messageHandler = func(t *testing.T, msg *Message) { diff --git a/pkg/network/tcp_transport.go b/pkg/network/tcp_transport.go index 654515758..2055529d6 100644 --- a/pkg/network/tcp_transport.go +++ b/pkg/network/tcp_transport.go @@ -6,11 +6,13 @@ import ( "time" "github.com/CityOfZion/neo-go/pkg/io" - log "github.com/sirupsen/logrus" + "github.com/CityOfZion/neo-go/pkg/network/payload" + "go.uber.org/zap" ) // TCPTransport allows network communication over TCP. type TCPTransport struct { + log *zap.Logger server *Server listener net.Listener bindAddr string @@ -20,8 +22,9 @@ var reClosedNetwork = regexp.MustCompile(".* use of closed network connection") // NewTCPTransport returns a new TCPTransport that will listen for // new incoming peer connections. -func NewTCPTransport(s *Server, bindAddr string) *TCPTransport { +func NewTCPTransport(s *Server, bindAddr string, log *zap.Logger) *TCPTransport { return &TCPTransport{ + log: log, server: s, bindAddr: bindAddr, } @@ -41,7 +44,7 @@ func (t *TCPTransport) Dial(addr string, timeout time.Duration) error { func (t *TCPTransport) Accept() { l, err := net.Listen("tcp", t.bindAddr) if err != nil { - log.Fatalf("TCP listen error %s", err) + t.log.Panic("TCP listen error", zap.Error(err)) return } @@ -50,7 +53,7 @@ func (t *TCPTransport) Accept() { for { conn, err := l.Accept() if err != nil { - log.Warnf("TCP accept error: %s", err) + t.log.Warn("TCP accept error", zap.Error(err)) if t.isCloseError(err) { break } @@ -80,14 +83,17 @@ func (t *TCPTransport) handleConn(conn net.Conn) { // When a new peer is connected we send out our version immediately. if err := t.server.sendVersion(p); err != nil { - log.WithFields(log.Fields{ - "addr": p.RemoteAddr(), - }).Error(err) + t.log.Error("error on sendVersion", zap.Stringer("addr", p.RemoteAddr()), zap.Error(err)) } r := io.NewBinReaderFromIO(p.conn) for { msg := &Message{} - if err = msg.Decode(r); err != nil { + err := msg.Decode(r) + + if err == payload.ErrTooManyHeaders { + t.log.Warn("not all headers were processed") + r.Err = nil + } else if err != nil { break } if err = t.server.handleMessage(p, msg); err != nil { diff --git a/pkg/rpc/request.go b/pkg/rpc/request.go index ddd2bb816..5a3c21b9a 100644 --- a/pkg/rpc/request.go +++ b/pkg/rpc/request.go @@ -6,7 +6,7 @@ import ( "net/http" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const ( @@ -17,11 +17,10 @@ type ( // Request represents a standard JSON-RPC 2.0 // request: http://www.jsonrpc.org/specification#request_object. Request struct { - JSONRPC string `json:"jsonrpc"` - Method string `json:"method"` - RawParams json.RawMessage `json:"params,omitempty"` - RawID json.RawMessage `json:"id,omitempty"` - enableCORSWorkaround bool + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + RawParams json.RawMessage `json:"params,omitempty"` + RawID json.RawMessage `json:"id,omitempty"` } // Response represents a standard JSON-RPC 2.0 @@ -35,10 +34,9 @@ type ( ) // NewRequest creates a new Request struct. -func NewRequest(corsWorkaround bool) *Request { +func NewRequest() *Request { return &Request{ - JSONRPC: jsonRPCVersion, - enableCORSWorkaround: corsWorkaround, + JSONRPC: jsonRPCVersion, } } @@ -73,7 +71,7 @@ func (r *Request) Params() (*Params, error) { } // WriteErrorResponse writes an error response to the ResponseWriter. -func (r Request) WriteErrorResponse(w http.ResponseWriter, err error) { +func (s *Server) WriteErrorResponse(r *Request, w http.ResponseWriter, err error) { jsonErr, ok := err.(*Error) if !ok { jsonErr = NewInternalServerError("Internal server error", err) @@ -85,34 +83,36 @@ func (r Request) WriteErrorResponse(w http.ResponseWriter, err error) { ID: r.RawID, } - logFields := log.Fields{ - "err": jsonErr.Cause, - "method": r.Method, - } - params, err := r.Params() - if err == nil { - logFields["params"] = *params + logFields := []zap.Field{ + zap.Error(jsonErr.Cause), + zap.String("method", r.Method), } - log.WithFields(logFields).Error("Error encountered with rpc request") + params, err := r.Params() + if err == nil { + logFields = append(logFields, zap.Any("params", params)) + } + + s.log.Error("Error encountered with rpc request", logFields...) + w.WriteHeader(jsonErr.HTTPCode) - r.writeServerResponse(w, response) + s.writeServerResponse(r, w, response) } // WriteResponse encodes the response and writes it to the ResponseWriter. -func (r Request) WriteResponse(w http.ResponseWriter, result interface{}) { +func (s *Server) WriteResponse(r *Request, w http.ResponseWriter, result interface{}) { response := Response{ JSONRPC: r.JSONRPC, Result: result, ID: r.RawID, } - r.writeServerResponse(w, response) + s.writeServerResponse(r, w, response) } -func (r Request) writeServerResponse(w http.ResponseWriter, response Response) { +func (s *Server) writeServerResponse(r *Request, w http.ResponseWriter, response Response) { w.Header().Set("Content-Type", "application/json; charset=utf-8") - if r.enableCORSWorkaround { + if s.config.EnableCORSWorkaround { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Access-Control-Allow-Headers, Authorization, X-Requested-With") } @@ -120,12 +120,9 @@ func (r Request) writeServerResponse(w http.ResponseWriter, response Response) { encoder := json.NewEncoder(w) err := encoder.Encode(response) - logFields := log.Fields{ - "err": err, - "method": r.Method, - } - if err != nil { - log.WithFields(logFields).Error("Error encountered while encoding response") + s.log.Error("Error encountered while encoding response", + zap.String("err", err.Error()), + zap.String("method", r.Method)) } } diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go index e9d30c8a0..83c46e52d 100644 --- a/pkg/rpc/server.go +++ b/pkg/rpc/server.go @@ -16,7 +16,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/rpc/wrappers" "github.com/CityOfZion/neo-go/pkg/util" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) type ( @@ -26,6 +26,7 @@ type ( chain core.Blockchainer config config.RPCConfig coreServer *network.Server + log *zap.Logger } ) @@ -34,7 +35,7 @@ var invalidBlockHeightError = func(index int, height int) error { } // NewServer creates a new Server struct. -func NewServer(chain core.Blockchainer, conf config.RPCConfig, coreServer *network.Server) Server { +func NewServer(chain core.Blockchainer, conf config.RPCConfig, coreServer *network.Server, log *zap.Logger) Server { httpServer := &http.Server{ Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10), } @@ -44,6 +45,7 @@ func NewServer(chain core.Blockchainer, conf config.RPCConfig, coreServer *netwo chain: chain, config: conf, coreServer: coreServer, + log: log, } } @@ -51,13 +53,11 @@ func NewServer(chain core.Blockchainer, conf config.RPCConfig, coreServer *netwo // listening on the configured port. func (s *Server) Start(errChan chan error) { if !s.config.Enabled { - log.Info("RPC server is not enabled") + s.log.Info("RPC server is not enabled") return } s.Handler = http.HandlerFunc(s.requestHandler) - log.WithFields(log.Fields{ - "endpoint": s.Addr, - }).Info("starting rpc-server") + s.log.Info("starting rpc-server", zap.String("endpoint", s.Addr)) errChan <- s.ListenAndServe() } @@ -65,17 +65,16 @@ func (s *Server) Start(errChan chan error) { // Shutdown overrides the http.Server Shutdown // method. func (s *Server) Shutdown() error { - log.WithFields(log.Fields{ - "endpoint": s.Addr, - }).Info("shutting down rpc-server") + s.log.Info("shutting down rpc-server", zap.String("endpoint", s.Addr)) return s.Server.Shutdown(context.Background()) } func (s *Server) requestHandler(w http.ResponseWriter, httpRequest *http.Request) { - req := NewRequest(s.config.EnableCORSWorkaround) + req := NewRequest() if httpRequest.Method != "POST" { - req.WriteErrorResponse( + s.WriteErrorResponse( + req, w, NewInvalidParamsError( fmt.Sprintf("Invalid method '%s', please retry with 'POST'", httpRequest.Method), nil, @@ -86,13 +85,13 @@ func (s *Server) requestHandler(w http.ResponseWriter, httpRequest *http.Request err := req.DecodeData(httpRequest.Body) if err != nil { - req.WriteErrorResponse(w, NewParseError("Problem parsing JSON-RPC request body", err)) + s.WriteErrorResponse(req, w, NewParseError("Problem parsing JSON-RPC request body", err)) return } reqParams, err := req.Params() if err != nil { - req.WriteErrorResponse(w, NewInvalidParamsError("Problem parsing request parameters", err)) + s.WriteErrorResponse(req, w, NewInvalidParamsError("Problem parsing request parameters", err)) return } @@ -100,10 +99,9 @@ func (s *Server) requestHandler(w http.ResponseWriter, httpRequest *http.Request } func (s *Server) methodHandler(w http.ResponseWriter, req *Request, reqParams Params) { - log.WithFields(log.Fields{ - "method": req.Method, - "params": fmt.Sprintf("%v", reqParams), - }).Info("processing rpc request") + s.log.Info("processing rpc request", + zap.String("method", req.Method), + zap.String("params", fmt.Sprintf("%v", reqParams))) var ( results interface{} @@ -268,11 +266,11 @@ Methods: } if resultsErr != nil { - req.WriteErrorResponse(w, resultsErr) + s.WriteErrorResponse(req, w, resultsErr) return } - req.WriteResponse(w, results) + s.WriteResponse(req, w, results) } func (s *Server) getrawtransaction(reqParams Params) (interface{}, error) { diff --git a/pkg/rpc/server_helper_test.go b/pkg/rpc/server_helper_test.go index c1091d8d2..5b0590f69 100644 --- a/pkg/rpc/server_helper_test.go +++ b/pkg/rpc/server_helper_test.go @@ -13,6 +13,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/rpc/result" "github.com/CityOfZion/neo-go/pkg/rpc/wrappers" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" ) // ErrorResponse struct represents JSON-RPC error. @@ -180,7 +181,8 @@ func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFu require.NoError(t, err, "could not load config") memoryStore := storage.NewMemoryStore() - chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration) + logger := zaptest.NewLogger(t) + chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration, logger) require.NoError(t, err, "could not create chain") go chain.Run() @@ -198,8 +200,8 @@ func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFu } serverConfig := network.NewServerConfig(cfg) - server := network.NewServer(serverConfig, chain) - rpcServer := NewServer(chain, cfg.ApplicationConfiguration.RPC, server) + server := network.NewServer(serverConfig, chain, logger) + rpcServer := NewServer(chain, cfg.ApplicationConfiguration.RPC, server, logger) handler := http.HandlerFunc(rpcServer.requestHandler) return chain, handler diff --git a/pkg/rpc/server_test.go b/pkg/rpc/server_test.go index 4b86ac271..7656ebeff 100644 --- a/pkg/rpc/server_test.go +++ b/pkg/rpc/server_test.go @@ -417,6 +417,8 @@ var rpcTestCases = map[string][]rpcTestCase{ func TestRPC(t *testing.T) { chain, handler := initServerWithInMemoryChain(t) + defer chain.Close() + e := &executor{chain: chain, handler: handler} for method, cases := range rpcTestCases { t.Run(method, func(t *testing.T) {