From 2593bb0535da28ca508ad7948bc5634a1933c22c Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 22 Apr 2022 11:33:56 +0300 Subject: [PATCH] network: extend Service with Name, use it to distinguish services --- pkg/consensus/consensus.go | 7 +++++++ pkg/network/server.go | 6 ++++-- pkg/network/server_test.go | 17 +++++++++-------- pkg/rpc/server/server.go | 5 +++++ pkg/services/notary/notary.go | 5 +++++ pkg/services/oracle/oracle.go | 5 +++++ pkg/services/stateroot/service.go | 1 + pkg/services/stateroot/validators.go | 5 +++++ 8 files changed, 41 insertions(+), 10 deletions(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 585e34aa8..28c9b4336 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -63,6 +63,8 @@ type Ledger interface { // Service represents consensus instance. type Service interface { + // Name returns service name. + Name() string // Start initializes dBFT and starts event loop for consensus service. // It must be called only when sufficient amount of peers are connected. Start() @@ -256,6 +258,11 @@ func (s *service) newPrepareRequest() payload.PrepareRequest { return r } +// Name returns service name. +func (s *service) Name() string { + return "consensus" +} + func (s *service) Start() { if s.started.CAS(false, true) { s.log.Info("starting consensus service") diff --git a/pkg/network/server.go b/pkg/network/server.go index 69bce8051..b7c6bb342 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -75,6 +75,7 @@ type ( // Service is a service abstraction (oracle, state root, consensus, etc). Service interface { + Name() string Start() Shutdown() } @@ -100,7 +101,7 @@ type ( notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer - services []Service + services map[string]Service extensHandlers map[string]func(*payload.Extensible) error extensHighPrio string txCallback func(*transaction.Transaction) @@ -177,6 +178,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), log: log, transactions: make(chan *transaction.Transaction, 64), + services: make(map[string]Service), extensHandlers: make(map[string]func(*payload.Extensible) error), stateSync: stSync, } @@ -270,7 +272,7 @@ func (s *Server) Shutdown() { // AddService allows to add a service to be started/stopped by Server. func (s *Server) AddService(svc Service) { - s.services = append(s.services, svc) + s.services[svc.Name()] = svc } // AddExtensibleService register a service that handles extensible payload of some kind. diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 0624b157b..6c1af736f 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -38,8 +38,9 @@ type fakeConsensus struct { var _ consensus.Service = (*fakeConsensus)(nil) -func (f *fakeConsensus) Start() { f.started.Store(true) } -func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } +func (f *fakeConsensus) Name() string { return "fake" } +func (f *fakeConsensus) Start() { f.started.Store(true) } +func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } func (f *fakeConsensus) OnPayload(p *payload.Extensible) error { f.payloads = append(f.payloads, p) return nil @@ -114,12 +115,12 @@ func TestServerStartAndShutdown(t *testing.T) { p := newLocalPeer(t, s) s.register <- p - assert.True(t, s.services[0].(*fakeConsensus).started.Load()) + assert.True(t, s.services["fake"].(*fakeConsensus).started.Load()) s.Shutdown() <-ch - require.True(t, s.services[0].(*fakeConsensus).stopped.Load()) + require.True(t, s.services["fake"].(*fakeConsensus).stopped.Load()) }) } @@ -431,13 +432,13 @@ func TestConsensus(t *testing.T) { s.chain.(*fakechain.FakeChain).VerifyWitnessF = func() (int64, error) { return 0, nil } require.NoError(t, s.handleMessage(p, msg)) - require.Contains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) + require.Contains(t, s.services["fake"].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) t.Run("small ValidUntilBlockEnd", func(t *testing.T) { t.Run("current height", func(t *testing.T) { msg := newConsensusMessage(0, s.chain.BlockHeight()) require.NoError(t, s.handleMessage(p, msg)) - require.NotContains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) + require.NotContains(t, s.services["fake"].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) }) t.Run("invalid", func(t *testing.T) { msg := newConsensusMessage(0, s.chain.BlockHeight()-1) @@ -468,13 +469,13 @@ func TestTransaction(t *testing.T) { s.register <- p s.testHandleMessage(t, nil, CMDTX, tx) - require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) + require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) }) t.Run("bad", func(t *testing.T) { tx := newDummyTx() s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds } s.testHandleMessage(t, nil, CMDTX, tx) - require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) // Consensus receives everything. + require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) // Consensus receives everything. }) } diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 287918ff7..7e83dc68d 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -204,6 +204,11 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S } } +// Name returns service name. +func (s *Server) Name() string { + return "rpc" +} + // Start creates a new JSON-RPC server listening on the configured port. It creates // goroutines needed internally and it returns its errors via errChan passed to New(). func (s *Server) Start() { diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index f6537ac5b..66dd24e78 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -152,6 +152,11 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu }, nil } +// Name returns service name. +func (n *Notary) Name() string { + return "notary" +} + // Start runs Notary module in a separate goroutine. func (n *Notary) Start() { n.Config.Log.Info("starting notary service") diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 5165ba5c5..a67a6f1e2 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -174,6 +174,11 @@ func NewOracle(cfg Config) (*Oracle, error) { return o, nil } +// Name returns service name. +func (o *Oracle) Name() string { + return "oracle" +} + // Shutdown shutdowns Oracle. func (o *Oracle) Shutdown() { close(o.close) diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 8da2ae13d..e643d65bf 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -28,6 +28,7 @@ type ( // Service represents state root service. Service interface { + Name() string OnPayload(p *payload.Extensible) error AddSignature(height uint32, validatorIndex int32, sig []byte) error GetConfig() config.StateRoot diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 8c934590f..1b8619a64 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -17,6 +17,11 @@ const ( firstVoteResendDelay = 3 * time.Second ) +// Name returns service name. +func (s *service) Name() string { + return "stateroot" +} + // Start runs service instance in a separate goroutine. func (s *service) Start() { s.log.Info("starting state validation service")