Merge pull request #3828 from fyfyrchik/update-feeds

rpcsrv: allow to configure subscription limit
This commit is contained in:
Anna Shaleva 2025-03-07 16:42:35 +03:00 committed by GitHub
commit 7d72b6538a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 37 additions and 8 deletions

View file

@ -251,6 +251,7 @@ RPC:
MaxRequestBodyBytes: 5242880 MaxRequestBodyBytes: 5242880
MaxRequestHeaderBytes: 1048576 MaxRequestHeaderBytes: 1048576
MaxWebSocketClients: 64 MaxWebSocketClients: 64
MaxWebSocketFeeds: 16
SessionEnabled: false SessionEnabled: false
SessionExpirationTime: 15 SessionExpirationTime: 15
SessionBackedByMPT: false SessionBackedByMPT: false
@ -296,6 +297,9 @@ where:
number (64 by default). Attempts to establish additional connections will number (64 by default). Attempts to establish additional connections will
lead to websocket handshake failures. Use "-1" to disable websocket lead to websocket handshake failures. Use "-1" to disable websocket
connections (0 will lead to using the default value). connections (0 will lead to using the default value).
- `MaxWebSocketFeeds` -- the maximum simultaneous event subscriptions number
for a single client (16 by default). Attemps to create additional subscriptions
will lead to error.
- `SessionEnabled` denotes whether session-based iterator JSON-RPC API is enabled. - `SessionEnabled` denotes whether session-based iterator JSON-RPC API is enabled.
If true, then all iterators got from `invoke*` calls will be stored as sessions If true, then all iterators got from `invoke*` calls will be stored as sessions
on the server side available for further traverse. `traverseiterator` and on the server side available for further traverse. `traverseiterator` and

View file

@ -58,6 +58,9 @@ method. Upon successful subscription, clients receive subscription ID for
subsequent management of this subscription. Subscription is only valid for subsequent management of this subscription. Subscription is only valid for
connection lifetime, no long-term client identification is being made. connection lifetime, no long-term client identification is being made.
The maximum number of simultaneous subscriptions can be set server-side
via `MaxWebSocketFeeds` setting.
Errors are not described down below, but they can be returned as standard Errors are not described down below, but they can be returned as standard
JSON-RPC errors (most often caused by invalid parameters). JSON-RPC errors (most often caused by invalid parameters).

View file

