diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 6f437fa30..188f4b121 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -251,6 +251,7 @@ RPC: MaxRequestBodyBytes: 5242880 MaxRequestHeaderBytes: 1048576 MaxWebSocketClients: 64 + MaxWebSocketFeeds: 16 SessionEnabled: false SessionExpirationTime: 15 SessionBackedByMPT: false @@ -296,6 +297,9 @@ where: number (64 by default). Attempts to establish additional connections will lead to websocket handshake failures. Use "-1" to disable websocket connections (0 will lead to using the default value). +- `MaxWebSocketFeeds` -- the maximum simultaneous event subscriptions number + for a single client (16 by default). Attemps to create additional subscriptions + will lead to error. - `SessionEnabled` denotes whether session-based iterator JSON-RPC API is enabled. If true, then all iterators got from `invoke*` calls will be stored as sessions on the server side available for further traverse. `traverseiterator` and diff --git a/docs/notifications.md b/docs/notifications.md index 4f3187965..f71e42fcc 100644 --- a/docs/notifications.md +++ b/docs/notifications.md @@ -58,6 +58,9 @@ method. Upon successful subscription, clients receive subscription ID for subsequent management of this subscription. Subscription is only valid for connection lifetime, no long-term client identification is being made. +The maximum number of simultaneous subscriptions can be set server-side +via `MaxWebSocketFeeds` setting. + Errors are not described down below, but they can be returned as standard JSON-RPC errors (most often caused by invalid parameters). diff --git a/pkg/config/rpc_config.go b/pkg/config/rpc_config.go index c9b0a1630..5187f5847 100644 --- a/pkg/config/rpc_config.go +++ b/pkg/config/rpc_config.go @@ -19,6 +19,7 @@ type ( MaxRequestBodyBytes int `yaml:"MaxRequestBodyBytes"` MaxRequestHeaderBytes int `yaml:"MaxRequestHeaderBytes"` MaxWebSocketClients int `yaml:"MaxWebSocketClients"` + MaxWebSocketFeeds int `yaml:"MaxWebSocketFeeds"` SessionEnabled bool `yaml:"SessionEnabled"` SessionExpirationTime int `yaml:"SessionExpirationTime"` SessionBackedByMPT bool `yaml:"SessionBackedByMPT"` diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index b27502e0a..2f5f3325b 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -311,6 +311,10 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server, conf.MaxWebSocketClients = defaultMaxWebSocketClients log.Info("MaxWebSocketClients is not set or wrong, setting default value", zap.Int("MaxWebSocketClients", defaultMaxWebSocketClients)) } + if conf.MaxWebSocketFeeds == 0 { + conf.MaxWebSocketFeeds = defaultMaxFeeds + log.Info("MaxWebSocketFeeds is not set or wrong, setting default value", zap.Int("MaxWebSocketFeeds", defaultMaxFeeds)) + } var oracleWrapped = new(atomic.Value) if orc != nil { oracleWrapped.Store(orc) @@ -522,7 +526,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ } resChan := make(chan abstractResult) // response.abstract or response.abstractBatch subChan := make(chan intEvent, notificationBufSize) - subscr := &subscriber{writer: subChan} + subscr := &subscriber{writer: subChan, feeds: make([]feed, s.config.MaxWebSocketFeeds)} s.subsLock.Lock() s.subscribers[subscr] = true s.subsLock.Unlock() @@ -560,7 +564,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ // RegisterLocal performs local client registration. func (s *Server) RegisterLocal(ctx context.Context, events chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error) { subChan := make(chan intEvent, notificationBufSize) - subscr := &subscriber{writer: subChan} + subscr := &subscriber{writer: subChan, feeds: make([]feed, s.config.MaxWebSocketFeeds)} s.subsLock.Lock() s.subscribers[subscr] = true s.subsLock.Unlock() diff --git a/pkg/services/rpcsrv/subscription.go b/pkg/services/rpcsrv/subscription.go index 9e3b05893..96d4969c2 100644 --- a/pkg/services/rpcsrv/subscription.go +++ b/pkg/services/rpcsrv/subscription.go @@ -23,7 +23,7 @@ type ( // cheaper doing it this way rather than creating a map), // pointing to an EventID is an obvious overkill at the moment, but // that's not for long. - feeds [maxFeeds]feed + feeds []feed } // feed stores subscriber's desired event ID with filter. feed struct { @@ -43,8 +43,8 @@ func (f feed) Filter() neorpc.SubscriptionFilter { } const ( - // Maximum number of subscriptions per one client. - maxFeeds = 16 + // The default maximum number of subscriptions per one client. + defaultMaxFeeds = 16 // This sets notification messages buffer depth. It may seem to be quite // big, but there is a big gap in speed between internal event processing diff --git a/pkg/services/rpcsrv/subscription_test.go b/pkg/services/rpcsrv/subscription_test.go index 57cd79942..613727e2a 100644 --- a/pkg/services/rpcsrv/subscription_test.go +++ b/pkg/services/rpcsrv/subscription_test.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "net/http/httptest" "strings" "sync" "testing" @@ -76,7 +77,11 @@ func getNotification(t *testing.T, respCh <-chan []byte) *neorpc.Notification { func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) { chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t) + ws, respMsgs := initWSClient(t, httpSrv, rpcSrv, startNetworkServer...) + return chain, rpcSrv, ws, respMsgs +} +func initWSClient(t *testing.T, httpSrv *httptest.Server, rpcSrv *Server, startNetworkServer ...bool) (*websocket.Conn, chan []byte) { dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" ws, r, err := dialer.Dial(url, nil) @@ -108,7 +113,7 @@ func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core rpcSrv.coreServer.Shutdown() } }) - return chain, rpcSrv, ws, respMsgs + return ws, respMsgs } func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string { @@ -577,9 +582,10 @@ func TestHeaderOfAddedBlockSubscriptions(t *testing.T) { callUnsubscribe(t, c, respMsgs, headerSubID) } -func TestMaxSubscriptions(t *testing.T) { +func testMaxSubscriptions(t *testing.T, f func(*config.Config), maxFeeds int) { var subIDs = make([]string, 0) - _, _, c, respMsgs := initCleanServerAndWSClient(t) + _, rpcSrv, httpSrv := initClearServerWithCustomConfig(t, f) + c, respMsgs := initWSClient(t, httpSrv, rpcSrv) for i := range maxFeeds + 1 { var s string @@ -600,6 +606,17 @@ func TestMaxSubscriptions(t *testing.T) { } } +func TestMaxSubscriptions(t *testing.T) { + t.Run("default", func(t *testing.T) { + testMaxSubscriptions(t, nil, defaultMaxFeeds) + }) + t.Run("maxfeeds=x2", func(t *testing.T) { + testMaxSubscriptions(t, func(c *config.Config) { + c.ApplicationConfiguration.RPC.MaxWebSocketFeeds = defaultMaxFeeds * 2 + }, defaultMaxFeeds*2) + }) +} + func TestBadSubUnsub(t *testing.T) { var subCases = map[string]string{ "no params": `{"jsonrpc": "2.0", "method": "subscribe", "params": [], "id": 1}`,