rpcsrv: make websocket client limit configurable

This commit is contained in:
Roman Khimov 2022-11-23 12:19:49 +03:00
parent fb09670fd1
commit 2591c39500
5 changed files with 59 additions and 30 deletions

View file

@ -142,6 +142,7 @@ RPC:
MaxIteratorResultItems: 100 MaxIteratorResultItems: 100
MaxFindResultItems: 100 MaxFindResultItems: 100
MaxNEP11Tokens: 100 MaxNEP11Tokens: 100
MaxWebSocketClients: 64
Port: 10332 Port: 10332
SessionEnabled: false SessionEnabled: false
SessionExpirationTime: 15 SessionExpirationTime: 15
@ -176,6 +177,10 @@ where:
- `MaxFindResultItems` - the maximum number of elements for `findstates` response. - `MaxFindResultItems` - the maximum number of elements for `findstates` response.
- `MaxNEP11Tokens` - limit for the number of tokens returned from - `MaxNEP11Tokens` - limit for the number of tokens returned from
`getnep11balances` call. `getnep11balances` call.
- `MaxWebSocketClients` - the maximum simultaneous websocket client connection
number (64 by default). Attempts to establish additional connections will
lead to websocket handshake failures. Use "-1" to disable websocket
connections (0 will lead to using the default value).
- `Port` is an RPC server port it should be bound to. - `Port` is an RPC server port it should be bound to.
- `SessionEnabled` denotes whether session-based iterator JSON-RPC API is enabled. - `SessionEnabled` denotes whether session-based iterator JSON-RPC API is enabled.
If true, then all iterators got from `invoke*` calls will be stored as sessions If true, then all iterators got from `invoke*` calls will be stored as sessions

View file

