Merge pull request #2445 from nspcc-dev/rpc-when-sync

Add StartWhenSynchronized option for RPC server
This commit is contained in:
Roman Khimov 2022-04-26 14:20:00 +03:00 committed by GitHub
commit 8983d25f7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 138 additions and 80 deletions

View file

@ -151,9 +151,9 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
require.NoError(t, err) require.NoError(t, err)
netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
go netSrv.Start(make(chan error, 1)) go netSrv.Start(make(chan error, 1))
rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger)
errCh := make(chan error, 2) errCh := make(chan error, 2)
rpcServer.Start(errCh) rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh)
rpcServer.Start()
return chain, &rpcServer, netSrv return chain, &rpcServer, netSrv
} }
@ -187,7 +187,7 @@ func newExecutorWithConfig(t *testing.T, needChain, runChain bool, f func(*confi
func (e *executor) Close(t *testing.T) { func (e *executor) Close(t *testing.T) {
input.Terminal = nil input.Terminal = nil
if e.RPC != nil { if e.RPC != nil {
require.NoError(t, e.RPC.Shutdown()) e.RPC.Shutdown()
} }
if e.NetSrv != nil { if e.NetSrv != nil {
e.NetSrv.Shutdown() e.NetSrv.Shutdown()

View file

@ -504,11 +504,14 @@ func startServer(ctx *cli.Context) error {
if err != nil { if err != nil {
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
} }
rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log)
errChan := make(chan error) errChan := make(chan error)
rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
serv.AddService(&rpcServer)
go serv.Start(errChan) go serv.Start(errChan)
rpcServer.Start(errChan) if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized {
rpcServer.Start()
}
sighupCh := make(chan os.Signal, 1) sighupCh := make(chan os.Signal, 1)
signal.Notify(sighupCh, syscall.SIGHUP) signal.Notify(sighupCh, syscall.SIGHUP)
@ -528,20 +531,16 @@ Main:
switch sig { switch sig {
case syscall.SIGHUP: case syscall.SIGHUP:
log.Info("SIGHUP received, restarting rpc-server") log.Info("SIGHUP received, restarting rpc-server")
serverErr := rpcServer.Shutdown() rpcServer.Shutdown()
if serverErr != nil { rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
errChan <- fmt.Errorf("error while restarting rpc-server: %w", serverErr) serv.AddService(&rpcServer) // Replaces old one by service name.
break if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() {
rpcServer.Start()
} }
rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log)
rpcServer.Start(errChan)
} }
case <-grace.Done(): case <-grace.Done():
signal.Stop(sighupCh) signal.Stop(sighupCh)
serv.Shutdown() serv.Shutdown()
if serverErr := rpcServer.Shutdown(); serverErr != nil {
shutdownErr = fmt.Errorf("error on shutdown: %w", serverErr)
}
break Main break Main
} }
} }

View file

@ -55,6 +55,17 @@ Or specify a different network with appropriate flag like this:
By default, the node will run in foreground using current standard output for By default, the node will run in foreground using current standard output for
logging. logging.
### Node synchronization
Most of the services (state validation, oracle, consensus and RPC if
configured with `StartWhenSynchronized` option) are only started after the
node is completely synchronizaed because running them before that is either
pointless or even dangerous. The node considers itself to be fully
synchronized with the network if it has more than `MinPeers` neighbours and if
at least 2/3 of them are known to have a height less than or equal to the
current height of the node.
### Restarting node services ### Restarting node services
To restart some node services without full node restart, send the SIGHUP To restart some node services without full node restart, send the SIGHUP

View file

@ -136,6 +136,7 @@ RPC:
MaxFindResultItems: 100 MaxFindResultItems: 100
MaxNEP11Tokens: 100 MaxNEP11Tokens: 100
Port: 10332 Port: 10332
StartWhenSynchronized: false
TLSConfig: TLSConfig:
Address: "" Address: ""
CertFile: serv.crt CertFile: serv.crt
@ -158,6 +159,10 @@ where:
- `MaxNEP11Tokens` - limit for the number of tokens returned from - `MaxNEP11Tokens` - limit for the number of tokens returned from
`getnep11balances` call. `getnep11balances` call.
- `Port` is an RPC server port it should be bound to. - `Port` is an RPC server port it should be bound to.
- `StartWhenSynchronized` controls when RPC server will be started, by default
(`false` setting) it's started immediately and RPC is availabe during node
synchronization. Setting it to `true` will make the node start RPC service only
after full synchronization.
- `TLS` section configures TLS protocol. - `TLS` section configures TLS protocol.
### State Root Configuration ### State Root Configuration

View file

@ -63,6 +63,8 @@ type Ledger interface {
// Service represents consensus instance. // Service represents consensus instance.
type Service interface { type Service interface {
// Name returns service name.
Name() string
// Start initializes dBFT and starts event loop for consensus service. // Start initializes dBFT and starts event loop for consensus service.
// It must be called only when sufficient amount of peers are connected. // It must be called only when sufficient amount of peers are connected.
Start() Start()
@ -256,6 +258,11 @@ func (s *service) newPrepareRequest() payload.PrepareRequest {
return r return r
} }
// Name returns service name.
func (s *service) Name() string {
return "consensus"
}
func (s *service) Start() { func (s *service) Start() {
if s.started.CAS(false, true) { if s.started.CAS(false, true) {
s.log.Info("starting consensus service") s.log.Info("starting consensus service")

View file

@ -105,10 +105,13 @@ func (bq *blockQueue) run() {
func (bq *blockQueue) putBlock(block *block.Block) error { func (bq *blockQueue) putBlock(block *block.Block) error {
h := bq.chain.BlockHeight() h := bq.chain.BlockHeight()
bq.queueLock.Lock() bq.queueLock.Lock()
defer bq.queueLock.Unlock()
if bq.discarded.Load() {
return nil
}
if block.Index <= h || h+blockCacheSize < block.Index { if block.Index <= h || h+blockCacheSize < block.Index {
// can easily happen when fetching the same blocks from // can easily happen when fetching the same blocks from
// different peers, thus not considered as error // different peers, thus not considered as error
bq.queueLock.Unlock()
return nil return nil
} }
pos := indexToPosition(block.Index) pos := indexToPosition(block.Index)
@ -122,7 +125,6 @@ func (bq *blockQueue) putBlock(block *block.Block) error {
} }
} }
l := bq.len l := bq.len
bq.queueLock.Unlock()
// update metrics // update metrics
updateBlockQueueLenMetric(l) updateBlockQueueLenMetric(l)
select { select {
@ -142,8 +144,8 @@ func (bq *blockQueue) lastQueued() uint32 {
func (bq *blockQueue) discard() { func (bq *blockQueue) discard() {
if bq.discarded.CAS(false, true) { if bq.discarded.CAS(false, true) {
close(bq.checkBlocks)
bq.queueLock.Lock() bq.queueLock.Lock()
close(bq.checkBlocks)
// Technically we could bq.queue = nil, but this would cost // Technically we could bq.queue = nil, but this would cost
// another if in run(). // another if in run().
for i := 0; i < len(bq.queue); i++ { for i := 0; i < len(bq.queue); i++ {

View file

@ -75,6 +75,7 @@ type (
// Service is a service abstraction (oracle, state root, consensus, etc). // Service is a service abstraction (oracle, state root, consensus, etc).
Service interface { Service interface {
Name() string
Start() Start()
Shutdown() Shutdown()
} }
@ -100,7 +101,7 @@ type (
notaryRequestPool *mempool.Pool notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool extensiblePool *extpool.Pool
notaryFeer NotaryFeer notaryFeer NotaryFeer
services []Service services map[string]Service
extensHandlers map[string]func(*payload.Extensible) error extensHandlers map[string]func(*payload.Extensible) error
extensHighPrio string extensHighPrio string
txCallback func(*transaction.Transaction) txCallback func(*transaction.Transaction)
@ -177,6 +178,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log, log: log,
transactions: make(chan *transaction.Transaction, 64), transactions: make(chan *transaction.Transaction, 64),
services: make(map[string]Service),
extensHandlers: make(map[string]func(*payload.Extensible) error), extensHandlers: make(map[string]func(*payload.Extensible) error),
stateSync: stSync, stateSync: stSync,
} }
@ -270,7 +272,7 @@ func (s *Server) Shutdown() {
// AddService allows to add a service to be started/stopped by Server. // AddService allows to add a service to be started/stopped by Server.
func (s *Server) AddService(svc Service) { func (s *Server) AddService(svc Service) {
s.services = append(s.services, svc) s.services[svc.Name()] = svc
} }
// AddExtensibleService register a service that handles extensible payload of some kind. // AddExtensibleService register a service that handles extensible payload of some kind.

View file

@ -38,8 +38,9 @@ type fakeConsensus struct {
var _ consensus.Service = (*fakeConsensus)(nil) var _ consensus.Service = (*fakeConsensus)(nil)
func (f *fakeConsensus) Start() { f.started.Store(true) } func (f *fakeConsensus) Name() string { return "fake" }
func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } func (f *fakeConsensus) Start() { f.started.Store(true) }
func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) }
func (f *fakeConsensus) OnPayload(p *payload.Extensible) error { func (f *fakeConsensus) OnPayload(p *payload.Extensible) error {
f.payloads = append(f.payloads, p) f.payloads = append(f.payloads, p)
return nil return nil
@ -114,12 +115,12 @@ func TestServerStartAndShutdown(t *testing.T) {
p := newLocalPeer(t, s) p := newLocalPeer(t, s)
s.register <- p s.register <- p
assert.True(t, s.services[0].(*fakeConsensus).started.Load()) assert.True(t, s.services["fake"].(*fakeConsensus).started.Load())
s.Shutdown() s.Shutdown()
<-ch <-ch
require.True(t, s.services[0].(*fakeConsensus).stopped.Load()) require.True(t, s.services["fake"].(*fakeConsensus).stopped.Load())
}) })
} }
@ -431,13 +432,13 @@ func TestConsensus(t *testing.T) {
s.chain.(*fakechain.FakeChain).VerifyWitnessF = func() (int64, error) { return 0, nil } s.chain.(*fakechain.FakeChain).VerifyWitnessF = func() (int64, error) { return 0, nil }
require.NoError(t, s.handleMessage(p, msg)) require.NoError(t, s.handleMessage(p, msg))
require.Contains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) require.Contains(t, s.services["fake"].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible))
t.Run("small ValidUntilBlockEnd", func(t *testing.T) { t.Run("small ValidUntilBlockEnd", func(t *testing.T) {
t.Run("current height", func(t *testing.T) { t.Run("current height", func(t *testing.T) {
msg := newConsensusMessage(0, s.chain.BlockHeight()) msg := newConsensusMessage(0, s.chain.BlockHeight())
require.NoError(t, s.handleMessage(p, msg)) require.NoError(t, s.handleMessage(p, msg))
require.NotContains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) require.NotContains(t, s.services["fake"].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible))
}) })
t.Run("invalid", func(t *testing.T) { t.Run("invalid", func(t *testing.T) {
msg := newConsensusMessage(0, s.chain.BlockHeight()-1) msg := newConsensusMessage(0, s.chain.BlockHeight()-1)
@ -468,13 +469,13 @@ func TestTransaction(t *testing.T) {
s.register <- p s.register <- p
s.testHandleMessage(t, nil, CMDTX, tx) s.testHandleMessage(t, nil, CMDTX, tx)
require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx)
}) })
t.Run("bad", func(t *testing.T) { t.Run("bad", func(t *testing.T) {
tx := newDummyTx() tx := newDummyTx()
s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds } s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds }
s.testHandleMessage(t, nil, CMDTX, tx) s.testHandleMessage(t, nil, CMDTX, tx)
require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) // Consensus receives everything. require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) // Consensus receives everything.
}) })
} }

View file

@ -17,6 +17,7 @@ type (
MaxFindResultItems int `yaml:"MaxFindResultItems"` MaxFindResultItems int `yaml:"MaxFindResultItems"`
MaxNEP11Tokens int `yaml:"MaxNEP11Tokens"` MaxNEP11Tokens int `yaml:"MaxNEP11Tokens"`
Port uint16 `yaml:"Port"` Port uint16 `yaml:"Port"`
StartWhenSynchronized bool `yaml:"StartWhenSynchronized"`
TLSConfig TLSConfig `yaml:"TLSConfig"` TLSConfig TLSConfig `yaml:"TLSConfig"`
} }

View file

@ -32,7 +32,7 @@ import (
func TestClient_NEP17(t *testing.T) { func TestClient_NEP17(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -75,7 +75,7 @@ func TestClient_NEP17(t *testing.T) {
func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) { func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
const extraFee = 10 const extraFee = 10
var nonce uint32 var nonce uint32
@ -324,7 +324,7 @@ func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) {
func TestCalculateNetworkFee(t *testing.T) { func TestCalculateNetworkFee(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
const extraFee = 10 const extraFee = 10
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
@ -394,7 +394,7 @@ func TestCalculateNetworkFee(t *testing.T) {
func TestSignAndPushInvocationTx(t *testing.T) { func TestSignAndPushInvocationTx(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -551,7 +551,7 @@ func TestSignAndPushInvocationTx(t *testing.T) {
func TestSignAndPushP2PNotaryRequest(t *testing.T) { func TestSignAndPushP2PNotaryRequest(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true) chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -642,7 +642,7 @@ func TestSignAndPushP2PNotaryRequest(t *testing.T) {
func TestCalculateNotaryFee(t *testing.T) { func TestCalculateNotaryFee(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -662,7 +662,7 @@ func TestPing(t *testing.T) {
require.NoError(t, c.Init()) require.NoError(t, c.Init())
require.NoError(t, c.Ping()) require.NoError(t, c.Ping())
require.NoError(t, rpcSrv.Shutdown()) rpcSrv.Shutdown()
httpSrv.Close() httpSrv.Close()
require.Error(t, c.Ping()) require.Error(t, c.Ping())
} }
@ -670,7 +670,7 @@ func TestPing(t *testing.T) {
func TestCreateTxFromScript(t *testing.T) { func TestCreateTxFromScript(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -699,7 +699,7 @@ func TestCreateTxFromScript(t *testing.T) {
func TestCreateNEP17TransferTx(t *testing.T) { func TestCreateNEP17TransferTx(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -749,7 +749,7 @@ func TestCreateNEP17TransferTx(t *testing.T) {
func TestInvokeVerify(t *testing.T) { func TestInvokeVerify(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -791,7 +791,7 @@ func TestInvokeVerify(t *testing.T) {
func TestClient_GetNativeContracts(t *testing.T) { func TestClient_GetNativeContracts(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -805,7 +805,7 @@ func TestClient_GetNativeContracts(t *testing.T) {
func TestClient_NEP11_ND(t *testing.T) { func TestClient_NEP11_ND(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -871,7 +871,7 @@ func TestClient_NEP11_ND(t *testing.T) {
func TestClient_NEP11_D(t *testing.T) { func TestClient_NEP11_D(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -938,7 +938,7 @@ func TestClient_NEP11_D(t *testing.T) {
func TestClient_NNS(t *testing.T) { func TestClient_NNS(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -987,7 +987,7 @@ func TestClient_NNS(t *testing.T) {
func TestClient_GetNotaryServiceFeePerKey(t *testing.T) { func TestClient_GetNotaryServiceFeePerKey(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)
@ -1002,7 +1002,7 @@ func TestClient_GetNotaryServiceFeePerKey(t *testing.T) {
func TestClient_GetOraclePrice(t *testing.T) { func TestClient_GetOraclePrice(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
require.NoError(t, err) require.NoError(t, err)

View file

@ -50,6 +50,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/vm/emit"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -66,6 +67,8 @@ type (
log *zap.Logger log *zap.Logger
https *http.Server https *http.Server
shutdown chan struct{} shutdown chan struct{}
started *atomic.Bool
errChan chan error
subsLock sync.RWMutex subsLock sync.RWMutex
subscribers map[*subscriber]bool subscribers map[*subscriber]bool
@ -162,7 +165,7 @@ var upgrader = websocket.Upgrader{}
// New creates a new Server struct. // New creates a new Server struct.
func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.Server, func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.Server,
orc *oracle.Oracle, log *zap.Logger) Server { orc *oracle.Oracle, log *zap.Logger, errChan chan error) Server {
httpServer := &http.Server{ httpServer := &http.Server{
Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10), Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10),
} }
@ -188,6 +191,8 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S
oracle: orc, oracle: orc,
https: tlsServer, https: tlsServer,
shutdown: make(chan struct{}), shutdown: make(chan struct{}),
started: atomic.NewBool(false),
errChan: errChan,
subscribers: make(map[*subscriber]bool), subscribers: make(map[*subscriber]bool),
// These are NOT buffered to preserve original order of events. // These are NOT buffered to preserve original order of events.
@ -199,14 +204,22 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S
} }
} }
// Start creates a new JSON-RPC server listening on the configured port. It's // Name returns service name.
// supposed to be run as a separate goroutine (like http.Server's Serve) and it func (s *Server) Name() string {
// returns its errors via given errChan. return "rpc"
func (s *Server) Start(errChan chan error) { }
// Start creates a new JSON-RPC server listening on the configured port. It creates
// goroutines needed internally and it returns its errors via errChan passed to New().
func (s *Server) Start() {
if !s.config.Enabled { if !s.config.Enabled {
s.log.Info("RPC server is not enabled") s.log.Info("RPC server is not enabled")
return return
} }
if !s.started.CAS(false, true) {
s.log.Info("RPC server already started")
return
}
s.Handler = http.HandlerFunc(s.handleHTTPRequest) s.Handler = http.HandlerFunc(s.handleHTTPRequest)
s.log.Info("starting rpc-server", zap.String("endpoint", s.Addr)) s.log.Info("starting rpc-server", zap.String("endpoint", s.Addr))
@ -217,20 +230,20 @@ func (s *Server) Start(errChan chan error) {
go func() { go func() {
ln, err := net.Listen("tcp", s.https.Addr) ln, err := net.Listen("tcp", s.https.Addr)
if err != nil { if err != nil {
errChan <- err s.errChan <- err
return return
} }
s.https.Addr = ln.Addr().String() s.https.Addr = ln.Addr().String()
err = s.https.ServeTLS(ln, cfg.CertFile, cfg.KeyFile) err = s.https.ServeTLS(ln, cfg.CertFile, cfg.KeyFile)
if err != http.ErrServerClosed { if err != http.ErrServerClosed {
s.log.Error("failed to start TLS RPC server", zap.Error(err)) s.log.Error("failed to start TLS RPC server", zap.Error(err))
errChan <- err s.errChan <- err
} }
}() }()
} }
ln, err := net.Listen("tcp", s.Addr) ln, err := net.Listen("tcp", s.Addr)
if err != nil { if err != nil {
errChan <- err s.errChan <- err
return return
} }
s.Addr = ln.Addr().String() // set Addr to the actual address s.Addr = ln.Addr().String() // set Addr to the actual address
@ -238,34 +251,35 @@ func (s *Server) Start(errChan chan error) {
err = s.Serve(ln) err = s.Serve(ln)
if err != http.ErrServerClosed { if err != http.ErrServerClosed {
s.log.Error("failed to start RPC server", zap.Error(err)) s.log.Error("failed to start RPC server", zap.Error(err))
errChan <- err s.errChan <- err
} }
}() }()
} }
// Shutdown overrides the http.Server Shutdown // Shutdown stops the RPC server. It can only be called once.
// method. func (s *Server) Shutdown() {
func (s *Server) Shutdown() error { if !s.started.Load() {
var httpsErr error return
}
// Signal to websocket writer routines and handleSubEvents. // Signal to websocket writer routines and handleSubEvents.
close(s.shutdown) close(s.shutdown)
if s.config.TLSConfig.Enabled { if s.config.TLSConfig.Enabled {
s.log.Info("shutting down rpc-server (https)", zap.String("endpoint", s.https.Addr)) s.log.Info("shutting down RPC server (https)", zap.String("endpoint", s.https.Addr))
httpsErr = s.https.Shutdown(context.Background()) err := s.https.Shutdown(context.Background())
if err != nil {
s.log.Warn("error during RPC (https) server shutdown", zap.Error(err))
}
} }
s.log.Info("shutting down rpc-server", zap.String("endpoint", s.Addr)) s.log.Info("shutting down RPC server", zap.String("endpoint", s.Addr))
err := s.Server.Shutdown(context.Background()) err := s.Server.Shutdown(context.Background())
if err != nil {
s.log.Warn("error during RPC (http) server shutdown", zap.Error(err))
}
// Wait for handleSubEvents to finish. // Wait for handleSubEvents to finish.
<-s.executionCh <-s.executionCh
if err == nil {
return httpsErr
}
return err
} }
func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) { func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) {

View file

@ -104,9 +104,9 @@ func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool)
serverConfig.Port = 0 serverConfig.Port = 0
server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger) server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger)
require.NoError(t, err) require.NoError(t, err)
rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger)
errCh := make(chan error, 2) errCh := make(chan error, 2)
rpcServer.Start(errCh) rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger, errCh)
rpcServer.Start()
handler := http.HandlerFunc(rpcServer.handleHTTPRequest) handler := http.HandlerFunc(rpcServer.handleHTTPRequest)
srv := httptest.NewServer(handler) srv := httptest.NewServer(handler)

View file

@ -1304,7 +1304,7 @@ func TestRPC(t *testing.T) {
func TestSubmitOracle(t *testing.T) { func TestSubmitOracle(t *testing.T) {
chain, rpcSrv, httpSrv := initClearServerWithServices(t, true, false) chain, rpcSrv, httpSrv := initClearServerWithServices(t, true, false)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
rpc := `{"jsonrpc": "2.0", "id": 1, "method": "submitoracleresponse", "params": %s}` rpc := `{"jsonrpc": "2.0", "id": 1, "method": "submitoracleresponse", "params": %s}`
runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) { runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) {
@ -1340,7 +1340,7 @@ func TestSubmitNotaryRequest(t *testing.T) {
t.Run("disabled P2PSigExtensions", func(t *testing.T) { t.Run("disabled P2PSigExtensions", func(t *testing.T) {
chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false) chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
req := fmt.Sprintf(rpc, "[]") req := fmt.Sprintf(rpc, "[]")
body := doRPCCallOverHTTP(req, httpSrv.URL, t) body := doRPCCallOverHTTP(req, httpSrv.URL, t)
checkErrGetResult(t, body, true) checkErrGetResult(t, body, true)
@ -1348,7 +1348,7 @@ func TestSubmitNotaryRequest(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true) chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) { runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) {
return func(t *testing.T) { return func(t *testing.T) {
@ -1459,7 +1459,7 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
e := &executor{chain: chain, httpSrv: httpSrv} e := &executor{chain: chain, httpSrv: httpSrv}
t.Run("single request", func(t *testing.T) { t.Run("single request", func(t *testing.T) {
@ -2608,7 +2608,7 @@ func BenchmarkHandleIn(b *testing.B) {
serverConfig.LogLevel = zapcore.FatalLevel serverConfig.LogLevel = zapcore.FatalLevel
server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger) server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger)
require.NoError(b, err) require.NoError(b, err)
rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger) rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger, make(chan error))
defer chain.Close() defer chain.Close()
do := func(b *testing.B, req []byte) { do := func(b *testing.B, req []byte) {

View file

@ -94,7 +94,7 @@ func TestSubscriptions(t *testing.T) {
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
go rpcSrv.coreServer.Start(make(chan error)) go rpcSrv.coreServer.Start(make(chan error))
defer rpcSrv.coreServer.Shutdown() defer rpcSrv.coreServer.Shutdown()
@ -261,7 +261,7 @@ func TestFilteredSubscriptions(t *testing.T) {
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
// It's used as an end-of-event-stream, so it's always present. // It's used as an end-of-event-stream, so it's always present.
blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`) blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`)
@ -353,7 +353,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
go rpcSrv.coreServer.Start(make(chan error, 1)) go rpcSrv.coreServer.Start(make(chan error, 1))
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
// blocks are needed to make GAS deposit for priv0 // blocks are needed to make GAS deposit for priv0
blocks := getTestBlocks(t) blocks := getTestBlocks(t)
@ -395,7 +395,7 @@ func TestFilteredBlockSubscriptions(t *testing.T) {
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`) blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`)
@ -433,7 +433,7 @@ func TestMaxSubscriptions(t *testing.T) {
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
for i := 0; i < maxFeeds+1; i++ { for i := 0; i < maxFeeds+1; i++ {
var s string var s string
@ -479,7 +479,7 @@ func TestBadSubUnsub(t *testing.T) {
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
testF := func(t *testing.T, cases map[string]string) func(t *testing.T) { testF := func(t *testing.T, cases map[string]string) func(t *testing.T) {
return func(t *testing.T) { return func(t *testing.T) {
@ -513,7 +513,7 @@ func doSomeWSRequest(t *testing.T, ws *websocket.Conn) {
func TestWSClientsLimit(t *testing.T) { func TestWSClientsLimit(t *testing.T) {
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
dialer := websocket.Dialer{HandshakeTimeout: time.Second} dialer := websocket.Dialer{HandshakeTimeout: time.Second}
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
@ -552,7 +552,7 @@ func TestSubscriptionOverflow(t *testing.T) {
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer rpcSrv.Shutdown()
resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs) resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs)
require.Nil(t, resp.Error) require.Nil(t, resp.Error)

View file

@ -152,6 +152,11 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu
}, nil }, nil
} }
// Name returns service name.
func (n *Notary) Name() string {
return "notary"
}
// Start runs Notary module in a separate goroutine. // Start runs Notary module in a separate goroutine.
func (n *Notary) Start() { func (n *Notary) Start() {
n.Config.Log.Info("starting notary service") n.Config.Log.Info("starting notary service")

View file

@ -174,6 +174,11 @@ func NewOracle(cfg Config) (*Oracle, error) {
return o, nil return o, nil
} }
// Name returns service name.
func (o *Oracle) Name() string {
return "oracle"
}
// Shutdown shutdowns Oracle. // Shutdown shutdowns Oracle.
func (o *Oracle) Shutdown() { func (o *Oracle) Shutdown() {
close(o.close) close(o.close)

View file

@ -28,6 +28,7 @@ type (
// Service represents state root service. // Service represents state root service.
Service interface { Service interface {
Name() string
OnPayload(p *payload.Extensible) error OnPayload(p *payload.Extensible) error
AddSignature(height uint32, validatorIndex int32, sig []byte) error AddSignature(height uint32, validatorIndex int32, sig []byte) error
GetConfig() config.StateRoot GetConfig() config.StateRoot

View file

@ -17,6 +17,11 @@ const (
firstVoteResendDelay = 3 * time.Second firstVoteResendDelay = 3 * time.Second
) )
// Name returns service name.
func (s *service) Name() string {
return "stateroot"
}
// Start runs service instance in a separate goroutine. // Start runs service instance in a separate goroutine.
func (s *service) Start() { func (s *service) Start() {
s.log.Info("starting state validation service") s.log.Info("starting state validation service")