network: unplug stateroot service from the Server

Notice that it makes the node accept Extensible payloads with any category
which is the same way C# node works. We're trusting Extensible senders,
improper payloads are harmless until they DoS the network, but we have some
protections against that too (and spamming with proper category doesn't differ
a lot).
This commit is contained in:
Roman Khimov 2022-01-12 21:09:37 +03:00
parent 0ad3ea5944
commit 66aafd868b
6 changed files with 46 additions and 55 deletions

View file

@ -18,6 +18,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/network/metrics"
"github.com/nspcc-dev/neo-go/pkg/rpc/server"
"github.com/nspcc-dev/neo-go/pkg/services/oracle"
"github.com/nspcc-dev/neo-go/pkg/services/stateroot"
"github.com/urfave/cli"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -361,6 +362,12 @@ func startServer(ctx *cli.Context) error {
if err != nil {
return cli.NewExitError(fmt.Errorf("failed to create network server: %w", err), 1)
}
sr, err := stateroot.New(serverConfig.StateRootCfg, log, chain, serv.BroadcastExtensible)
if err != nil {
return cli.NewExitError(fmt.Errorf("can't initialize StateRoot service: %w", err), 1)
}
serv.AddExtensibleService(sr, stateroot.Category, sr.OnPayload)
oracleSrv, err := mkOracle(serverConfig, chain, serv, log)
if err != nil {
return err

View file

@ -52,7 +52,7 @@ type Service interface {
Shutdown()
// OnPayload is a callback to notify Service about new received payload.
OnPayload(p *npayload.Extensible)
OnPayload(p *npayload.Extensible) error
// OnTransaction is a callback to notify Service about new received transaction.
OnTransaction(tx *transaction.Transaction)
}
@ -365,26 +365,27 @@ func (s *service) payloadFromExtensible(ep *npayload.Extensible) *Payload {
}
// OnPayload handles Payload receive.
func (s *service) OnPayload(cp *npayload.Extensible) {
func (s *service) OnPayload(cp *npayload.Extensible) error {
log := s.log.With(zap.Stringer("hash", cp.Hash()))
p := s.payloadFromExtensible(cp)
// decode payload data into message
if err := p.decodeData(); err != nil {
log.Info("can't decode payload data", zap.Error(err))
return
return nil
}
if !s.validatePayload(p) {
log.Info("can't validate payload")
return
return nil
}
if s.dbft == nil || !s.started.Load() {
log.Debug("dbft is inactive or not started yet")
return
return nil
}
s.messages <- *p
return nil
}
func (s *service) OnTransaction(tx *transaction.Transaction) {

View file

@ -351,7 +351,7 @@ func TestService_OnPayload(t *testing.T) {
p.encodeData()
// sender is invalid
srv.OnPayload(&p.Extensible)
require.NoError(t, srv.OnPayload(&p.Extensible))
shouldNotReceive(t, srv.messages)
p = new(Payload)
@ -359,7 +359,7 @@ func TestService_OnPayload(t *testing.T) {
p.Sender = priv.GetScriptHash()
p.SetPayload(&prepareRequest{})
require.NoError(t, p.Sign(priv))
srv.OnPayload(&p.Extensible)
require.NoError(t, srv.OnPayload(&p.Extensible))
shouldReceive(t, srv.messages)
}

View file

@ -25,7 +25,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/notary"
"github.com/nspcc-dev/neo-go/pkg/services/stateroot"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/atomic"
"go.uber.org/zap"
@ -83,6 +82,7 @@ type (
notaryFeer NotaryFeer
notaryModule *notary.Notary
services []Service
extensHandlers map[string]func(*payload.Extensible) error
txInLock sync.Mutex
txInMap map[util.Uint256]struct{}
@ -103,7 +103,6 @@ type (
syncReached *atomic.Bool
stateRoot stateroot.Service
stateSync blockchainer.StateSync
log *zap.Logger
@ -159,6 +158,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log,
transactions: make(chan *transaction.Transaction, 64),
extensHandlers: make(map[string]func(*payload.Extensible) error),
}
if chain.P2PSigExtensionsEnabled() {
s.notaryFeer = NewNotaryFeer(chain)
@ -194,17 +194,6 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
s.tryStartServices()
})
if config.StateRootCfg.Enabled && chain.GetConfig().StateRootInHeader {
return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled")
}
sr, err := stateroot.New(config.StateRootCfg, s.log, chain, s.handleNewPayload)
if err != nil {
return nil, fmt.Errorf("can't initialize StateRoot service: %w", err)
}
s.stateRoot = sr
s.services = append(s.services, sr)
sSync := chain.GetStateSyncModule()
s.stateSync = sSync
s.bSyncQueue = newBlockQueue(maxBlockBatch, sSync, log, nil)
@ -212,7 +201,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
if config.Wallet != nil {
srv, err := newConsensus(consensus.Config{
Logger: log,
Broadcast: s.handleNewPayload,
Broadcast: s.BroadcastExtensible,
Chain: chain,
ProtocolConfiguration: chain.GetConfig(),
RequestTx: s.requestTx,
@ -225,7 +214,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
}
s.consensus = srv
s.services = append(s.services, srv)
s.AddExtensibleService(srv, consensus.Category, srv.OnPayload)
}
if s.MinPeers < 0 {
@ -306,9 +295,10 @@ func (s *Server) AddService(svc Service) {
s.services = append(s.services, svc)
}
// GetStateRoot returns state root service instance.
func (s *Server) GetStateRoot() stateroot.Service {
return s.stateRoot
// AddExtensibleService register a service that handles extensible payload of some kind.
func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) {
s.extensHandlers[category] = handler
s.AddService(svc)
}
// UnconnectedPeers returns a list of peers that are in the discovery peer list
@ -946,27 +936,26 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error {
if !ok { // payload is already in cache
return nil
}
switch e.Category {
case consensus.Category:
if s.consensus != nil {
s.consensus.OnPayload(e)
}
case stateroot.Category:
err := s.stateRoot.OnPayload(e)
handler := s.extensHandlers[e.Category]
if handler != nil {
err = handler(e)
if err != nil {
return err
}
default:
return errors.New("invalid category")
}
s.advertiseExtensible(e)
return nil
}
func (s *Server) advertiseExtensible(e *payload.Extensible) {
msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()}))
if e.Category == consensus.Category {
// It's high priority because it directly affects consensus process,
// even though it's just an inv.
s.broadcastHPMessage(msg)
} else {
s.broadcastMessage(msg)
}
return nil
}
// handleTxCmd processes received transaction.
@ -1253,22 +1242,17 @@ func (s *Server) tryInitStateSync() {
}
}
}
func (s *Server) handleNewPayload(p *payload.Extensible) {
// BroadcastExtensible add locally-generated Extensible payload to the pool
// and advertises it to peers.
func (s *Server) BroadcastExtensible(p *payload.Extensible) {
_, err := s.extensiblePool.Add(p)
if err != nil {
s.log.Error("created payload is not valid", zap.Error(err))
return
}
msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{p.Hash()}))
switch p.Category {
case consensus.Category:
// It's high priority because it directly affects consensus process,
// even though it's just an inv.
s.broadcastHPMessage(msg)
default:
s.broadcastMessage(msg)
}
s.advertiseExtensible(p)
}
func (s *Server) requestTx(hashes ...util.Uint256) {

View file

@ -43,7 +43,10 @@ func newFakeConsensus(c consensus.Config) (consensus.Service, error) {
}
func (f *fakeConsensus) Start() { f.started.Store(true) }
func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) }
func (f *fakeConsensus) OnPayload(p *payload.Extensible) { f.payloads = append(f.payloads, p) }
func (f *fakeConsensus) OnPayload(p *payload.Extensible) error {
f.payloads = append(f.payloads, p)
return nil
}
func (f *fakeConsensus) OnTransaction(tx *transaction.Transaction) { f.txs = append(f.txs, tx) }
func (f *fakeConsensus) GetPayload(h util.Uint256) *payload.Extensible { panic("implement me") }
@ -455,13 +458,6 @@ func TestConsensus(t *testing.T) {
msg := newConsensusMessage(s.chain.BlockHeight()+1, s.chain.BlockHeight()+2)
require.Error(t, s.handleMessage(p, msg))
})
t.Run("invalid category", func(t *testing.T) {
pl := payload.NewExtensible()
pl.Category = "invalid"
pl.ValidBlockEnd = s.chain.BlockHeight() + 1
msg := NewMessage(CMDExtensible, pl)
require.Error(t, s.handleMessage(p, msg))
})
}
func TestTransaction(t *testing.T) {

View file

@ -77,6 +77,9 @@ func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer, cb
s.MainCfg = cfg
if cfg.Enabled {
if bcConf.StateRootInHeader {
return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled")
}
var err error
w := cfg.UnlockWallet
if s.wallet, err = wallet.NewWalletFromFile(w.Path); err != nil {