network: extend Service with Name, use it to distinguish services

This commit is contained in:
Roman Khimov 2022-04-22 11:33:56 +03:00
parent a10b1ad32d
commit 2593bb0535
8 changed files with 41 additions and 10 deletions

View file

@ -63,6 +63,8 @@ type Ledger interface {
// Service represents consensus instance. // Service represents consensus instance.
type Service interface { type Service interface {
// Name returns service name.
Name() string
// Start initializes dBFT and starts event loop for consensus service. // Start initializes dBFT and starts event loop for consensus service.
// It must be called only when sufficient amount of peers are connected. // It must be called only when sufficient amount of peers are connected.
Start() Start()
@ -256,6 +258,11 @@ func (s *service) newPrepareRequest() payload.PrepareRequest {
return r return r
} }
// Name returns service name.
func (s *service) Name() string {
return "consensus"
}
func (s *service) Start() { func (s *service) Start() {
if s.started.CAS(false, true) { if s.started.CAS(false, true) {
s.log.Info("starting consensus service") s.log.Info("starting consensus service")

View file

@ -75,6 +75,7 @@ type (
// Service is a service abstraction (oracle, state root, consensus, etc). // Service is a service abstraction (oracle, state root, consensus, etc).
Service interface { Service interface {
Name() string
Start() Start()
Shutdown() Shutdown()
} }
@ -100,7 +101,7 @@ type (
notaryRequestPool *mempool.Pool notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool extensiblePool *extpool.Pool
notaryFeer NotaryFeer notaryFeer NotaryFeer
services []Service services map[string]Service
extensHandlers map[string]func(*payload.Extensible) error extensHandlers map[string]func(*payload.Extensible) error
extensHighPrio string extensHighPrio string
txCallback func(*transaction.Transaction) txCallback func(*transaction.Transaction)
@ -177,6 +178,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log, log: log,
transactions: make(chan *transaction.Transaction, 64), transactions: make(chan *transaction.Transaction, 64),
services: make(map[string]Service),
extensHandlers: make(map[string]func(*payload.Extensible) error), extensHandlers: make(map[string]func(*payload.Extensible) error),
stateSync: stSync, stateSync: stSync,
} }
@ -270,7 +272,7 @@ func (s *Server) Shutdown() {
// AddService allows to add a service to be started/stopped by Server. // AddService allows to add a service to be started/stopped by Server.
func (s *Server) AddService(svc Service) { func (s *Server) AddService(svc Service) {
s.services = append(s.services, svc) s.services[svc.Name()] = svc
} }
// AddExtensibleService register a service that handles extensible payload of some kind. // AddExtensibleService register a service that handles extensible payload of some kind.

View file

@ -38,8 +38,9 @@ type fakeConsensus struct {
var _ consensus.Service = (*fakeConsensus)(nil) var _ consensus.Service = (*fakeConsensus)(nil)
func (f *fakeConsensus) Start() { f.started.Store(true) } func (f *fakeConsensus) Name() string { return "fake" }
func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } func (f *fakeConsensus) Start() { f.started.Store(true) }
func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) }
func (f *fakeConsensus) OnPayload(p *payload.Extensible) error { func (f *fakeConsensus) OnPayload(p *payload.Extensible) error {
f.payloads = append(f.payloads, p) f.payloads = append(f.payloads, p)
return nil return nil
@ -114,12 +115,12 @@ func TestServerStartAndShutdown(t *testing.T) {
p := newLocalPeer(t, s) p := newLocalPeer(t, s)
s.register <- p s.register <- p
assert.True(t, s.services[0].(*fakeConsensus).started.Load()) assert.True(t, s.services["fake"].(*fakeConsensus).started.Load())
s.Shutdown() s.Shutdown()
<-ch <-ch
require.True(t, s.services[0].(*fakeConsensus).stopped.Load()) require.True(t, s.services["fake"].(*fakeConsensus).stopped.Load())
}) })
} }
@ -431,13 +432,13 @@ func TestConsensus(t *testing.T) {
s.chain.(*fakechain.FakeChain).VerifyWitnessF = func() (int64, error) { return 0, nil } s.chain.(*fakechain.FakeChain).VerifyWitnessF = func() (int64, error) { return 0, nil }
require.NoError(t, s.handleMessage(p, msg)) require.NoError(t, s.handleMessage(p, msg))
require.Contains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) require.Contains(t, s.services["fake"].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible))
t.Run("small ValidUntilBlockEnd", func(t *testing.T) { t.Run("small ValidUntilBlockEnd", func(t *testing.T) {
t.Run("current height", func(t *testing.T) { t.Run("current height", func(t *testing.T) {
msg := newConsensusMessage(0, s.chain.BlockHeight()) msg := newConsensusMessage(0, s.chain.BlockHeight())
require.NoError(t, s.handleMessage(p, msg)) require.NoError(t, s.handleMessage(p, msg))
require.NotContains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) require.NotContains(t, s.services["fake"].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible))
}) })
t.Run("invalid", func(t *testing.T) { t.Run("invalid", func(t *testing.T) {
msg := newConsensusMessage(0, s.chain.BlockHeight()-1) msg := newConsensusMessage(0, s.chain.BlockHeight()-1)
@ -468,13 +469,13 @@ func TestTransaction(t *testing.T) {
s.register <- p s.register <- p
s.testHandleMessage(t, nil, CMDTX, tx) s.testHandleMessage(t, nil, CMDTX, tx)
require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx)
}) })
t.Run("bad", func(t *testing.T) { t.Run("bad", func(t *testing.T) {
tx := newDummyTx() tx := newDummyTx()
s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds } s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds }
s.testHandleMessage(t, nil, CMDTX, tx) s.testHandleMessage(t, nil, CMDTX, tx)
require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) // Consensus receives everything. require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) // Consensus receives everything.
}) })
} }

