forked from TrueCloudLab/neoneo-go
rpc/server: make Server conform network.Service interface
With Start() and Shutdown() taking no parameters and returning no values.
This commit is contained in:
parent
4eee2f930e
commit
a10b1ad32d
7 changed files with 59 additions and 63 deletions
|
@ -151,9 +151,9 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
|
netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
|
||||||
go netSrv.Start(make(chan error, 1))
|
go netSrv.Start(make(chan error, 1))
|
||||||
rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger)
|
|
||||||
errCh := make(chan error, 2)
|
errCh := make(chan error, 2)
|
||||||
rpcServer.Start(errCh)
|
rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh)
|
||||||
|
rpcServer.Start()
|
||||||
|
|
||||||
return chain, &rpcServer, netSrv
|
return chain, &rpcServer, netSrv
|
||||||
}
|
}
|
||||||
|
@ -187,7 +187,7 @@ func newExecutorWithConfig(t *testing.T, needChain, runChain bool, f func(*confi
|
||||||
func (e *executor) Close(t *testing.T) {
|
func (e *executor) Close(t *testing.T) {
|
||||||
input.Terminal = nil
|
input.Terminal = nil
|
||||||
if e.RPC != nil {
|
if e.RPC != nil {
|
||||||
require.NoError(t, e.RPC.Shutdown())
|
e.RPC.Shutdown()
|
||||||
}
|
}
|
||||||
if e.NetSrv != nil {
|
if e.NetSrv != nil {
|
||||||
e.NetSrv.Shutdown()
|
e.NetSrv.Shutdown()
|
||||||
|
|
|
@ -504,11 +504,11 @@ func startServer(ctx *cli.Context) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cli.NewExitError(err, 1)
|
return cli.NewExitError(err, 1)
|
||||||
}
|
}
|
||||||
rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log)
|
|
||||||
errChan := make(chan error)
|
errChan := make(chan error)
|
||||||
|
rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
|
||||||
|
|
||||||
go serv.Start(errChan)
|
go serv.Start(errChan)
|
||||||
rpcServer.Start(errChan)
|
rpcServer.Start()
|
||||||
|
|
||||||
sighupCh := make(chan os.Signal, 1)
|
sighupCh := make(chan os.Signal, 1)
|
||||||
signal.Notify(sighupCh, syscall.SIGHUP)
|
signal.Notify(sighupCh, syscall.SIGHUP)
|
||||||
|
@ -528,20 +528,14 @@ Main:
|
||||||
switch sig {
|
switch sig {
|
||||||
case syscall.SIGHUP:
|
case syscall.SIGHUP:
|
||||||
log.Info("SIGHUP received, restarting rpc-server")
|
log.Info("SIGHUP received, restarting rpc-server")
|
||||||
serverErr := rpcServer.Shutdown()
|
rpcServer.Shutdown()
|
||||||
if serverErr != nil {
|
rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
|
||||||
errChan <- fmt.Errorf("error while restarting rpc-server: %w", serverErr)
|
rpcServer.Start()
|
||||||
break
|
|
||||||
}
|
|
||||||
rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log)
|
|
||||||
rpcServer.Start(errChan)
|
|
||||||
}
|
}
|
||||||
case <-grace.Done():
|
case <-grace.Done():
|
||||||
signal.Stop(sighupCh)
|
signal.Stop(sighupCh)
|
||||||
serv.Shutdown()
|
serv.Shutdown()
|
||||||
if serverErr := rpcServer.Shutdown(); serverErr != nil {
|
rpcServer.Shutdown()
|
||||||
shutdownErr = fmt.Errorf("error on shutdown: %w", serverErr)
|
|
||||||
}
|
|
||||||
break Main
|
break Main
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ import (
|
||||||
func TestClient_NEP17(t *testing.T) {
|
func TestClient_NEP17(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -75,7 +75,7 @@ func TestClient_NEP17(t *testing.T) {
|
||||||
func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) {
|
func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
const extraFee = 10
|
const extraFee = 10
|
||||||
var nonce uint32
|
var nonce uint32
|
||||||
|
|
||||||
|
@ -324,7 +324,7 @@ func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) {
|
||||||
func TestCalculateNetworkFee(t *testing.T) {
|
func TestCalculateNetworkFee(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
const extraFee = 10
|
const extraFee = 10
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
|
@ -394,7 +394,7 @@ func TestCalculateNetworkFee(t *testing.T) {
|
||||||
func TestSignAndPushInvocationTx(t *testing.T) {
|
func TestSignAndPushInvocationTx(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -551,7 +551,7 @@ func TestSignAndPushInvocationTx(t *testing.T) {
|
||||||
func TestSignAndPushP2PNotaryRequest(t *testing.T) {
|
func TestSignAndPushP2PNotaryRequest(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -642,7 +642,7 @@ func TestSignAndPushP2PNotaryRequest(t *testing.T) {
|
||||||
func TestCalculateNotaryFee(t *testing.T) {
|
func TestCalculateNotaryFee(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -662,7 +662,7 @@ func TestPing(t *testing.T) {
|
||||||
require.NoError(t, c.Init())
|
require.NoError(t, c.Init())
|
||||||
|
|
||||||
require.NoError(t, c.Ping())
|
require.NoError(t, c.Ping())
|
||||||
require.NoError(t, rpcSrv.Shutdown())
|
rpcSrv.Shutdown()
|
||||||
httpSrv.Close()
|
httpSrv.Close()
|
||||||
require.Error(t, c.Ping())
|
require.Error(t, c.Ping())
|
||||||
}
|
}
|
||||||
|
@ -670,7 +670,7 @@ func TestPing(t *testing.T) {
|
||||||
func TestCreateTxFromScript(t *testing.T) {
|
func TestCreateTxFromScript(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -699,7 +699,7 @@ func TestCreateTxFromScript(t *testing.T) {
|
||||||
func TestCreateNEP17TransferTx(t *testing.T) {
|
func TestCreateNEP17TransferTx(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -749,7 +749,7 @@ func TestCreateNEP17TransferTx(t *testing.T) {
|
||||||
func TestInvokeVerify(t *testing.T) {
|
func TestInvokeVerify(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -791,7 +791,7 @@ func TestInvokeVerify(t *testing.T) {
|
||||||
func TestClient_GetNativeContracts(t *testing.T) {
|
func TestClient_GetNativeContracts(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -805,7 +805,7 @@ func TestClient_GetNativeContracts(t *testing.T) {
|
||||||
func TestClient_NEP11_ND(t *testing.T) {
|
func TestClient_NEP11_ND(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -871,7 +871,7 @@ func TestClient_NEP11_ND(t *testing.T) {
|
||||||
func TestClient_NEP11_D(t *testing.T) {
|
func TestClient_NEP11_D(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -938,7 +938,7 @@ func TestClient_NEP11_D(t *testing.T) {
|
||||||
func TestClient_NNS(t *testing.T) {
|
func TestClient_NNS(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -987,7 +987,7 @@ func TestClient_NNS(t *testing.T) {
|
||||||
func TestClient_GetNotaryServiceFeePerKey(t *testing.T) {
|
func TestClient_GetNotaryServiceFeePerKey(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -1002,7 +1002,7 @@ func TestClient_GetNotaryServiceFeePerKey(t *testing.T) {
|
||||||
func TestClient_GetOraclePrice(t *testing.T) {
|
func TestClient_GetOraclePrice(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
c, err := client.New(context.Background(), httpSrv.URL, client.Options{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -68,6 +68,7 @@ type (
|
||||||
https *http.Server
|
https *http.Server
|
||||||
shutdown chan struct{}
|
shutdown chan struct{}
|
||||||
started *atomic.Bool
|
started *atomic.Bool
|
||||||
|
errChan chan error
|
||||||
|
|
||||||
subsLock sync.RWMutex
|
subsLock sync.RWMutex
|
||||||
subscribers map[*subscriber]bool
|
subscribers map[*subscriber]bool
|
||||||
|
@ -164,7 +165,7 @@ var upgrader = websocket.Upgrader{}
|
||||||
|
|
||||||
// New creates a new Server struct.
|
// New creates a new Server struct.
|
||||||
func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.Server,
|
func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.Server,
|
||||||
orc *oracle.Oracle, log *zap.Logger) Server {
|
orc *oracle.Oracle, log *zap.Logger, errChan chan error) Server {
|
||||||
httpServer := &http.Server{
|
httpServer := &http.Server{
|
||||||
Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10),
|
Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10),
|
||||||
}
|
}
|
||||||
|
@ -191,6 +192,7 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S
|
||||||
https: tlsServer,
|
https: tlsServer,
|
||||||
shutdown: make(chan struct{}),
|
shutdown: make(chan struct{}),
|
||||||
started: atomic.NewBool(false),
|
started: atomic.NewBool(false),
|
||||||
|
errChan: errChan,
|
||||||
|
|
||||||
subscribers: make(map[*subscriber]bool),
|
subscribers: make(map[*subscriber]bool),
|
||||||
// These are NOT buffered to preserve original order of events.
|
// These are NOT buffered to preserve original order of events.
|
||||||
|
@ -202,10 +204,9 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start creates a new JSON-RPC server listening on the configured port. It's
|
// Start creates a new JSON-RPC server listening on the configured port. It creates
|
||||||
// supposed to be run as a separate goroutine (like http.Server's Serve) and it
|
// goroutines needed internally and it returns its errors via errChan passed to New().
|
||||||
// returns its errors via given errChan.
|
func (s *Server) Start() {
|
||||||
func (s *Server) Start(errChan chan error) {
|
|
||||||
if !s.config.Enabled {
|
if !s.config.Enabled {
|
||||||
s.log.Info("RPC server is not enabled")
|
s.log.Info("RPC server is not enabled")
|
||||||
return
|
return
|
||||||
|
@ -224,20 +225,20 @@ func (s *Server) Start(errChan chan error) {
|
||||||
go func() {
|
go func() {
|
||||||
ln, err := net.Listen("tcp", s.https.Addr)
|
ln, err := net.Listen("tcp", s.https.Addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errChan <- err
|
s.errChan <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.https.Addr = ln.Addr().String()
|
s.https.Addr = ln.Addr().String()
|
||||||
err = s.https.ServeTLS(ln, cfg.CertFile, cfg.KeyFile)
|
err = s.https.ServeTLS(ln, cfg.CertFile, cfg.KeyFile)
|
||||||
if err != http.ErrServerClosed {
|
if err != http.ErrServerClosed {
|
||||||
s.log.Error("failed to start TLS RPC server", zap.Error(err))
|
s.log.Error("failed to start TLS RPC server", zap.Error(err))
|
||||||
errChan <- err
|
s.errChan <- err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
ln, err := net.Listen("tcp", s.Addr)
|
ln, err := net.Listen("tcp", s.Addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errChan <- err
|
s.errChan <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.Addr = ln.Addr().String() // set Addr to the actual address
|
s.Addr = ln.Addr().String() // set Addr to the actual address
|
||||||
|
@ -245,34 +246,35 @@ func (s *Server) Start(errChan chan error) {
|
||||||
err = s.Serve(ln)
|
err = s.Serve(ln)
|
||||||
if err != http.ErrServerClosed {
|
if err != http.ErrServerClosed {
|
||||||
s.log.Error("failed to start RPC server", zap.Error(err))
|
s.log.Error("failed to start RPC server", zap.Error(err))
|
||||||
errChan <- err
|
s.errChan <- err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown overrides the http.Server Shutdown
|
// Shutdown stops the RPC server. It can only be called once.
|
||||||
// method.
|
func (s *Server) Shutdown() {
|
||||||
func (s *Server) Shutdown() error {
|
|
||||||
var httpsErr error
|
var httpsErr error
|
||||||
|
|
||||||
// Signal to websocket writer routines and handleSubEvents.
|
// Signal to websocket writer routines and handleSubEvents.
|
||||||
close(s.shutdown)
|
close(s.shutdown)
|
||||||
|
|
||||||
if s.config.TLSConfig.Enabled {
|
if s.config.TLSConfig.Enabled {
|
||||||
s.log.Info("shutting down rpc-server (https)", zap.String("endpoint", s.https.Addr))
|
s.log.Info("shutting down RPC server (https)", zap.String("endpoint", s.https.Addr))
|
||||||
httpsErr = s.https.Shutdown(context.Background())
|
httpsErr = s.https.Shutdown(context.Background())
|
||||||
}
|
}
|
||||||
|
|
||||||
s.log.Info("shutting down rpc-server", zap.String("endpoint", s.Addr))
|
s.log.Info("shutting down RPC server", zap.String("endpoint", s.Addr))
|
||||||
err := s.Server.Shutdown(context.Background())
|
err := s.Server.Shutdown(context.Background())
|
||||||
|
|
||||||
// Wait for handleSubEvents to finish.
|
// Wait for handleSubEvents to finish.
|
||||||
<-s.executionCh
|
<-s.executionCh
|
||||||
|
|
||||||
if err == nil {
|
if httpsErr != nil {
|
||||||
return httpsErr
|
s.errChan <- httpsErr
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
s.errChan <- err
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) {
|
func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) {
|
||||||
|
|
|
@ -104,9 +104,9 @@ func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool)
|
||||||
serverConfig.Port = 0
|
serverConfig.Port = 0
|
||||||
server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger)
|
server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger)
|
|
||||||
errCh := make(chan error, 2)
|
errCh := make(chan error, 2)
|
||||||
rpcServer.Start(errCh)
|
rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger, errCh)
|
||||||
|
rpcServer.Start()
|
||||||
|
|
||||||
handler := http.HandlerFunc(rpcServer.handleHTTPRequest)
|
handler := http.HandlerFunc(rpcServer.handleHTTPRequest)
|
||||||
srv := httptest.NewServer(handler)
|
srv := httptest.NewServer(handler)
|
||||||
|
|
|
@ -1304,7 +1304,7 @@ func TestRPC(t *testing.T) {
|
||||||
func TestSubmitOracle(t *testing.T) {
|
func TestSubmitOracle(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initClearServerWithServices(t, true, false)
|
chain, rpcSrv, httpSrv := initClearServerWithServices(t, true, false)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
rpc := `{"jsonrpc": "2.0", "id": 1, "method": "submitoracleresponse", "params": %s}`
|
rpc := `{"jsonrpc": "2.0", "id": 1, "method": "submitoracleresponse", "params": %s}`
|
||||||
runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) {
|
runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) {
|
||||||
|
@ -1340,7 +1340,7 @@ func TestSubmitNotaryRequest(t *testing.T) {
|
||||||
t.Run("disabled P2PSigExtensions", func(t *testing.T) {
|
t.Run("disabled P2PSigExtensions", func(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false)
|
chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
req := fmt.Sprintf(rpc, "[]")
|
req := fmt.Sprintf(rpc, "[]")
|
||||||
body := doRPCCallOverHTTP(req, httpSrv.URL, t)
|
body := doRPCCallOverHTTP(req, httpSrv.URL, t)
|
||||||
checkErrGetResult(t, body, true)
|
checkErrGetResult(t, body, true)
|
||||||
|
@ -1348,7 +1348,7 @@ func TestSubmitNotaryRequest(t *testing.T) {
|
||||||
|
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) {
|
runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
|
@ -1459,7 +1459,7 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []
|
||||||
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
|
||||||
|
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
e := &executor{chain: chain, httpSrv: httpSrv}
|
e := &executor{chain: chain, httpSrv: httpSrv}
|
||||||
t.Run("single request", func(t *testing.T) {
|
t.Run("single request", func(t *testing.T) {
|
||||||
|
@ -2608,7 +2608,7 @@ func BenchmarkHandleIn(b *testing.B) {
|
||||||
serverConfig.LogLevel = zapcore.FatalLevel
|
serverConfig.LogLevel = zapcore.FatalLevel
|
||||||
server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger)
|
server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger)
|
rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger, make(chan error))
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
|
|
||||||
do := func(b *testing.B, req []byte) {
|
do := func(b *testing.B, req []byte) {
|
||||||
|
|
|
@ -94,7 +94,7 @@ func TestSubscriptions(t *testing.T) {
|
||||||
|
|
||||||
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
go rpcSrv.coreServer.Start(make(chan error))
|
go rpcSrv.coreServer.Start(make(chan error))
|
||||||
defer rpcSrv.coreServer.Shutdown()
|
defer rpcSrv.coreServer.Shutdown()
|
||||||
|
@ -261,7 +261,7 @@ func TestFilteredSubscriptions(t *testing.T) {
|
||||||
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
// It's used as an end-of-event-stream, so it's always present.
|
// It's used as an end-of-event-stream, so it's always present.
|
||||||
blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`)
|
blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`)
|
||||||
|
@ -353,7 +353,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
|
||||||
go rpcSrv.coreServer.Start(make(chan error, 1))
|
go rpcSrv.coreServer.Start(make(chan error, 1))
|
||||||
|
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
// blocks are needed to make GAS deposit for priv0
|
// blocks are needed to make GAS deposit for priv0
|
||||||
blocks := getTestBlocks(t)
|
blocks := getTestBlocks(t)
|
||||||
|
@ -395,7 +395,7 @@ func TestFilteredBlockSubscriptions(t *testing.T) {
|
||||||
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`)
|
blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`)
|
||||||
|
|
||||||
|
@ -433,7 +433,7 @@ func TestMaxSubscriptions(t *testing.T) {
|
||||||
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
for i := 0; i < maxFeeds+1; i++ {
|
for i := 0; i < maxFeeds+1; i++ {
|
||||||
var s string
|
var s string
|
||||||
|
@ -479,7 +479,7 @@ func TestBadSubUnsub(t *testing.T) {
|
||||||
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
testF := func(t *testing.T, cases map[string]string) func(t *testing.T) {
|
testF := func(t *testing.T, cases map[string]string) func(t *testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
|
@ -513,7 +513,7 @@ func doSomeWSRequest(t *testing.T, ws *websocket.Conn) {
|
||||||
func TestWSClientsLimit(t *testing.T) {
|
func TestWSClientsLimit(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
dialer := websocket.Dialer{HandshakeTimeout: time.Second}
|
dialer := websocket.Dialer{HandshakeTimeout: time.Second}
|
||||||
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
|
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
|
||||||
|
@ -552,7 +552,7 @@ func TestSubscriptionOverflow(t *testing.T) {
|
||||||
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer func() { _ = rpcSrv.Shutdown() }()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs)
|
resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs)
|
||||||
require.Nil(t, resp.Error)
|
require.Nil(t, resp.Error)
|
||||||
|
|
Loading…
Reference in a new issue