@ -19,6 +19,7 @@ type (
MaxRequestBodyBytes int `yaml:"MaxRequestBodyBytes"` MaxRequestBodyBytes int `yaml:"MaxRequestBodyBytes"`
MaxRequestHeaderBytes int `yaml:"MaxRequestHeaderBytes"` MaxRequestHeaderBytes int `yaml:"MaxRequestHeaderBytes"`
MaxWebSocketClients int `yaml:"MaxWebSocketClients"` MaxWebSocketClients int `yaml:"MaxWebSocketClients"`
MaxWebSocketFeeds int `yaml:"MaxWebSocketFeeds"`
SessionEnabled bool `yaml:"SessionEnabled"` SessionEnabled bool `yaml:"SessionEnabled"`
SessionExpirationTime int `yaml:"SessionExpirationTime"` SessionExpirationTime int `yaml:"SessionExpirationTime"`
SessionBackedByMPT bool `yaml:"SessionBackedByMPT"` SessionBackedByMPT bool `yaml:"SessionBackedByMPT"`

View file

@ -311,6 +311,10 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
conf.MaxWebSocketClients = defaultMaxWebSocketClients conf.MaxWebSocketClients = defaultMaxWebSocketClients
log.Info("MaxWebSocketClients is not set or wrong, setting default value", zap.Int("MaxWebSocketClients", defaultMaxWebSocketClients)) log.Info("MaxWebSocketClients is not set or wrong, setting default value", zap.Int("MaxWebSocketClients", defaultMaxWebSocketClients))
} }
if conf.MaxWebSocketFeeds == 0 {
conf.MaxWebSocketFeeds = defaultMaxFeeds
log.Info("MaxWebSocketFeeds is not set or wrong, setting default value", zap.Int("MaxWebSocketFeeds", defaultMaxFeeds))
}
var oracleWrapped = new(atomic.Value) var oracleWrapped = new(atomic.Value)
if orc != nil { if orc != nil {
oracleWrapped.Store(orc) oracleWrapped.Store(orc)
@ -522,7 +526,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
} }
resChan := make(chan abstractResult) // response.abstract or response.abstractBatch resChan := make(chan abstractResult) // response.abstract or response.abstractBatch
subChan := make(chan intEvent, notificationBufSize) subChan := make(chan intEvent, notificationBufSize)
subscr := &subscriber{writer: subChan} subscr := &subscriber{writer: subChan, feeds: make([]feed, s.config.MaxWebSocketFeeds)}
s.subsLock.Lock() s.subsLock.Lock()
s.subscribers[subscr] = true s.subscribers[subscr] = true
s.subsLock.Unlock() s.subsLock.Unlock()
@ -560,7 +564,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
// RegisterLocal performs local client registration. // RegisterLocal performs local client registration.
func (s *Server) RegisterLocal(ctx context.Context, events chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error) { func (s *Server) RegisterLocal(ctx context.Context, events chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error) {
subChan := make(chan intEvent, notificationBufSize) subChan := make(chan intEvent, notificationBufSize)
subscr := &subscriber{writer: subChan} subscr := &subscriber{writer: subChan, feeds: make([]feed, s.config.MaxWebSocketFeeds)}
s.subsLock.Lock() s.subsLock.Lock()
s.subscribers[subscr] = true s.subscribers[subscr] = true
s.subsLock.Unlock() s.subsLock.Unlock()

View file

@ -23,7 +23,7 @@ type (
// cheaper doing it this way rather than creating a map), // cheaper doing it this way rather than creating a map),
// pointing to an EventID is an obvious overkill at the moment, but // pointing to an EventID is an obvious overkill at the moment, but
// that's not for long. // that's not for long.
feeds [maxFeeds]feed feeds []feed
} }
// feed stores subscriber's desired event ID with filter. // feed stores subscriber's desired event ID with filter.
feed struct { feed struct {
@ -43,8 +43,8 @@ func (f feed) Filter() neorpc.SubscriptionFilter {
} }
const ( const (
// Maximum number of subscriptions per one client. // The default maximum number of subscriptions per one client.
maxFeeds = 16 defaultMaxFeeds = 16
// This sets notification messages buffer depth. It may seem to be quite // This sets notification messages buffer depth. It may seem to be quite
// big, but there is a big gap in speed between internal event processing // big, but there is a big gap in speed between internal event processing

View file

@ -4,6 +4,7 @@ import (
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http/httptest"
"strings" "strings"
"sync" "sync"
"testing" "testing"
@ -76,7 +77,11 @@ func getNotification(t *testing.T, respCh <-chan []byte) *neorpc.Notification {
func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) { func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) {
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
ws, respMsgs := initWSClient(t, httpSrv, rpcSrv, startNetworkServer...)
return chain, rpcSrv, ws, respMsgs
}
func initWSClient(t *testing.T, httpSrv *httptest.Server, rpcSrv *Server, startNetworkServer ...bool) (*websocket.Conn, chan []byte) {
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
ws, r, err := dialer.Dial(url, nil) ws, r, err := dialer.Dial(url, nil)
@ -108,7 +113,7 @@ func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core
rpcSrv.coreServer.Shutdown() rpcSrv.coreServer.Shutdown()
} }
}) })
return chain, rpcSrv, ws, respMsgs return ws, respMsgs
} }
func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string { func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string {
@ -577,9 +582,10 @@ func TestHeaderOfAddedBlockSubscriptions(t *testing.T) {
callUnsubscribe(t, c, respMsgs, headerSubID) callUnsubscribe(t, c, respMsgs, headerSubID)
} }
func TestMaxSubscriptions(t *testing.T) { func testMaxSubscriptions(t *testing.T, f func(*config.Config), maxFeeds int) {
var subIDs = make([]string, 0) var subIDs = make([]string, 0)
_, _, c, respMsgs := initCleanServerAndWSClient(t) _, rpcSrv, httpSrv := initClearServerWithCustomConfig(t, f)
c, respMsgs := initWSClient(t, httpSrv, rpcSrv)
for i := range maxFeeds + 1 { for i := range maxFeeds + 1 {
var s string var s string
@ -600,6 +606,17 @@ func TestMaxSubscriptions(t *testing.T) {
} }
} }
func TestMaxSubscriptions(t *testing.T) {
t.Run("default", func(t *testing.T) {
testMaxSubscriptions(t, nil, defaultMaxFeeds)
})
t.Run("maxfeeds=x2", func(t *testing.T) {
testMaxSubscriptions(t, func(c *config.Config) {
c.ApplicationConfiguration.RPC.MaxWebSocketFeeds = defaultMaxFeeds * 2
}, defaultMaxFeeds*2)
})
}
func TestBadSubUnsub(t *testing.T) { func TestBadSubUnsub(t *testing.T) {
var subCases = map[string]string{ var subCases = map[string]string{
"no params": `{"jsonrpc": "2.0", "method": "subscribe", "params": [], "id": 1}`, "no params": `{"jsonrpc": "2.0", "method": "subscribe", "params": [], "id": 1}`,