From 66aafd868b545ab6c53d6efa88e1880031d6c3f4 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 12 Jan 2022 21:09:37 +0300 Subject: [PATCH] network: unplug stateroot service from the Server Notice that it makes the node accept Extensible payloads with any category which is the same way C# node works. We're trusting Extensible senders, improper payloads are harmless until they DoS the network, but we have some protections against that too (and spamming with proper category doesn't differ a lot). --- cli/server/server.go | 7 ++++ pkg/consensus/consensus.go | 11 +++--- pkg/consensus/consensus_test.go | 4 +-- pkg/network/server.go | 60 ++++++++++++------------------- pkg/network/server_test.go | 16 ++++----- pkg/services/stateroot/service.go | 3 ++ 6 files changed, 46 insertions(+), 55 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index 957b1fa3b..e2638a2e8 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -18,6 +18,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/metrics" "github.com/nspcc-dev/neo-go/pkg/rpc/server" "github.com/nspcc-dev/neo-go/pkg/services/oracle" + "github.com/nspcc-dev/neo-go/pkg/services/stateroot" "github.com/urfave/cli" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -361,6 +362,12 @@ func startServer(ctx *cli.Context) error { if err != nil { return cli.NewExitError(fmt.Errorf("failed to create network server: %w", err), 1) } + sr, err := stateroot.New(serverConfig.StateRootCfg, log, chain, serv.BroadcastExtensible) + if err != nil { + return cli.NewExitError(fmt.Errorf("can't initialize StateRoot service: %w", err), 1) + } + serv.AddExtensibleService(sr, stateroot.Category, sr.OnPayload) + oracleSrv, err := mkOracle(serverConfig, chain, serv, log) if err != nil { return err diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 76fd67428..01178ae5f 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -52,7 +52,7 @@ type Service interface { Shutdown() // OnPayload is a callback to notify Service about new received payload. - OnPayload(p *npayload.Extensible) + OnPayload(p *npayload.Extensible) error // OnTransaction is a callback to notify Service about new received transaction. OnTransaction(tx *transaction.Transaction) } @@ -365,26 +365,27 @@ func (s *service) payloadFromExtensible(ep *npayload.Extensible) *Payload { } // OnPayload handles Payload receive. -func (s *service) OnPayload(cp *npayload.Extensible) { +func (s *service) OnPayload(cp *npayload.Extensible) error { log := s.log.With(zap.Stringer("hash", cp.Hash())) p := s.payloadFromExtensible(cp) // decode payload data into message if err := p.decodeData(); err != nil { log.Info("can't decode payload data", zap.Error(err)) - return + return nil } if !s.validatePayload(p) { log.Info("can't validate payload") - return + return nil } if s.dbft == nil || !s.started.Load() { log.Debug("dbft is inactive or not started yet") - return + return nil } s.messages <- *p + return nil } func (s *service) OnTransaction(tx *transaction.Transaction) { diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 71bb5d7d5..6bc2e6b45 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -351,7 +351,7 @@ func TestService_OnPayload(t *testing.T) { p.encodeData() // sender is invalid - srv.OnPayload(&p.Extensible) + require.NoError(t, srv.OnPayload(&p.Extensible)) shouldNotReceive(t, srv.messages) p = new(Payload) @@ -359,7 +359,7 @@ func TestService_OnPayload(t *testing.T) { p.Sender = priv.GetScriptHash() p.SetPayload(&prepareRequest{}) require.NoError(t, p.Sign(priv)) - srv.OnPayload(&p.Extensible) + require.NoError(t, srv.OnPayload(&p.Extensible)) shouldReceive(t, srv.messages) } diff --git a/pkg/network/server.go b/pkg/network/server.go index 6353587cb..732f31f94 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -25,7 +25,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/services/notary" - "github.com/nspcc-dev/neo-go/pkg/services/stateroot" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" "go.uber.org/zap" @@ -83,6 +82,7 @@ type ( notaryFeer NotaryFeer notaryModule *notary.Notary services []Service + extensHandlers map[string]func(*payload.Extensible) error txInLock sync.Mutex txInMap map[util.Uint256]struct{} @@ -103,7 +103,6 @@ type ( syncReached *atomic.Bool - stateRoot stateroot.Service stateSync blockchainer.StateSync log *zap.Logger @@ -159,6 +158,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), log: log, transactions: make(chan *transaction.Transaction, 64), + extensHandlers: make(map[string]func(*payload.Extensible) error), } if chain.P2PSigExtensionsEnabled() { s.notaryFeer = NewNotaryFeer(chain) @@ -194,17 +194,6 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai s.tryStartServices() }) - if config.StateRootCfg.Enabled && chain.GetConfig().StateRootInHeader { - return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled") - } - - sr, err := stateroot.New(config.StateRootCfg, s.log, chain, s.handleNewPayload) - if err != nil { - return nil, fmt.Errorf("can't initialize StateRoot service: %w", err) - } - s.stateRoot = sr - s.services = append(s.services, sr) - sSync := chain.GetStateSyncModule() s.stateSync = sSync s.bSyncQueue = newBlockQueue(maxBlockBatch, sSync, log, nil) @@ -212,7 +201,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai if config.Wallet != nil { srv, err := newConsensus(consensus.Config{ Logger: log, - Broadcast: s.handleNewPayload, + Broadcast: s.BroadcastExtensible, Chain: chain, ProtocolConfiguration: chain.GetConfig(), RequestTx: s.requestTx, @@ -225,7 +214,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai } s.consensus = srv - s.services = append(s.services, srv) + s.AddExtensibleService(srv, consensus.Category, srv.OnPayload) } if s.MinPeers < 0 { @@ -306,9 +295,10 @@ func (s *Server) AddService(svc Service) { s.services = append(s.services, svc) } -// GetStateRoot returns state root service instance. -func (s *Server) GetStateRoot() stateroot.Service { - return s.stateRoot +// AddExtensibleService register a service that handles extensible payload of some kind. +func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) { + s.extensHandlers[category] = handler + s.AddService(svc) } // UnconnectedPeers returns a list of peers that are in the discovery peer list @@ -946,27 +936,26 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { if !ok { // payload is already in cache return nil } - switch e.Category { - case consensus.Category: - if s.consensus != nil { - s.consensus.OnPayload(e) - } - case stateroot.Category: - err := s.stateRoot.OnPayload(e) + handler := s.extensHandlers[e.Category] + if handler != nil { + err = handler(e) if err != nil { return err } - default: - return errors.New("invalid category") } + s.advertiseExtensible(e) + return nil +} +func (s *Server) advertiseExtensible(e *payload.Extensible) { msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()})) if e.Category == consensus.Category { + // It's high priority because it directly affects consensus process, + // even though it's just an inv. s.broadcastHPMessage(msg) } else { s.broadcastMessage(msg) } - return nil } // handleTxCmd processes received transaction. @@ -1253,22 +1242,17 @@ func (s *Server) tryInitStateSync() { } } } -func (s *Server) handleNewPayload(p *payload.Extensible) { + +// BroadcastExtensible add locally-generated Extensible payload to the pool +// and advertises it to peers. +func (s *Server) BroadcastExtensible(p *payload.Extensible) { _, err := s.extensiblePool.Add(p) if err != nil { s.log.Error("created payload is not valid", zap.Error(err)) return } - msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{p.Hash()})) - switch p.Category { - case consensus.Category: - // It's high priority because it directly affects consensus process, - // even though it's just an inv. - s.broadcastHPMessage(msg) - default: - s.broadcastMessage(msg) - } + s.advertiseExtensible(p) } func (s *Server) requestTx(hashes ...util.Uint256) { diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 872501484..ec3bc9162 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -41,9 +41,12 @@ var _ consensus.Service = (*fakeConsensus)(nil) func newFakeConsensus(c consensus.Config) (consensus.Service, error) { return new(fakeConsensus), nil } -func (f *fakeConsensus) Start() { f.started.Store(true) } -func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } -func (f *fakeConsensus) OnPayload(p *payload.Extensible) { f.payloads = append(f.payloads, p) } +func (f *fakeConsensus) Start() { f.started.Store(true) } +func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } +func (f *fakeConsensus) OnPayload(p *payload.Extensible) error { + f.payloads = append(f.payloads, p) + return nil +} func (f *fakeConsensus) OnTransaction(tx *transaction.Transaction) { f.txs = append(f.txs, tx) } func (f *fakeConsensus) GetPayload(h util.Uint256) *payload.Extensible { panic("implement me") } @@ -455,13 +458,6 @@ func TestConsensus(t *testing.T) { msg := newConsensusMessage(s.chain.BlockHeight()+1, s.chain.BlockHeight()+2) require.Error(t, s.handleMessage(p, msg)) }) - t.Run("invalid category", func(t *testing.T) { - pl := payload.NewExtensible() - pl.Category = "invalid" - pl.ValidBlockEnd = s.chain.BlockHeight() + 1 - msg := NewMessage(CMDExtensible, pl) - require.Error(t, s.handleMessage(p, msg)) - }) } func TestTransaction(t *testing.T) { diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 375695dab..07fd9554c 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -77,6 +77,9 @@ func New(cfg config.StateRoot, log *zap.Logger, bc blockchainer.Blockchainer, cb s.MainCfg = cfg if cfg.Enabled { + if bcConf.StateRootInHeader { + return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled") + } var err error w := cfg.UnlockWallet if s.wallet, err = wallet.NewWalletFromFile(w.Path); err != nil {