cli,pkg: use zap.Logger

This commit is contained in:
Evgenii Stratonikov 2019-12-30 10:43:05 +03:00
parent 9c79684516
commit aecdf470e7
23 changed files with 208 additions and 180 deletions

View file

@ -15,8 +15,9 @@ import (
"github.com/CityOfZion/neo-go/pkg/network/metrics" "github.com/CityOfZion/neo-go/pkg/network/metrics"
"github.com/CityOfZion/neo-go/pkg/rpc" "github.com/CityOfZion/neo-go/pkg/rpc"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli" "github.com/urfave/cli"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
) )
// NewCommands returns 'node' command. // NewCommands returns 'node' command.
@ -119,32 +120,40 @@ func getConfigFromContext(ctx *cli.Context) (config.Config, error) {
// handleLoggingParams reads logging parameters. // handleLoggingParams reads logging parameters.
// If user selected debug level -- function enables it. // If user selected debug level -- function enables it.
// If logPath is configured -- function creates dir and file for logging. // If logPath is configured -- function creates dir and file for logging.
func handleLoggingParams(ctx *cli.Context, cfg config.ApplicationConfiguration) error { func handleLoggingParams(ctx *cli.Context, cfg config.ApplicationConfiguration) (*zap.Logger, error) {
level := zapcore.InfoLevel
if ctx.Bool("debug") { if ctx.Bool("debug") {
log.SetLevel(log.DebugLevel) level = zapcore.DebugLevel
} }
cc := zap.NewProductionConfig()
cc.DisableCaller = true
cc.DisableStacktrace = true
cc.EncoderConfig.EncodeDuration = zapcore.StringDurationEncoder
cc.EncoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
cc.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
cc.Encoding = "console"
cc.Level = zap.NewAtomicLevelAt(level)
if logPath := cfg.LogPath; logPath != "" { if logPath := cfg.LogPath; logPath != "" {
if err := io.MakeDirForFile(logPath, "logger"); err != nil { if err := io.MakeDirForFile(logPath, "logger"); err != nil {
return err return nil, err
} }
f, err := os.Create(logPath)
if err != nil { cc.OutputPaths = []string{logPath}
return err
}
log.SetOutput(f)
} }
return nil
return cc.Build()
} }
func initBCWithMetrics(cfg config.Config) (*core.Blockchain, *metrics.Service, *metrics.Service, error) { func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *metrics.Service, *metrics.Service, error) {
chain, err := initBlockChain(cfg) chain, err := initBlockChain(cfg, log)
if err != nil { if err != nil {
return nil, nil, nil, cli.NewExitError(err, 1) return nil, nil, nil, cli.NewExitError(err, 1)
} }
configureAddresses(cfg.ApplicationConfiguration) configureAddresses(cfg.ApplicationConfiguration)
prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus) prometheus := metrics.NewPrometheusService(cfg.ApplicationConfiguration.Prometheus, log)
pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof) pprof := metrics.NewPprofService(cfg.ApplicationConfiguration.Pprof, log)
go chain.Run() go chain.Run()
go prometheus.Start() go prometheus.Start()
@ -158,7 +167,8 @@ func dumpDB(ctx *cli.Context) error {
if err != nil { if err != nil {
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
} }
if err := handleLoggingParams(ctx, cfg.ApplicationConfiguration); err != nil { log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration)
if err != nil {
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
} }
count := uint32(ctx.Uint("count")) count := uint32(ctx.Uint("count"))
@ -174,7 +184,7 @@ func dumpDB(ctx *cli.Context) error {
defer outStream.Close() defer outStream.Close()
writer := io.NewBinWriterFromIO(outStream) writer := io.NewBinWriterFromIO(outStream)
chain, prometheus, pprof, err := initBCWithMetrics(cfg) chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
if err != nil { if err != nil {
return err return err
} }
@ -213,7 +223,8 @@ func restoreDB(ctx *cli.Context) error {
if err != nil { if err != nil {
return err return err
} }
if err := handleLoggingParams(ctx, cfg.ApplicationConfiguration); err != nil { log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration)
if err != nil {
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
} }
count := uint32(ctx.Uint("count")) count := uint32(ctx.Uint("count"))
@ -229,7 +240,7 @@ func restoreDB(ctx *cli.Context) error {
defer inStream.Close() defer inStream.Close()
reader := io.NewBinReaderFromIO(inStream) reader := io.NewBinReaderFromIO(inStream)
chain, prometheus, pprof, err := initBCWithMetrics(cfg) chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
if err != nil { if err != nil {
return err return err
} }
@ -265,7 +276,7 @@ func restoreDB(ctx *cli.Context) error {
if block.Index == 0 && i == 0 && skip == 0 { if block.Index == 0 && i == 0 && skip == 0 {
genesis, err := chain.GetBlock(block.Hash()) genesis, err := chain.GetBlock(block.Hash())
if err == nil && genesis.Index == 0 { if err == nil && genesis.Index == 0 {
log.Info("skipped genesis block ", block.Hash().StringLE()) log.Info("skipped genesis block", zap.String("hash", block.Hash().StringLE()))
continue continue
} }
} }
@ -293,7 +304,8 @@ func startServer(ctx *cli.Context) error {
if err != nil { if err != nil {
return err return err
} }
if err := handleLoggingParams(ctx, cfg.ApplicationConfiguration); err != nil { log, err := handleLoggingParams(ctx, cfg.ApplicationConfiguration)
if err != nil {
return err return err
} }
@ -302,12 +314,12 @@ func startServer(ctx *cli.Context) error {
serverConfig := network.NewServerConfig(cfg) serverConfig := network.NewServerConfig(cfg)
chain, prometheus, pprof, err := initBCWithMetrics(cfg) chain, prometheus, pprof, err := initBCWithMetrics(cfg, log)
if err != nil { if err != nil {
return err return err
} }
server := network.NewServer(serverConfig, chain) server := network.NewServer(serverConfig, chain, log)
rpcServer := rpc.NewServer(chain, cfg.ApplicationConfiguration.RPC, server) rpcServer := rpc.NewServer(chain, cfg.ApplicationConfiguration.RPC, server)
errChan := make(chan error) errChan := make(chan error)
@ -364,13 +376,13 @@ func configureAddresses(cfg config.ApplicationConfiguration) {
} }
// initBlockChain initializes BlockChain with preselected DB. // initBlockChain initializes BlockChain with preselected DB.
func initBlockChain(cfg config.Config) (*core.Blockchain, error) { func initBlockChain(cfg config.Config, log *zap.Logger) (*core.Blockchain, error) {
store, err := storage.NewStore(cfg.ApplicationConfiguration.DBConfiguration) store, err := storage.NewStore(cfg.ApplicationConfiguration.DBConfiguration)
if err != nil { if err != nil {
return nil, cli.NewExitError(fmt.Errorf("could not initialize storage: %s", err), 1) return nil, cli.NewExitError(fmt.Errorf("could not initialize storage: %s", err), 1)
} }
chain, err := core.NewBlockchain(store, cfg.ProtocolConfiguration) chain, err := core.NewBlockchain(store, cfg.ProtocolConfiguration, log)
if err != nil { if err != nil {
return nil, cli.NewExitError(fmt.Errorf("could not initialize blockchain: %s", err), 1) return nil, cli.NewExitError(fmt.Errorf("could not initialize blockchain: %s", err), 1)
} }

View file

@ -13,6 +13,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/network" "github.com/CityOfZion/neo-go/pkg/network"
"github.com/CityOfZion/neo-go/pkg/rpc" "github.com/CityOfZion/neo-go/pkg/rpc"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
) )
// Benchmark test to measure number of processed TX. // Benchmark test to measure number of processed TX.
@ -23,14 +24,15 @@ func BenchmarkTXPerformanceTest(t *testing.B) {
cfg, err := config.Load(configPath, net) cfg, err := config.Load(configPath, net)
require.NoError(t, err, "could not load config") require.NoError(t, err, "could not load config")
logger := zaptest.NewLogger(t)
memoryStore := storage.NewMemoryStore() memoryStore := storage.NewMemoryStore()
chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration) chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration, logger)
require.NoError(t, err, "could not create chain") require.NoError(t, err, "could not create chain")
go chain.Run() go chain.Run()
serverConfig := network.NewServerConfig(cfg) serverConfig := network.NewServerConfig(cfg)
server := network.NewServer(serverConfig, chain) server := network.NewServer(serverConfig, chain, logger)
data := prepareData(t) data := prepareData(t)
t.ResetTimer() t.ResetTimer()

View file

@ -60,6 +60,8 @@ type service struct {
// Config is a configuration for consensus services. // Config is a configuration for consensus services.
type Config struct { type Config struct {
// Logger is a logger instance.
Logger *zap.Logger
// Broadcast is a callback which is called to notify server // Broadcast is a callback which is called to notify server
// about new consensus payload to sent. // about new consensus payload to sent.
Broadcast func(p *Payload) Broadcast func(p *Payload)
@ -79,19 +81,18 @@ type Config struct {
// NewService returns new consensus.Service instance. // NewService returns new consensus.Service instance.
func NewService(cfg Config) (Service, error) { func NewService(cfg Config) (Service, error) {
log, err := getLogger()
if err != nil {
return nil, err
}
if cfg.TimePerBlock <= 0 { if cfg.TimePerBlock <= 0 {
cfg.TimePerBlock = defaultTimePerBlock cfg.TimePerBlock = defaultTimePerBlock
} }
if cfg.Logger == nil {
return nil, errors.New("empty logger")
}
srv := &service{ srv := &service{
Config: cfg, Config: cfg,
log: log.Sugar(), log: cfg.Logger.Sugar(),
cache: newFIFOCache(cacheMaxCapacity), cache: newFIFOCache(cacheMaxCapacity),
txx: newFIFOCache(cacheMaxCapacity), txx: newFIFOCache(cacheMaxCapacity),
messages: make(chan Payload, 100), messages: make(chan Payload, 100),

View file

@ -10,6 +10,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/util"
"github.com/nspcc-dev/dbft/block" "github.com/nspcc-dev/dbft/block"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
) )
func TestNewService(t *testing.T) { func TestNewService(t *testing.T) {
@ -128,6 +129,7 @@ func shouldNotReceive(t *testing.T, ch chan Payload) {
func newTestService(t *testing.T) *service { func newTestService(t *testing.T) *service {
srv, err := NewService(Config{ srv, err := NewService(Config{
Logger: zaptest.NewLogger(t),
Broadcast: func(*Payload) {}, Broadcast: func(*Payload) {},
Chain: newTestChain(t), Chain: newTestChain(t),
RequestTx: func(...util.Uint256) {}, RequestTx: func(...util.Uint256) {},
@ -177,7 +179,7 @@ func newTestChain(t *testing.T) *core.Blockchain {
unitTestNetCfg, err := config.Load("../../config", config.ModeUnitTestNet) unitTestNetCfg, err := config.Load("../../config", config.ModeUnitTestNet)
require.NoError(t, err) require.NoError(t, err)
chain, err := core.NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration) chain, err := core.NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration, zaptest.NewLogger(t))
require.NoError(t, err) require.NoError(t, err)
go chain.Run() go chain.Run()

View file

@ -1,19 +0,0 @@
package consensus
import (
"go.uber.org/zap"
)
func getLogger() (*zap.Logger, error) {
cc := zap.NewDevelopmentConfig()
cc.DisableCaller = true
cc.DisableStacktrace = true
cc.Encoding = "console"
log, err := cc.Build()
if err != nil {
return nil, err
}
return log.With(zap.String("module", "dbft")), nil
}

View file

@ -21,7 +21,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/util"
"github.com/CityOfZion/neo-go/pkg/vm" "github.com/CityOfZion/neo-go/pkg/vm"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" "go.uber.org/zap"
) )
// Tuning parameters. // Tuning parameters.
@ -80,13 +80,19 @@ type Blockchain struct {
// cache for block verification keys. // cache for block verification keys.
keyCache map[util.Uint160]map[string]*keys.PublicKey keyCache map[util.Uint160]map[string]*keys.PublicKey
log *zap.Logger
} }
type headersOpFunc func(headerList *HeaderHashList) type headersOpFunc func(headerList *HeaderHashList)
// NewBlockchain returns a new blockchain object the will use the // NewBlockchain returns a new blockchain object the will use the
// given Store as its underlying storage. // given Store as its underlying storage.
func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockchain, error) { func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.Logger) (*Blockchain, error) {
if log == nil {
return nil, errors.New("empty logger")
}
bc := &Blockchain{ bc := &Blockchain{
config: cfg, config: cfg,
dao: newDao(s), dao: newDao(s),
@ -96,6 +102,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha
runToExitCh: make(chan struct{}), runToExitCh: make(chan struct{}),
memPool: NewMemPool(50000), memPool: NewMemPool(50000),
keyCache: make(map[util.Uint160]map[string]*keys.PublicKey), keyCache: make(map[util.Uint160]map[string]*keys.PublicKey),
log: log,
} }
if err := bc.init(); err != nil { if err := bc.init(); err != nil {
@ -109,7 +116,7 @@ func (bc *Blockchain) init() error {
// If we could not find the version in the Store, we know that there is nothing stored. // If we could not find the version in the Store, we know that there is nothing stored.
ver, err := bc.dao.GetVersion() ver, err := bc.dao.GetVersion()
if err != nil { if err != nil {
log.Infof("no storage version found! creating genesis block") bc.log.Info("no storage version found! creating genesis block")
if err = bc.dao.PutVersion(version); err != nil { if err = bc.dao.PutVersion(version); err != nil {
return err return err
} }
@ -131,7 +138,7 @@ func (bc *Blockchain) init() error {
// At this point there was no version found in the storage which // At this point there was no version found in the storage which
// implies a creating fresh storage with the version specified // implies a creating fresh storage with the version specified
// and the genesis block as first block. // and the genesis block as first block.
log.Infof("restoring blockchain with version: %s", version) bc.log.Info("restoring blockchain", zap.String("version", version))
bHeight, err := bc.dao.GetCurrentBlockHeight() bHeight, err := bc.dao.GetCurrentBlockHeight()
if err != nil { if err != nil {
@ -200,10 +207,10 @@ func (bc *Blockchain) Run() {
defer func() { defer func() {
persistTimer.Stop() persistTimer.Stop()
if err := bc.persist(); err != nil { if err := bc.persist(); err != nil {
log.Warnf("failed to persist: %s", err) bc.log.Warn("failed to persist", zap.Error(err))
} }
if err := bc.dao.store.Close(); err != nil { if err := bc.dao.store.Close(); err != nil {
log.Warnf("failed to close db: %s", err) bc.log.Warn("failed to close db", zap.Error(err))
} }
close(bc.runToExitCh) close(bc.runToExitCh)
}() }()
@ -218,7 +225,7 @@ func (bc *Blockchain) Run() {
go func() { go func() {
err := bc.persist() err := bc.persist()
if err != nil { if err != nil {
log.Warnf("failed to persist blockchain: %s", err) bc.log.Warn("failed to persist blockchain", zap.Error(err))
} }
}() }()
persistTimer.Reset(persistInterval) persistTimer.Reset(persistInterval)
@ -302,11 +309,10 @@ func (bc *Blockchain) AddHeaders(headers ...*Header) (err error) {
if err = bc.dao.store.PutBatch(batch); err != nil { if err = bc.dao.store.PutBatch(batch); err != nil {
return return
} }
log.WithFields(log.Fields{ bc.log.Debug("done processing headers",
"headerIndex": headerList.Len() - 1, zap.Int("headerIndex", headerList.Len()-1),
"blockHeight": bc.BlockHeight(), zap.Uint32("blockHeight", bc.BlockHeight()),
"took": time.Since(start), zap.Duration("took", time.Since(start)))
}).Debug("done processing headers")
} }
} }
<-bc.headersOpDone <-bc.headersOpDone
@ -502,7 +508,7 @@ func (bc *Blockchain) storeBlock(block *Block) error {
return err return err
} }
case *transaction.InvocationTX: case *transaction.InvocationTX:
systemInterop := newInteropContext(trigger.Application, bc, cache.store, block, tx) systemInterop := newInteropContext(trigger.Application, bc, cache.store, block, tx, bc.log)
v := bc.spawnVMWithInterops(systemInterop) v := bc.spawnVMWithInterops(systemInterop)
v.SetCheckedHash(tx.VerificationHash().BytesBE()) v.SetCheckedHash(tx.VerificationHash().BytesBE())
v.LoadScript(t.Script) v.LoadScript(t.Script)
@ -537,11 +543,10 @@ func (bc *Blockchain) storeBlock(block *Block) error {
_, _, _, _ = op, from, to, amount _, _, _, _ = op, from, to, amount
} }
} else { } else {
log.WithFields(log.Fields{ bc.log.Warn("contract invocation failed",
"tx": tx.Hash().StringLE(), zap.String("tx", tx.Hash().StringLE()),
"block": block.Index, zap.Uint32("block", block.Index),
"err": err, zap.Error(err))
}).Warn("contract invocation failed")
} }
aer := &state.AppExecResult{ aer := &state.AppExecResult{
TxHash: tx.Hash(), TxHash: tx.Hash(),
@ -710,13 +715,12 @@ func (bc *Blockchain) persist() error {
if err != nil { if err != nil {
return err return err
} }
log.WithFields(log.Fields{ bc.log.Info("blockchain persist completed",
"persistedBlocks": diff, zap.Uint32("persistedBlocks", diff),
"persistedKeys": persisted, zap.Int("persistedKeys", persisted),
"headerHeight": storedHeaderHeight, zap.Uint32("headerHeight", storedHeaderHeight),
"blockHeight": bHeight, zap.Uint32("blockHeight", bHeight),
"took": time.Since(start), zap.Duration("took", time.Since(start)))
}).Info("blockchain persist completed")
// update monitoring metrics. // update monitoring metrics.
updatePersistedHeightMetric(bHeight) updatePersistedHeightMetric(bHeight)
@ -849,7 +853,9 @@ func (bc *Blockchain) HeaderHeight() uint32 {
func (bc *Blockchain) GetAssetState(assetID util.Uint256) *state.Asset { func (bc *Blockchain) GetAssetState(assetID util.Uint256) *state.Asset {
asset, err := bc.dao.GetAssetState(assetID) asset, err := bc.dao.GetAssetState(assetID)
if asset == nil && err != storage.ErrKeyNotFound { if asset == nil && err != storage.ErrKeyNotFound {
log.Warnf("failed to get asset state %s : %s", assetID, err) bc.log.Warn("failed to get asset state",
zap.Stringer("asset", assetID),
zap.Error(err))
} }
return asset return asset
} }
@ -858,7 +864,7 @@ func (bc *Blockchain) GetAssetState(assetID util.Uint256) *state.Asset {
func (bc *Blockchain) GetContractState(hash util.Uint160) *state.Contract { func (bc *Blockchain) GetContractState(hash util.Uint160) *state.Contract {
contract, err := bc.dao.GetContractState(hash) contract, err := bc.dao.GetContractState(hash)
if contract == nil && err != storage.ErrKeyNotFound { if contract == nil && err != storage.ErrKeyNotFound {
log.Warnf("failed to get contract state: %s", err) bc.log.Warn("failed to get contract state", zap.Error(err))
} }
return contract return contract
} }
@ -867,7 +873,7 @@ func (bc *Blockchain) GetContractState(hash util.Uint160) *state.Contract {
func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *state.Account { func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *state.Account {
as, err := bc.dao.GetAccountState(scriptHash) as, err := bc.dao.GetAccountState(scriptHash)
if as == nil && err != storage.ErrKeyNotFound { if as == nil && err != storage.ErrKeyNotFound {
log.Warnf("failed to get account state: %s", err) bc.log.Warn("failed to get account state", zap.Error(err))
} }
return as return as
} }
@ -876,7 +882,7 @@ func (bc *Blockchain) GetAccountState(scriptHash util.Uint160) *state.Account {
func (bc *Blockchain) GetUnspentCoinState(hash util.Uint256) *UnspentCoinState { func (bc *Blockchain) GetUnspentCoinState(hash util.Uint256) *UnspentCoinState {
ucs, err := bc.dao.GetUnspentCoinState(hash) ucs, err := bc.dao.GetUnspentCoinState(hash)
if ucs == nil && err != storage.ErrKeyNotFound { if ucs == nil && err != storage.ErrKeyNotFound {
log.Warnf("failed to get unspent coin state: %s", err) bc.log.Warn("failed to get unspent coin state", zap.Error(err))
} }
return ucs return ucs
} }
@ -1367,7 +1373,7 @@ func (bc *Blockchain) spawnVMWithInterops(interopCtx *interopContext) *vm.VM {
// GetTestVM returns a VM and a Store setup for a test run of some sort of code. // GetTestVM returns a VM and a Store setup for a test run of some sort of code.
func (bc *Blockchain) GetTestVM() (*vm.VM, storage.Store) { func (bc *Blockchain) GetTestVM() (*vm.VM, storage.Store) {
tmpStore := storage.NewMemCachedStore(bc.dao.store) tmpStore := storage.NewMemCachedStore(bc.dao.store)
systemInterop := newInteropContext(trigger.Application, bc, tmpStore, nil, nil) systemInterop := newInteropContext(trigger.Application, bc, tmpStore, nil, nil, bc.log)
vm := bc.spawnVMWithInterops(systemInterop) vm := bc.spawnVMWithInterops(systemInterop)
return vm, tmpStore return vm, tmpStore
} }
@ -1446,7 +1452,7 @@ func (bc *Blockchain) verifyTxWitnesses(t *transaction.Transaction, block *Block
} }
sort.Slice(hashes, func(i, j int) bool { return hashes[i].Less(hashes[j]) }) sort.Slice(hashes, func(i, j int) bool { return hashes[i].Less(hashes[j]) })
sort.Slice(witnesses, func(i, j int) bool { return witnesses[i].ScriptHash().Less(witnesses[j].ScriptHash()) }) sort.Slice(witnesses, func(i, j int) bool { return witnesses[i].ScriptHash().Less(witnesses[j].ScriptHash()) })
interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, block, t) interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, block, t, bc.log)
for i := 0; i < len(hashes); i++ { for i := 0; i < len(hashes); i++ {
err := bc.verifyHashAgainstScript(hashes[i], &witnesses[i], t.VerificationHash(), interopCtx, false) err := bc.verifyHashAgainstScript(hashes[i], &witnesses[i], t.VerificationHash(), interopCtx, false)
if err != nil { if err != nil {
@ -1466,7 +1472,7 @@ func (bc *Blockchain) verifyBlockWitnesses(block *Block, prevHeader *Header) err
} else { } else {
hash = prevHeader.NextConsensus hash = prevHeader.NextConsensus
} }
interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, nil, nil) interopCtx := newInteropContext(trigger.Verification, bc, bc.dao.store, nil, nil, bc.log)
return bc.verifyHashAgainstScript(hash, &block.Script, block.VerificationHash(), interopCtx, true) return bc.verifyHashAgainstScript(hash, &block.Script, block.VerificationHash(), interopCtx, true)
} }

View file

@ -17,6 +17,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/util"
"github.com/CityOfZion/neo-go/pkg/vm/opcode" "github.com/CityOfZion/neo-go/pkg/vm/opcode"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
) )
var newBlockPrevHash util.Uint256 var newBlockPrevHash util.Uint256
@ -37,7 +38,7 @@ func newTestChain(t *testing.T) *Blockchain {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
chain, err := NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration) chain, err := NewBlockchain(storage.NewMemoryStore(), unitTestNetCfg.ProtocolConfiguration, zaptest.NewLogger(t))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View file

@ -15,6 +15,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/vm" "github.com/CityOfZion/neo-go/pkg/vm"
"github.com/CityOfZion/neo-go/pkg/vm/opcode" "github.com/CityOfZion/neo-go/pkg/vm/opcode"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
) )
/* Missing tests: /* Missing tests:
@ -112,7 +113,7 @@ func TestHeaderGetVersion(t *testing.T) {
func TestHeaderGetVersion_Negative(t *testing.T) { func TestHeaderGetVersion_Negative(t *testing.T) {
v := vm.New() v := vm.New()
block := newDumbBlock() block := newDumbBlock()
context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil) context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil, zaptest.NewLogger(t))
v.Estack().PushVal(vm.NewBoolItem(false)) v.Estack().PushVal(vm.NewBoolItem(false))
err := context.headerGetVersion(v) err := context.headerGetVersion(v)
@ -197,7 +198,7 @@ func TestWitnessGetVerificationScript(t *testing.T) {
script := []byte{byte(opcode.PUSHM1), byte(opcode.RET)} script := []byte{byte(opcode.PUSHM1), byte(opcode.RET)}
witness := transaction.Witness{InvocationScript: nil, VerificationScript: script} witness := transaction.Witness{InvocationScript: nil, VerificationScript: script}
context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t))
v.Estack().PushVal(vm.NewInteropItem(&witness)) v.Estack().PushVal(vm.NewInteropItem(&witness))
err := context.witnessGetVerificationScript(v) err := context.witnessGetVerificationScript(v)
require.NoError(t, err) require.NoError(t, err)
@ -418,7 +419,7 @@ func TestAssetGetPrecision(t *testing.T) {
func createVMAndPushBlock(t *testing.T) (*vm.VM, *Block, *interopContext) { func createVMAndPushBlock(t *testing.T) (*vm.VM, *Block, *interopContext) {
v := vm.New() v := vm.New()
block := newDumbBlock() block := newDumbBlock()
context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil) context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), block, nil, zaptest.NewLogger(t))
v.Estack().PushVal(vm.NewInteropItem(block)) v.Estack().PushVal(vm.NewInteropItem(block))
return v, block, context return v, block, context
} }
@ -447,7 +448,7 @@ func createVMAndAssetState(t *testing.T) (*vm.VM, *state.Asset, *interopContext)
IsFrozen: false, IsFrozen: false,
} }
context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t))
return v, assetState, context return v, assetState, context
} }
@ -465,7 +466,7 @@ func createVMAndContractState(t *testing.T) (*vm.VM, *state.Contract, *interopCo
Description: random.String(10), Description: random.String(10),
} }
context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t))
return v, contractState, context return v, contractState, context
} }
@ -479,7 +480,7 @@ func createVMAndAccState(t *testing.T) (*vm.VM, *state.Account, *interopContext)
accountState.Votes = []*keys.PublicKey{key} accountState.Votes = []*keys.PublicKey{key}
require.NoError(t, err) require.NoError(t, err)
context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t))
return v, accountState, context return v, accountState, context
} }
@ -509,6 +510,6 @@ func createVMAndTX(t *testing.T) (*vm.VM, *transaction.Transaction, *interopCont
tx.Attributes = attributes tx.Attributes = attributes
tx.Inputs = inputs tx.Inputs = inputs
tx.Outputs = outputs tx.Outputs = outputs
context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, tx) context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, tx, zaptest.NewLogger(t))
return v, tx, context return v, tx, context
} }

View file

@ -13,7 +13,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/util"
"github.com/CityOfZion/neo-go/pkg/vm" "github.com/CityOfZion/neo-go/pkg/vm"
gherr "github.com/pkg/errors" gherr "github.com/pkg/errors"
log "github.com/sirupsen/logrus" "go.uber.org/zap"
) )
const ( const (
@ -350,7 +350,9 @@ func (ic *interopContext) runtimeNotify(v *vm.VM) error {
// runtimeLog logs the message passed. // runtimeLog logs the message passed.
func (ic *interopContext) runtimeLog(v *vm.VM) error { func (ic *interopContext) runtimeLog(v *vm.VM) error {
msg := fmt.Sprintf("%q", v.Estack().Pop().Bytes()) msg := fmt.Sprintf("%q", v.Estack().Pop().Bytes())
log.Infof("script %s logs: %s", getContextScriptHash(v, 0), msg) ic.log.Info("runtime log",
zap.Stringer("script", getContextScriptHash(v, 0)),
zap.String("logs", msg))
return nil return nil
} }

View file

@ -14,6 +14,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/core/storage" "github.com/CityOfZion/neo-go/pkg/core/storage"
"github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/vm" "github.com/CityOfZion/neo-go/pkg/vm"
"go.uber.org/zap"
) )
type interopContext struct { type interopContext struct {
@ -23,12 +24,13 @@ type interopContext struct {
tx *transaction.Transaction tx *transaction.Transaction
dao *cachedDao dao *cachedDao
notifications []state.NotificationEvent notifications []state.NotificationEvent
log *zap.Logger
} }
func newInteropContext(trigger byte, bc Blockchainer, s storage.Store, block *Block, tx *transaction.Transaction) *interopContext { func newInteropContext(trigger byte, bc Blockchainer, s storage.Store, block *Block, tx *transaction.Transaction, log *zap.Logger) *interopContext {
dao := newCachedDao(s) dao := newCachedDao(s)
nes := make([]state.NotificationEvent, 0) nes := make([]state.NotificationEvent, 0)
return &interopContext{bc, trigger, block, tx, dao, nes} return &interopContext{bc, trigger, block, tx, dao, nes, log}
} }
// interopedFunction binds function name, id with the function itself and price, // interopedFunction binds function name, id with the function itself and price,

View file

@ -9,12 +9,13 @@ import (
"github.com/CityOfZion/neo-go/pkg/smartcontract/trigger" "github.com/CityOfZion/neo-go/pkg/smartcontract/trigger"
"github.com/CityOfZion/neo-go/pkg/vm" "github.com/CityOfZion/neo-go/pkg/vm"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
) )
func testNonInterop(t *testing.T, value interface{}, f func(*interopContext, *vm.VM) error) { func testNonInterop(t *testing.T, value interface{}, f func(*interopContext, *vm.VM) error) {
v := vm.New() v := vm.New()
v.Estack().PushVal(value) v.Estack().PushVal(value)
context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil) context := newInteropContext(trigger.Application, newTestChain(t), storage.NewMemoryStore(), nil, nil, zaptest.NewLogger(t))
require.Error(t, f(context, v)) require.Error(t, f(context, v))
} }

View file

@ -3,17 +3,23 @@ package network
import ( import (
"github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core"
"github.com/Workiva/go-datastructures/queue" "github.com/Workiva/go-datastructures/queue"
log "github.com/sirupsen/logrus" "go.uber.org/zap"
) )
type blockQueue struct { type blockQueue struct {
log *zap.Logger
queue *queue.PriorityQueue queue *queue.PriorityQueue
checkBlocks chan struct{} checkBlocks chan struct{}
chain core.Blockchainer chain core.Blockchainer
} }
func newBlockQueue(capacity int, bc core.Blockchainer) *blockQueue { func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQueue {
if log == nil {
return nil
}
return &blockQueue{ return &blockQueue{
log: log,
queue: queue.NewPriorityQueue(capacity, false), queue: queue.NewPriorityQueue(capacity, false),
checkBlocks: make(chan struct{}, 1), checkBlocks: make(chan struct{}, 1),
chain: bc, chain: bc,
@ -38,11 +44,10 @@ func (bq *blockQueue) run() {
if minblock.Index == bq.chain.BlockHeight()+1 { if minblock.Index == bq.chain.BlockHeight()+1 {
err := bq.chain.AddBlock(minblock) err := bq.chain.AddBlock(minblock)
if err != nil { if err != nil {
log.WithFields(log.Fields{ bq.log.Warn("blockQueue: failed adding block into the blockchain",
"error": err.Error(), zap.String("error", err.Error()),
"blockHeight": bq.chain.BlockHeight(), zap.Uint32("blockHeight", bq.chain.BlockHeight()),
"nextIndex": minblock.Index, zap.Uint32("nextIndex", minblock.Index))
}).Warn("blockQueue: failed adding block into the blockchain")
} }
} }
} else { } else {

View file

@ -6,12 +6,13 @@ import (
"github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
) )
func TestBlockQueue(t *testing.T) { func TestBlockQueue(t *testing.T) {
chain := &testChain{} chain := &testChain{}
// notice, it's not yet running // notice, it's not yet running
bq := newBlockQueue(0, chain) bq := newBlockQueue(0, chain, zaptest.NewLogger(t))
blocks := make([]*core.Block, 11) blocks := make([]*core.Block, 11)
for i := 1; i < 11; i++ { for i := 1; i < 11; i++ {
blocks[i] = &core.Block{BlockBase: core.BlockBase{Index: uint32(i)}} blocks[i] = &core.Block{BlockBase: core.BlockBase{Index: uint32(i)}}

View file

@ -16,6 +16,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/network/payload"
"github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/util"
"github.com/CityOfZion/neo-go/pkg/vm" "github.com/CityOfZion/neo-go/pkg/vm"
"go.uber.org/zap/zaptest"
) )
type testChain struct { type testChain struct {
@ -203,7 +204,7 @@ func (p *localPeer) Handshaked() bool {
return p.handshaked return p.handshaked
} }
func newTestServer() *Server { func newTestServer(t *testing.T) *Server {
return &Server{ return &Server{
ServerConfig: ServerConfig{}, ServerConfig: ServerConfig{},
chain: &testChain{}, chain: &testChain{},
@ -214,6 +215,7 @@ func newTestServer() *Server {
register: make(chan Peer), register: make(chan Peer),
unregister: make(chan peerDrop), unregister: make(chan peerDrop),
peers: make(map[Peer]bool), peers: make(map[Peer]bool),
log: zaptest.NewLogger(t),
} }
} }

View file

@ -4,13 +4,14 @@ import (
"context" "context"
"net/http" "net/http"
log "github.com/sirupsen/logrus" "go.uber.org/zap"
) )
// Service serves metrics. // Service serves metrics.
type Service struct { type Service struct {
*http.Server *http.Server
config Config config Config
log *zap.Logger
serviceType string serviceType string
} }
@ -24,27 +25,21 @@ type Config struct {
// Start runs http service with exposed endpoint on configured port. // Start runs http service with exposed endpoint on configured port.
func (ms *Service) Start() { func (ms *Service) Start() {
if ms.config.Enabled { if ms.config.Enabled {
log.WithFields(log.Fields{ ms.log.Info("service is running", zap.String("endpoint", ms.Addr))
"endpoint": ms.Addr,
"service": ms.serviceType,
}).Info("service running")
err := ms.ListenAndServe() err := ms.ListenAndServe()
if err != nil && err != http.ErrServerClosed { if err != nil && err != http.ErrServerClosed {
log.Warnf("%s service couldn't start on configured port", ms.serviceType) ms.log.Warn("service couldn't start on configured port")
} }
} else { } else {
log.Infof("%s service hasn't started since it's disabled", ms.serviceType) ms.log.Info("service hasn't started since it's disabled")
} }
} }
// ShutDown stops service. // ShutDown stops service.
func (ms *Service) ShutDown() { func (ms *Service) ShutDown() {
log.WithFields(log.Fields{ ms.log.Info("shutting down service", zap.String("endpoint", ms.Addr))
"endpoint": ms.Addr,
"service": ms.serviceType,
}).Info("shutting down service")
err := ms.Shutdown(context.Background()) err := ms.Shutdown(context.Background())
if err != nil { if err != nil {
log.Fatalf("can't shut down %s service", ms.serviceType) ms.log.Panic("can't shut down service")
} }
} }

View file

@ -3,13 +3,19 @@ package metrics
import ( import (
"net/http" "net/http"
"net/http/pprof" "net/http/pprof"
"go.uber.org/zap"
) )
// PprofService https://golang.org/pkg/net/http/pprof/. // PprofService https://golang.org/pkg/net/http/pprof/.
type PprofService Service type PprofService Service
// NewPprofService created new service for gathering pprof metrics. // NewPprofService created new service for gathering pprof metrics.
func NewPprofService(cfg Config) *Service { func NewPprofService(cfg Config, log *zap.Logger) *Service {
if log == nil {
return nil
}
handler := http.NewServeMux() handler := http.NewServeMux()
handler.HandleFunc("/debug/pprof/", pprof.Index) handler.HandleFunc("/debug/pprof/", pprof.Index)
handler.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) handler.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
@ -18,9 +24,12 @@ func NewPprofService(cfg Config) *Service {
handler.HandleFunc("/debug/pprof/trace", pprof.Trace) handler.HandleFunc("/debug/pprof/trace", pprof.Trace)
return &Service{ return &Service{
&http.Server{ Server: &http.Server{
Addr: cfg.Address + ":" + cfg.Port, Addr: cfg.Address + ":" + cfg.Port,
Handler: handler, Handler: handler,
}, cfg, "Pprof", },
config: cfg,
serviceType: "Pprof",
log: log.With(zap.String("service", "Pprof")),
} }
} }

View file

@ -4,17 +4,25 @@ import (
"net/http" "net/http"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
) )
// PrometheusService https://prometheus.io/docs/guides/go-application. // PrometheusService https://prometheus.io/docs/guides/go-application.
type PrometheusService Service type PrometheusService Service
// NewPrometheusService creates new service for gathering prometheus metrics. // NewPrometheusService creates new service for gathering prometheus metrics.
func NewPrometheusService(cfg Config) *Service { func NewPrometheusService(cfg Config, log *zap.Logger) *Service {
if log == nil {
return nil
}
return &Service{ return &Service{
&http.Server{ Server: &http.Server{
Addr: cfg.Address + ":" + cfg.Port, Addr: cfg.Address + ":" + cfg.Port,
Handler: promhttp.Handler(), Handler: promhttp.Handler(),
}, cfg, "Prometheus", },
config: cfg,
serviceType: "Prometheus",
log: log.With(zap.String("service", "Prometheus")),
} }
} }

View file

@ -3,7 +3,6 @@ package payload
import ( import (
"github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/io" "github.com/CityOfZion/neo-go/pkg/io"
log "github.com/sirupsen/logrus"
) )
// Headers payload. // Headers payload.
@ -22,7 +21,6 @@ func (p *Headers) DecodeBinary(br *io.BinReader) {
// C# node does it silently // C# node does it silently
if lenHeaders > MaxHeadersAllowed { if lenHeaders > MaxHeadersAllowed {
log.Warnf("received %d headers, capping to %d", lenHeaders, MaxHeadersAllowed)
lenHeaders = MaxHeadersAllowed lenHeaders = MaxHeadersAllowed
} }

View file

@ -15,8 +15,8 @@ import (
"github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/network/payload"
"github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/util"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap"
) )
const ( const (
@ -65,6 +65,8 @@ type (
quit chan struct{} quit chan struct{}
connected *atomic.Bool connected *atomic.Bool
log *zap.Logger
} }
peerDrop struct { peerDrop struct {
@ -80,11 +82,15 @@ func randomID() uint32 {
} }
// NewServer returns a new Server, initialized with the given configuration. // NewServer returns a new Server, initialized with the given configuration.
func NewServer(config ServerConfig, chain core.Blockchainer) *Server { func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) *Server {
if log == nil {
return nil
}
s := &Server{ s := &Server{
ServerConfig: config, ServerConfig: config,
chain: chain, chain: chain,
bQueue: newBlockQueue(maxBlockBatch, chain), bQueue: newBlockQueue(maxBlockBatch, chain, log),
id: randomID(), id: randomID(),
quit: make(chan struct{}), quit: make(chan struct{}),
addrReq: make(chan *Message, config.MinPeers), addrReq: make(chan *Message, config.MinPeers),
@ -92,9 +98,11 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server {
unregister: make(chan peerDrop), unregister: make(chan peerDrop),
peers: make(map[Peer]bool), peers: make(map[Peer]bool),
connected: atomic.NewBool(false), connected: atomic.NewBool(false),
log: log,
} }
srv, err := consensus.NewService(consensus.Config{ srv, err := consensus.NewService(consensus.Config{
Logger: log,
Broadcast: s.handleNewPayload, Broadcast: s.handleNewPayload,
RelayBlock: s.relayBlock, RelayBlock: s.relayBlock,
Chain: chain, Chain: chain,
@ -108,30 +116,27 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server {
s.consensus = srv s.consensus = srv
if s.MinPeers <= 0 { if s.MinPeers <= 0 {
log.WithFields(log.Fields{ s.log.Info("bad MinPeers configured, using the default value",
"MinPeers configured": s.MinPeers, zap.Int("configured", s.MinPeers),
"MinPeers actual": defaultMinPeers, zap.Int("actual", defaultMinPeers))
}).Info("bad MinPeers configured, using the default value")
s.MinPeers = defaultMinPeers s.MinPeers = defaultMinPeers
} }
if s.MaxPeers <= 0 { if s.MaxPeers <= 0 {
log.WithFields(log.Fields{ s.log.Info("bad MaxPeers configured, using the default value",
"MaxPeers configured": s.MaxPeers, zap.Int("configured", s.MaxPeers),
"MaxPeers actual": defaultMaxPeers, zap.Int("actual", defaultMaxPeers))
}).Info("bad MaxPeers configured, using the default value")
s.MaxPeers = defaultMaxPeers s.MaxPeers = defaultMaxPeers
} }
if s.AttemptConnPeers <= 0 { if s.AttemptConnPeers <= 0 {
log.WithFields(log.Fields{ s.log.Info("bad AttemptConnPeers configured, using the default value",
"AttemptConnPeers configured": s.AttemptConnPeers, zap.Int("configured", s.AttemptConnPeers),
"AttemptConnPeers actual": defaultAttemptConnPeers, zap.Int("actual", defaultAttemptConnPeers))
}).Info("bad AttemptConnPeers configured, using the default value")
s.AttemptConnPeers = defaultAttemptConnPeers s.AttemptConnPeers = defaultAttemptConnPeers
} }
s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", config.Address, config.Port)) s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", config.Address, config.Port), s.log)
s.discovery = NewDefaultDiscovery( s.discovery = NewDefaultDiscovery(
s.DialTimeout, s.DialTimeout,
s.transport, s.transport,
@ -147,10 +152,9 @@ func (s *Server) ID() uint32 {
// Start will start the server and its underlying transport. // Start will start the server and its underlying transport.
func (s *Server) Start(errChan chan error) { func (s *Server) Start(errChan chan error) {
log.WithFields(log.Fields{ s.log.Info("node started",
"blockHeight": s.chain.BlockHeight(), zap.Uint32("blockHeight", s.chain.BlockHeight()),
"headerHeight": s.chain.HeaderHeight(), zap.Uint32("headerHeight", s.chain.HeaderHeight()))
}).Info("node started")
s.discovery.BackFill(s.Seeds...) s.discovery.BackFill(s.Seeds...)
@ -162,9 +166,7 @@ func (s *Server) Start(errChan chan error) {
// Shutdown disconnects all peers and stops listening. // Shutdown disconnects all peers and stops listening.
func (s *Server) Shutdown() { func (s *Server) Shutdown() {
log.WithFields(log.Fields{ s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
"peers": s.PeerCount(),
}).Info("shutting down server")
s.bQueue.discard() s.bQueue.discard()
close(s.quit) close(s.quit)
} }
@ -205,9 +207,7 @@ func (s *Server) run() {
s.lock.Lock() s.lock.Lock()
s.peers[p] = true s.peers[p] = true
s.lock.Unlock() s.lock.Unlock()
log.WithFields(log.Fields{ s.log.Info("new peer connected", zap.Stringer("addr", p.RemoteAddr()))
"addr": p.RemoteAddr(),
}).Info("new peer connected")
peerCount := s.PeerCount() peerCount := s.PeerCount()
if peerCount > s.MaxPeers { if peerCount > s.MaxPeers {
s.lock.RLock() s.lock.RLock()
@ -225,11 +225,10 @@ func (s *Server) run() {
if s.peers[drop.peer] { if s.peers[drop.peer] {
delete(s.peers, drop.peer) delete(s.peers, drop.peer)
s.lock.Unlock() s.lock.Unlock()
log.WithFields(log.Fields{ s.log.Warn("peer disconnected",
"addr": drop.peer.RemoteAddr(), zap.Stringer("addr", drop.peer.RemoteAddr()),
"reason": drop.reason, zap.String("reason", drop.reason.Error()),
"peerCount": s.PeerCount(), zap.Int("peerCount", s.PeerCount()))
}).Warn("peer disconnected")
addr := drop.peer.PeerAddr().String() addr := drop.peer.PeerAddr().String()
if drop.reason == errIdenticalID { if drop.reason == errIdenticalID {
s.discovery.RegisterBadAddr(addr) s.discovery.RegisterBadAddr(addr)
@ -254,7 +253,7 @@ func (s *Server) tryStartConsensus() {
} }
if s.HandshakedPeersCount() >= s.MinPeers { if s.HandshakedPeersCount() >= s.MinPeers {
log.Info("minimum amount of peers were connected to") s.log.Info("minimum amount of peers were connected to")
if s.connected.CAS(false, true) { if s.connected.CAS(false, true) {
s.consensus.Start() s.consensus.Start()
} }
@ -304,12 +303,11 @@ func (s *Server) HandshakedPeersCount() int {
func (s *Server) startProtocol(p Peer) { func (s *Server) startProtocol(p Peer) {
var err error var err error
log.WithFields(log.Fields{ s.log.Info("started protocol",
"addr": p.RemoteAddr(), zap.Stringer("addr", p.RemoteAddr()),
"userAgent": string(p.Version().UserAgent), zap.ByteString("userAgent", p.Version().UserAgent),
"startHeight": p.Version().StartHeight, zap.Uint32("startHeight", p.Version().StartHeight),
"id": p.Version().Nonce, zap.Uint32("id", p.Version().Nonce))
}).Info("started protocol")
s.discovery.RegisterGoodAddr(p.PeerAddr().String()) s.discovery.RegisterGoodAddr(p.PeerAddr().String())
if s.chain.HeaderHeight() < p.Version().StartHeight { if s.chain.HeaderHeight() < p.Version().StartHeight {
@ -386,7 +384,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
// This method could best be called in a separate routine. // This method could best be called in a separate routine.
func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) { func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) {
if err := s.chain.AddHeaders(headers.Hdrs...); err != nil { if err := s.chain.AddHeaders(headers.Hdrs...); err != nil {
log.Warnf("failed processing headers: %s", err) s.log.Warn("failed processing headers", zap.Error(err))
return return
} }
// The peer will respond with a maximum of 2000 headers in one batch. // The peer will respond with a maximum of 2000 headers in one batch.

View file

@ -4,7 +4,7 @@ import (
"time" "time"
"github.com/CityOfZion/neo-go/config" "github.com/CityOfZion/neo-go/config"
log "github.com/sirupsen/logrus" "go.uber.org/zap/zapcore"
) )
type ( type (
@ -53,7 +53,7 @@ type (
ProtoTickInterval time.Duration ProtoTickInterval time.Duration
// Level of the internal logger. // Level of the internal logger.
LogLevel log.Level LogLevel zapcore.Level
// Wallet is a wallet configuration. // Wallet is a wallet configuration.
Wallet *config.WalletConfig Wallet *config.WalletConfig

View file

@ -11,7 +11,7 @@ import (
func TestSendVersion(t *testing.T) { func TestSendVersion(t *testing.T) {
var ( var (
s = newTestServer() s = newTestServer(t)
p = newLocalPeer(t) p = newLocalPeer(t)
) )
s.Port = 3000 s.Port = 3000
@ -37,7 +37,7 @@ func TestSendVersion(t *testing.T) {
// Server should reply with a verack after receiving a valid version. // Server should reply with a verack after receiving a valid version.
func TestVerackAfterHandleVersionCmd(t *testing.T) { func TestVerackAfterHandleVersionCmd(t *testing.T) {
var ( var (
s = newTestServer() s = newTestServer(t)
p = newLocalPeer(t) p = newLocalPeer(t)
) )
na, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:3000") na, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:3000")
@ -58,7 +58,7 @@ func TestVerackAfterHandleVersionCmd(t *testing.T) {
// invalid version and disconnects the peer. // invalid version and disconnects the peer.
func TestServerNotSendsVerack(t *testing.T) { func TestServerNotSendsVerack(t *testing.T) {
var ( var (
s = newTestServer() s = newTestServer(t)
p = newLocalPeer(t) p = newLocalPeer(t)
p2 = newLocalPeer(t) p2 = newLocalPeer(t)
) )
@ -91,7 +91,7 @@ func TestServerNotSendsVerack(t *testing.T) {
func TestRequestHeaders(t *testing.T) { func TestRequestHeaders(t *testing.T) {
var ( var (
s = newTestServer() s = newTestServer(t)
p = newLocalPeer(t) p = newLocalPeer(t)
) )
p.messageHandler = func(t *testing.T, msg *Message) { p.messageHandler = func(t *testing.T, msg *Message) {

View file

@ -6,11 +6,12 @@ import (
"time" "time"
"github.com/CityOfZion/neo-go/pkg/io" "github.com/CityOfZion/neo-go/pkg/io"
log "github.com/sirupsen/logrus" "go.uber.org/zap"
) )
// TCPTransport allows network communication over TCP. // TCPTransport allows network communication over TCP.
type TCPTransport struct { type TCPTransport struct {
log *zap.Logger
server *Server server *Server
listener net.Listener listener net.Listener
bindAddr string bindAddr string
@ -20,8 +21,9 @@ var reClosedNetwork = regexp.MustCompile(".* use of closed network connection")
// NewTCPTransport returns a new TCPTransport that will listen for // NewTCPTransport returns a new TCPTransport that will listen for
// new incoming peer connections. // new incoming peer connections.
func NewTCPTransport(s *Server, bindAddr string) *TCPTransport { func NewTCPTransport(s *Server, bindAddr string, log *zap.Logger) *TCPTransport {
return &TCPTransport{ return &TCPTransport{
log: log,
server: s, server: s,
bindAddr: bindAddr, bindAddr: bindAddr,
} }
@ -41,7 +43,7 @@ func (t *TCPTransport) Dial(addr string, timeout time.Duration) error {
func (t *TCPTransport) Accept() { func (t *TCPTransport) Accept() {
l, err := net.Listen("tcp", t.bindAddr) l, err := net.Listen("tcp", t.bindAddr)
if err != nil { if err != nil {
log.Fatalf("TCP listen error %s", err) t.log.Panic("TCP listen error", zap.Error(err))
return return
} }
@ -50,7 +52,7 @@ func (t *TCPTransport) Accept() {
for { for {
conn, err := l.Accept() conn, err := l.Accept()
if err != nil { if err != nil {
log.Warnf("TCP accept error: %s", err) t.log.Warn("TCP accept error", zap.Error(err))
if t.isCloseError(err) { if t.isCloseError(err) {
break break
} }
@ -80,9 +82,7 @@ func (t *TCPTransport) handleConn(conn net.Conn) {
// When a new peer is connected we send out our version immediately. // When a new peer is connected we send out our version immediately.
if err := t.server.sendVersion(p); err != nil { if err := t.server.sendVersion(p); err != nil {
log.WithFields(log.Fields{ t.log.Error("error on sendVersion", zap.Stringer("addr", p.RemoteAddr()), zap.Error(err))
"addr": p.RemoteAddr(),
}).Error(err)
} }
r := io.NewBinReaderFromIO(p.conn) r := io.NewBinReaderFromIO(p.conn)
for { for {

View file

@ -13,6 +13,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/rpc/result" "github.com/CityOfZion/neo-go/pkg/rpc/result"
"github.com/CityOfZion/neo-go/pkg/rpc/wrappers" "github.com/CityOfZion/neo-go/pkg/rpc/wrappers"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
) )
// ErrorResponse struct represents JSON-RPC error. // ErrorResponse struct represents JSON-RPC error.
@ -180,7 +181,7 @@ func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFu
require.NoError(t, err, "could not load config") require.NoError(t, err, "could not load config")
memoryStore := storage.NewMemoryStore() memoryStore := storage.NewMemoryStore()
chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration) chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration, zaptest.NewLogger(t))
require.NoError(t, err, "could not create chain") require.NoError(t, err, "could not create chain")
go chain.Run() go chain.Run()
@ -198,7 +199,7 @@ func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFu
} }
serverConfig := network.NewServerConfig(cfg) serverConfig := network.NewServerConfig(cfg)
server := network.NewServer(serverConfig, chain) server := network.NewServer(serverConfig, chain, zaptest.NewLogger(t))
rpcServer := NewServer(chain, cfg.ApplicationConfiguration.RPC, server) rpcServer := NewServer(chain, cfg.ApplicationConfiguration.RPC, server)
handler := http.HandlerFunc(rpcServer.requestHandler) handler := http.HandlerFunc(rpcServer.requestHandler)