View file

@ -204,6 +204,11 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S
} }
} }
// Name returns service name.
func (s *Server) Name() string {
return "rpc"
}
// Start creates a new JSON-RPC server listening on the configured port. It creates // Start creates a new JSON-RPC server listening on the configured port. It creates
// goroutines needed internally and it returns its errors via errChan passed to New(). // goroutines needed internally and it returns its errors via errChan passed to New().
func (s *Server) Start() { func (s *Server) Start() {

View file

@ -152,6 +152,11 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu
}, nil }, nil
} }
// Name returns service name.
func (n *Notary) Name() string {
return "notary"
}
// Start runs Notary module in a separate goroutine. // Start runs Notary module in a separate goroutine.
func (n *Notary) Start() { func (n *Notary) Start() {
n.Config.Log.Info("starting notary service") n.Config.Log.Info("starting notary service")

View file

@ -174,6 +174,11 @@ func NewOracle(cfg Config) (*Oracle, error) {
return o, nil return o, nil
} }
// Name returns service name.
func (o *Oracle) Name() string {
return "oracle"
}
// Shutdown shutdowns Oracle. // Shutdown shutdowns Oracle.
func (o *Oracle) Shutdown() { func (o *Oracle) Shutdown() {
close(o.close) close(o.close)

View file

@ -28,6 +28,7 @@ type (
// Service represents state root service. // Service represents state root service.
Service interface { Service interface {
Name() string
OnPayload(p *payload.Extensible) error OnPayload(p *payload.Extensible) error
AddSignature(height uint32, validatorIndex int32, sig []byte) error AddSignature(height uint32, validatorIndex int32, sig []byte) error
GetConfig() config.StateRoot GetConfig() config.StateRoot

View file

@ -17,6 +17,11 @@ const (
firstVoteResendDelay = 3 * time.Second firstVoteResendDelay = 3 * time.Second
) )
// Name returns service name.
func (s *service) Name() string {
return "stateroot"
}
// Start runs service instance in a separate goroutine. // Start runs service instance in a separate goroutine.
func (s *service) Start() { func (s *service) Start() {
s.log.Info("starting state validation service") s.log.Info("starting state validation service")