Merge pull request #2821 from nspcc-dev/improve-wsclient-handshake-error
Improve wsclient handshake error
This commit is contained in:
commit
3ac76c319d
7 changed files with 80 additions and 30 deletions
|
@ -142,6 +142,7 @@ RPC:
|
||||||
MaxIteratorResultItems: 100
|
MaxIteratorResultItems: 100
|
||||||
MaxFindResultItems: 100
|
MaxFindResultItems: 100
|
||||||
MaxNEP11Tokens: 100
|
MaxNEP11Tokens: 100
|
||||||
|
MaxWebSocketClients: 64
|
||||||
Port: 10332
|
Port: 10332
|
||||||
SessionEnabled: false
|
SessionEnabled: false
|
||||||
SessionExpirationTime: 15
|
SessionExpirationTime: 15
|
||||||
|
@ -176,6 +177,10 @@ where:
|
||||||
- `MaxFindResultItems` - the maximum number of elements for `findstates` response.
|
- `MaxFindResultItems` - the maximum number of elements for `findstates` response.
|
||||||
- `MaxNEP11Tokens` - limit for the number of tokens returned from
|
- `MaxNEP11Tokens` - limit for the number of tokens returned from
|
||||||
`getnep11balances` call.
|
`getnep11balances` call.
|
||||||
|
- `MaxWebSocketClients` - the maximum simultaneous websocket client connection
|
||||||
|
number (64 by default). Attempts to establish additional connections will
|
||||||
|
lead to websocket handshake failures. Use "-1" to disable websocket
|
||||||
|
connections (0 will lead to using the default value).
|
||||||
- `Port` is an RPC server port it should be bound to.
|
- `Port` is an RPC server port it should be bound to.
|
||||||
- `SessionEnabled` denotes whether session-based iterator JSON-RPC API is enabled.
|
- `SessionEnabled` denotes whether session-based iterator JSON-RPC API is enabled.
|
||||||
If true, then all iterators got from `invoke*` calls will be stored as sessions
|
If true, then all iterators got from `invoke*` calls will be stored as sessions
|
||||||
|
|
|
@ -16,6 +16,7 @@ type (
|
||||||
MaxIteratorResultItems int `yaml:"MaxIteratorResultItems"`
|
MaxIteratorResultItems int `yaml:"MaxIteratorResultItems"`
|
||||||
MaxFindResultItems int `yaml:"MaxFindResultItems"`
|
MaxFindResultItems int `yaml:"MaxFindResultItems"`
|
||||||
MaxNEP11Tokens int `yaml:"MaxNEP11Tokens"`
|
MaxNEP11Tokens int `yaml:"MaxNEP11Tokens"`
|
||||||
|
MaxWebSocketClients int `yaml:"MaxWebSocketClients"`
|
||||||
Port uint16 `yaml:"Port"`
|
Port uint16 `yaml:"Port"`
|
||||||
SessionEnabled bool `yaml:"SessionEnabled"`
|
SessionEnabled bool `yaml:"SessionEnabled"`
|
||||||
SessionExpirationTime int `yaml:"SessionExpirationTime"`
|
SessionExpirationTime int `yaml:"SessionExpirationTime"`
|
||||||
|
|
|
@ -365,6 +365,15 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error
|
||||||
defer resp.Body.Close() // Not exactly required by websocket, but let's do this for bodyclose checker.
|
defer resp.Body.Close() // Not exactly required by websocket, but let's do this for bodyclose checker.
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if resp != nil && resp.Body != nil {
|
||||||
|
var srvErr neorpc.HeaderAndError
|
||||||
|
|
||||||
|
dec := json.NewDecoder(resp.Body)
|
||||||
|
decErr := dec.Decode(&srvErr)
|
||||||
|
if decErr == nil && srvErr.Error != nil {
|
||||||
|
err = srvErr.Error
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
wsc := &WSClient{
|
wsc := &WSClient{
|
||||||
|
|
|
@ -2145,6 +2145,18 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) {
|
||||||
require.Equal(t, vmstate.Halt, aer.VMState)
|
require.Equal(t, vmstate.Halt, aer.VMState)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWSClientHandshakeError(t *testing.T) {
|
||||||
|
chain, rpcSrv, httpSrv := initClearServerWithCustomConfig(t, func(cfg *config.Config) {
|
||||||
|
cfg.ApplicationConfiguration.RPC.MaxWebSocketClients = -1
|
||||||
|
})
|
||||||
|
defer chain.Close()
|
||||||
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
|
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
|
||||||
|
_, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{})
|
||||||
|
require.ErrorContains(t, err, "websocket users limit reached")
|
||||||
|
}
|
||||||
|
|
||||||
func TestWSClient_WaitWithMissedEvent(t *testing.T) {
|
func TestWSClient_WaitWithMissedEvent(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true)
|
chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true)
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
|
|
|
@ -183,10 +183,8 @@ const (
|
||||||
// Write deadline.
|
// Write deadline.
|
||||||
wsWriteLimit = wsPingPeriod / 2
|
wsWriteLimit = wsPingPeriod / 2
|
||||||
|
|
||||||
// Maximum number of subscribers per Server. Each websocket client is
|
// Default maximum number of websocket clients per Server.
|
||||||
// treated like a subscriber, so technically it's a limit on websocket
|
defaultMaxWebSocketClients = 64
|
||||||
// connections.
|
|
||||||
maxSubscribers = 64
|
|
||||||
|
|
||||||
// Maximum number of elements for get*transfers requests.
|
// Maximum number of elements for get*transfers requests.
|
||||||
maxTransfersLimit = 1000
|
maxTransfersLimit = 1000
|
||||||
|
@ -278,6 +276,10 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
|
||||||
log.Info("SessionPoolSize is not set or wrong, setting default value", zap.Int("SessionPoolSize", defaultSessionPoolSize))
|
log.Info("SessionPoolSize is not set or wrong, setting default value", zap.Int("SessionPoolSize", defaultSessionPoolSize))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if conf.MaxWebSocketClients == 0 {
|
||||||
|
conf.MaxWebSocketClients = defaultMaxWebSocketClients
|
||||||
|
log.Info("MaxWebSocketClients is not set or wrong, setting default value", zap.Int("MaxWebSocketClients", defaultMaxWebSocketClients))
|
||||||
|
}
|
||||||
var oracleWrapped = new(atomic.Value)
|
var oracleWrapped = new(atomic.Value)
|
||||||
if orc != nil {
|
if orc != nil {
|
||||||
oracleWrapped.Store(&orc)
|
oracleWrapped.Store(&orc)
|
||||||
|
@ -428,7 +430,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
|
||||||
s.subsLock.RLock()
|
s.subsLock.RLock()
|
||||||
numOfSubs := len(s.subscribers)
|
numOfSubs := len(s.subscribers)
|
||||||
s.subsLock.RUnlock()
|
s.subsLock.RUnlock()
|
||||||
if numOfSubs >= maxSubscribers {
|
if numOfSubs >= s.config.MaxWebSocketClients {
|
||||||
s.writeHTTPErrorResponse(
|
s.writeHTTPErrorResponse(
|
||||||
params.NewIn(),
|
params.NewIn(),
|
||||||
w,
|
w,
|
||||||
|
|
|
@ -112,7 +112,10 @@ func getTestBlocks(t *testing.T) []*block.Block {
|
||||||
|
|
||||||
func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool, disableIteratorsSessions bool) (*core.Blockchain, *Server, *httptest.Server) {
|
func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool, disableIteratorsSessions bool) (*core.Blockchain, *Server, *httptest.Server) {
|
||||||
chain, orc, cfg, logger := getUnitTestChain(t, needOracle, needNotary, disableIteratorsSessions)
|
chain, orc, cfg, logger := getUnitTestChain(t, needOracle, needNotary, disableIteratorsSessions)
|
||||||
|
return wrapUnitTestChain(t, chain, orc, cfg, logger)
|
||||||
|
}
|
||||||
|
|
||||||
|
func wrapUnitTestChain(t testing.TB, chain *core.Blockchain, orc *oracle.Oracle, cfg config.Config, logger *zap.Logger) (*core.Blockchain, *Server, *httptest.Server) {
|
||||||
serverConfig := network.NewServerConfig(cfg)
|
serverConfig := network.NewServerConfig(cfg)
|
||||||
serverConfig.UserAgent = fmt.Sprintf(config.UserAgentFormat, "0.98.6-test")
|
serverConfig.UserAgent = fmt.Sprintf(config.UserAgentFormat, "0.98.6-test")
|
||||||
serverConfig.Port = 0
|
serverConfig.Port = 0
|
||||||
|
@ -128,6 +131,11 @@ func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool,
|
||||||
return chain, &rpcServer, srv
|
return chain, &rpcServer, srv
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func initClearServerWithCustomConfig(t testing.TB, ccfg func(configuration *config.Config)) (*core.Blockchain, *Server, *httptest.Server) {
|
||||||
|
chain, orc, cfg, logger := getUnitTestChainWithCustomConfig(t, false, false, ccfg)
|
||||||
|
return wrapUnitTestChain(t, chain, orc, cfg, logger)
|
||||||
|
}
|
||||||
|
|
||||||
func initClearServerWithInMemoryChain(t testing.TB) (*core.Blockchain, *Server, *httptest.Server) {
|
func initClearServerWithInMemoryChain(t testing.TB) (*core.Blockchain, *Server, *httptest.Server) {
|
||||||
return initClearServerWithServices(t, false, false, false)
|
return initClearServerWithServices(t, false, false, false)
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/nspcc-dev/neo-go/internal/testchain"
|
"github.com/nspcc-dev/neo-go/internal/testchain"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/config"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core"
|
"github.com/nspcc-dev/neo-go/pkg/core"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||||
|
@ -534,20 +535,30 @@ func doSomeWSRequest(t *testing.T, ws *websocket.Conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWSClientsLimit(t *testing.T) {
|
func TestWSClientsLimit(t *testing.T) {
|
||||||
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
|
for tname, limit := range map[string]int{"default": 0, "8": 8, "disabled": -1} {
|
||||||
|
effectiveClients := limit
|
||||||
|
if limit == 0 {
|
||||||
|
effectiveClients = defaultMaxWebSocketClients
|
||||||
|
} else if limit < 0 {
|
||||||
|
effectiveClients = 0
|
||||||
|
}
|
||||||
|
t.Run(tname, func(t *testing.T) {
|
||||||
|
chain, rpcSrv, httpSrv := initClearServerWithCustomConfig(t, func(cfg *config.Config) {
|
||||||
|
cfg.ApplicationConfiguration.RPC.MaxWebSocketClients = limit
|
||||||
|
})
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer rpcSrv.Shutdown()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
dialer := websocket.Dialer{HandshakeTimeout: time.Second}
|
dialer := websocket.Dialer{HandshakeTimeout: time.Second}
|
||||||
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
|
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
|
||||||
wss := make([]*websocket.Conn, maxSubscribers)
|
wss := make([]*websocket.Conn, effectiveClients)
|
||||||
|
|
||||||
for i := 0; i < len(wss)+1; i++ {
|
for i := 0; i < len(wss)+1; i++ {
|
||||||
ws, r, err := dialer.Dial(url, nil)
|
ws, r, err := dialer.Dial(url, nil)
|
||||||
if r != nil && r.Body != nil {
|
if r != nil && r.Body != nil {
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
}
|
}
|
||||||
if i < maxSubscribers {
|
if i < effectiveClients {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
wss[i] = ws
|
wss[i] = ws
|
||||||
// Check that it's completely ready.
|
// Check that it's completely ready.
|
||||||
|
@ -561,6 +572,8 @@ func TestWSClientsLimit(t *testing.T) {
|
||||||
for i := 0; i < len(wss); i++ {
|
for i := 0; i < len(wss); i++ {
|
||||||
doSomeWSRequest(t, wss[i])
|
doSomeWSRequest(t, wss[i])
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// The purpose of this test is to overflow buffers on server side to
|
// The purpose of this test is to overflow buffers on server side to
|
||||||
|
|
Loading…
Reference in a new issue