@ -16,6 +16,7 @@ type (
MaxIteratorResultItems int `yaml:"MaxIteratorResultItems"` MaxIteratorResultItems int `yaml:"MaxIteratorResultItems"`
MaxFindResultItems int `yaml:"MaxFindResultItems"` MaxFindResultItems int `yaml:"MaxFindResultItems"`
MaxNEP11Tokens int `yaml:"MaxNEP11Tokens"` MaxNEP11Tokens int `yaml:"MaxNEP11Tokens"`
MaxWebSocketClients int `yaml:"MaxWebSocketClients"`
Port uint16 `yaml:"Port"` Port uint16 `yaml:"Port"`
SessionEnabled bool `yaml:"SessionEnabled"` SessionEnabled bool `yaml:"SessionEnabled"`
SessionExpirationTime int `yaml:"SessionExpirationTime"` SessionExpirationTime int `yaml:"SessionExpirationTime"`

View file

@ -183,10 +183,8 @@ const (
// Write deadline. // Write deadline.
wsWriteLimit = wsPingPeriod / 2 wsWriteLimit = wsPingPeriod / 2
// Maximum number of subscribers per Server. Each websocket client is // Default maximum number of websocket clients per Server.
// treated like a subscriber, so technically it's a limit on websocket defaultMaxWebSocketClients = 64
// connections.
maxSubscribers = 64
// Maximum number of elements for get*transfers requests. // Maximum number of elements for get*transfers requests.
maxTransfersLimit = 1000 maxTransfersLimit = 1000
@ -278,6 +276,10 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
log.Info("SessionPoolSize is not set or wrong, setting default value", zap.Int("SessionPoolSize", defaultSessionPoolSize)) log.Info("SessionPoolSize is not set or wrong, setting default value", zap.Int("SessionPoolSize", defaultSessionPoolSize))
} }
} }
if conf.MaxWebSocketClients == 0 {
conf.MaxWebSocketClients = defaultMaxWebSocketClients
log.Info("MaxWebSocketClients is not set or wrong, setting default value", zap.Int("MaxWebSocketClients", defaultMaxWebSocketClients))
}
var oracleWrapped = new(atomic.Value) var oracleWrapped = new(atomic.Value)
if orc != nil { if orc != nil {
oracleWrapped.Store(&orc) oracleWrapped.Store(&orc)
@ -428,7 +430,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
s.subsLock.RLock() s.subsLock.RLock()
numOfSubs := len(s.subscribers) numOfSubs := len(s.subscribers)
s.subsLock.RUnlock() s.subsLock.RUnlock()
if numOfSubs >= maxSubscribers { if numOfSubs >= s.config.MaxWebSocketClients {
s.writeHTTPErrorResponse( s.writeHTTPErrorResponse(
params.NewIn(), params.NewIn(),
w, w,

View file

@ -112,7 +112,10 @@ func getTestBlocks(t *testing.T) []*block.Block {
func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool, disableIteratorsSessions bool) (*core.Blockchain, *Server, *httptest.Server) { func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool, disableIteratorsSessions bool) (*core.Blockchain, *Server, *httptest.Server) {
chain, orc, cfg, logger := getUnitTestChain(t, needOracle, needNotary, disableIteratorsSessions) chain, orc, cfg, logger := getUnitTestChain(t, needOracle, needNotary, disableIteratorsSessions)
return wrapUnitTestChain(t, chain, orc, cfg, logger)
}
func wrapUnitTestChain(t testing.TB, chain *core.Blockchain, orc *oracle.Oracle, cfg config.Config, logger *zap.Logger) (*core.Blockchain, *Server, *httptest.Server) {
serverConfig := network.NewServerConfig(cfg) serverConfig := network.NewServerConfig(cfg)
serverConfig.UserAgent = fmt.Sprintf(config.UserAgentFormat, "0.98.6-test") serverConfig.UserAgent = fmt.Sprintf(config.UserAgentFormat, "0.98.6-test")
serverConfig.Port = 0 serverConfig.Port = 0
@ -128,6 +131,11 @@ func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool,
return chain, &rpcServer, srv return chain, &rpcServer, srv
} }
func initClearServerWithCustomConfig(t testing.TB, ccfg func(configuration *config.Config)) (*core.Blockchain, *Server, *httptest.Server) {
chain, orc, cfg, logger := getUnitTestChainWithCustomConfig(t, false, false, ccfg)
return wrapUnitTestChain(t, chain, orc, cfg, logger)
}
func initClearServerWithInMemoryChain(t testing.TB) (*core.Blockchain, *Server, *httptest.Server) { func initClearServerWithInMemoryChain(t testing.TB) (*core.Blockchain, *Server, *httptest.Server) {
return initClearServerWithServices(t, false, false, false) return initClearServerWithServices(t, false, false, false)
} }

View file

@ -9,6 +9,7 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/internal/testchain" "github.com/nspcc-dev/neo-go/internal/testchain"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc"
@ -534,32 +535,44 @@ func doSomeWSRequest(t *testing.T, ws *websocket.Conn) {
} }
func TestWSClientsLimit(t *testing.T) { func TestWSClientsLimit(t *testing.T) {
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t) for tname, limit := range map[string]int{"default": 0, "8": 8, "disabled": -1} {
defer chain.Close() effectiveClients := limit
defer rpcSrv.Shutdown() if limit == 0 {
effectiveClients = defaultMaxWebSocketClients
dialer := websocket.Dialer{HandshakeTimeout: time.Second} } else if limit < 0 {
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" effectiveClients = 0
wss := make([]*websocket.Conn, maxSubscribers)
for i := 0; i < len(wss)+1; i++ {
ws, r, err := dialer.Dial(url, nil)
if r != nil && r.Body != nil {
defer r.Body.Close()
} }
if i < maxSubscribers { t.Run(tname, func(t *testing.T) {
require.NoError(t, err) chain, rpcSrv, httpSrv := initClearServerWithCustomConfig(t, func(cfg *config.Config) {
wss[i] = ws cfg.ApplicationConfiguration.RPC.MaxWebSocketClients = limit
// Check that it's completely ready. })
doSomeWSRequest(t, ws) defer chain.Close()
} else { defer rpcSrv.Shutdown()
require.Error(t, err)
} dialer := websocket.Dialer{HandshakeTimeout: time.Second}
} url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
// Check connections are still alive (it actually is necessary to add wss := make([]*websocket.Conn, effectiveClients)
// some use of wss to keep connections alive).
for i := 0; i < len(wss); i++ { for i := 0; i < len(wss)+1; i++ {
doSomeWSRequest(t, wss[i]) ws, r, err := dialer.Dial(url, nil)
if r != nil && r.Body != nil {
defer r.Body.Close()
}
if i < effectiveClients {
require.NoError(t, err)
wss[i] = ws
// Check that it's completely ready.
doSomeWSRequest(t, ws)
} else {
require.Error(t, err)
}
}
// Check connections are still alive (it actually is necessary to add
// some use of wss to keep connections alive).
for i := 0; i < len(wss); i++ {
doSomeWSRequest(t, wss[i])
}
})
} }
} }