From 2591c39500e1a34550be7c29c6a980d54bad9478 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 23 Nov 2022 12:19:49 +0300 Subject: [PATCH] rpcsrv: make websocket client limit configurable --- docs/node-configuration.md | 5 ++ pkg/config/rpc_config.go | 1 + pkg/services/rpcsrv/server.go | 12 +++-- pkg/services/rpcsrv/server_helper_test.go | 8 +++ pkg/services/rpcsrv/subscription_test.go | 63 ++++++++++++++--------- 5 files changed, 59 insertions(+), 30 deletions(-) diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 24a4b8d6a..19fb01325 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -142,6 +142,7 @@ RPC: MaxIteratorResultItems: 100 MaxFindResultItems: 100 MaxNEP11Tokens: 100 + MaxWebSocketClients: 64 Port: 10332 SessionEnabled: false SessionExpirationTime: 15 @@ -176,6 +177,10 @@ where: - `MaxFindResultItems` - the maximum number of elements for `findstates` response. - `MaxNEP11Tokens` - limit for the number of tokens returned from `getnep11balances` call. +- `MaxWebSocketClients` - the maximum simultaneous websocket client connection + number (64 by default). Attempts to establish additional connections will + lead to websocket handshake failures. Use "-1" to disable websocket + connections (0 will lead to using the default value). - `Port` is an RPC server port it should be bound to. - `SessionEnabled` denotes whether session-based iterator JSON-RPC API is enabled. If true, then all iterators got from `invoke*` calls will be stored as sessions diff --git a/pkg/config/rpc_config.go b/pkg/config/rpc_config.go index 09cb3bc25..4411a7fee 100644 --- a/pkg/config/rpc_config.go +++ b/pkg/config/rpc_config.go @@ -16,6 +16,7 @@ type ( MaxIteratorResultItems int `yaml:"MaxIteratorResultItems"` MaxFindResultItems int `yaml:"MaxFindResultItems"` MaxNEP11Tokens int `yaml:"MaxNEP11Tokens"` + MaxWebSocketClients int `yaml:"MaxWebSocketClients"` Port uint16 `yaml:"Port"` SessionEnabled bool `yaml:"SessionEnabled"` SessionExpirationTime int `yaml:"SessionExpirationTime"` diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index 2befe85d3..349c2676e 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -183,10 +183,8 @@ const ( // Write deadline. wsWriteLimit = wsPingPeriod / 2 - // Maximum number of subscribers per Server. Each websocket client is - // treated like a subscriber, so technically it's a limit on websocket - // connections. - maxSubscribers = 64 + // Default maximum number of websocket clients per Server. + defaultMaxWebSocketClients = 64 // Maximum number of elements for get*transfers requests. maxTransfersLimit = 1000 @@ -278,6 +276,10 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server, log.Info("SessionPoolSize is not set or wrong, setting default value", zap.Int("SessionPoolSize", defaultSessionPoolSize)) } } + if conf.MaxWebSocketClients == 0 { + conf.MaxWebSocketClients = defaultMaxWebSocketClients + log.Info("MaxWebSocketClients is not set or wrong, setting default value", zap.Int("MaxWebSocketClients", defaultMaxWebSocketClients)) + } var oracleWrapped = new(atomic.Value) if orc != nil { oracleWrapped.Store(&orc) @@ -428,7 +430,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ s.subsLock.RLock() numOfSubs := len(s.subscribers) s.subsLock.RUnlock() - if numOfSubs >= maxSubscribers { + if numOfSubs >= s.config.MaxWebSocketClients { s.writeHTTPErrorResponse( params.NewIn(), w, diff --git a/pkg/services/rpcsrv/server_helper_test.go b/pkg/services/rpcsrv/server_helper_test.go index 7dbad237e..6ebd192c3 100644 --- a/pkg/services/rpcsrv/server_helper_test.go +++ b/pkg/services/rpcsrv/server_helper_test.go @@ -112,7 +112,10 @@ func getTestBlocks(t *testing.T) []*block.Block { func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool, disableIteratorsSessions bool) (*core.Blockchain, *Server, *httptest.Server) { chain, orc, cfg, logger := getUnitTestChain(t, needOracle, needNotary, disableIteratorsSessions) + return wrapUnitTestChain(t, chain, orc, cfg, logger) +} +func wrapUnitTestChain(t testing.TB, chain *core.Blockchain, orc *oracle.Oracle, cfg config.Config, logger *zap.Logger) (*core.Blockchain, *Server, *httptest.Server) { serverConfig := network.NewServerConfig(cfg) serverConfig.UserAgent = fmt.Sprintf(config.UserAgentFormat, "0.98.6-test") serverConfig.Port = 0 @@ -128,6 +131,11 @@ func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool, return chain, &rpcServer, srv } +func initClearServerWithCustomConfig(t testing.TB, ccfg func(configuration *config.Config)) (*core.Blockchain, *Server, *httptest.Server) { + chain, orc, cfg, logger := getUnitTestChainWithCustomConfig(t, false, false, ccfg) + return wrapUnitTestChain(t, chain, orc, cfg, logger) +} + func initClearServerWithInMemoryChain(t testing.TB) (*core.Blockchain, *Server, *httptest.Server) { return initClearServerWithServices(t, false, false, false) } diff --git a/pkg/services/rpcsrv/subscription_test.go b/pkg/services/rpcsrv/subscription_test.go index 60496feba..752ed03fa 100644 --- a/pkg/services/rpcsrv/subscription_test.go +++ b/pkg/services/rpcsrv/subscription_test.go @@ -9,6 +9,7 @@ import ( "github.com/gorilla/websocket" "github.com/nspcc-dev/neo-go/internal/testchain" + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/neorpc" @@ -534,32 +535,44 @@ func doSomeWSRequest(t *testing.T, ws *websocket.Conn) { } func TestWSClientsLimit(t *testing.T) { - chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t) - defer chain.Close() - defer rpcSrv.Shutdown() - - dialer := websocket.Dialer{HandshakeTimeout: time.Second} - url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" - wss := make([]*websocket.Conn, maxSubscribers) - - for i := 0; i < len(wss)+1; i++ { - ws, r, err := dialer.Dial(url, nil) - if r != nil && r.Body != nil { - defer r.Body.Close() + for tname, limit := range map[string]int{"default": 0, "8": 8, "disabled": -1} { + effectiveClients := limit + if limit == 0 { + effectiveClients = defaultMaxWebSocketClients + } else if limit < 0 { + effectiveClients = 0 } - if i < maxSubscribers { - require.NoError(t, err) - wss[i] = ws - // Check that it's completely ready. - doSomeWSRequest(t, ws) - } else { - require.Error(t, err) - } - } - // Check connections are still alive (it actually is necessary to add - // some use of wss to keep connections alive). - for i := 0; i < len(wss); i++ { - doSomeWSRequest(t, wss[i]) + t.Run(tname, func(t *testing.T) { + chain, rpcSrv, httpSrv := initClearServerWithCustomConfig(t, func(cfg *config.Config) { + cfg.ApplicationConfiguration.RPC.MaxWebSocketClients = limit + }) + defer chain.Close() + defer rpcSrv.Shutdown() + + dialer := websocket.Dialer{HandshakeTimeout: time.Second} + url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" + wss := make([]*websocket.Conn, effectiveClients) + + for i := 0; i < len(wss)+1; i++ { + ws, r, err := dialer.Dial(url, nil) + if r != nil && r.Body != nil { + defer r.Body.Close() + } + if i < effectiveClients { + require.NoError(t, err) + wss[i] = ws + // Check that it's completely ready. + doSomeWSRequest(t, ws) + } else { + require.Error(t, err) + } + } + // Check connections are still alive (it actually is necessary to add + // some use of wss to keep connections alive). + for i := 0; i < len(wss); i++ { + doSomeWSRequest(t, wss[i]) + } + }) } }