diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 10a8e9db9..1ff7fc01f 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -139,6 +139,7 @@ RPC: SessionEnabled: false SessionExpirationTime: 15 SessionBackedByMPT: false + SessionPoolSize: 20 StartWhenSynchronized: false TLSConfig: Address: "" @@ -190,6 +191,10 @@ where: recommended to enable `SessionBackedByMPT` needlessly. `SessionBackedByMPT` is set to `false` by default and is relevant only if `SessionEnabled` is set to `true`. +- `SessionPoolSize` is the maximum number of concurrent iterator sessions. It is + set to `20` by default. If the subsequent session can't be added to the session + pool, then invocation result will contain corresponding error inside the + `FaultException` field. - `StartWhenSynchronized` controls when RPC server will be started, by default (`false` setting) it's started immediately and RPC is availabe during node synchronization. Setting it to `true` will make the node start RPC service only diff --git a/pkg/rpc/response/result/invoke.go b/pkg/rpc/response/result/invoke.go index 438ba240e..65100693f 100644 --- a/pkg/rpc/response/result/invoke.go +++ b/pkg/rpc/response/result/invoke.go @@ -32,7 +32,7 @@ type Invoke struct { } // RegisterIterator is a callback used to register new iterator on the server side. -type RegisterIterator func(sessionID string, item stackitem.Item, id int, finalize func()) uuid.UUID +type RegisterIterator func(sessionID string, item stackitem.Item, id int, finalize func()) (uuid.UUID, error) // InvokeDiag is an additional diagnostic data for invocation. type InvokeDiag struct { @@ -136,7 +136,12 @@ arrloop: if sessionID == "" { sessionID = uuid.NewString() } - iteratorID := r.registerIterator(sessionID, r.Stack[i], i, r.finalize) + iteratorID, err := r.registerIterator(sessionID, r.Stack[i], i, r.finalize) + if err != nil { + // Call finalizer immediately, there can't be race between server and marshaller because session wasn't added to server's session pool. + r.Finalize() + return nil, fmt.Errorf("failed to register iterator session: %w", err) + } data, err = json.Marshal(iteratorAux{ Type: stackitem.InteropT.String(), Interface: iteratorInterfaceName, diff --git a/pkg/rpc/rpc_config.go b/pkg/rpc/rpc_config.go index d1ba9a78b..1e3a3f1d0 100644 --- a/pkg/rpc/rpc_config.go +++ b/pkg/rpc/rpc_config.go @@ -20,6 +20,7 @@ type ( SessionEnabled bool `yaml:"SessionEnabled"` SessionExpirationTime int `yaml:"SessionExpirationTime"` SessionBackedByMPT bool `yaml:"SessionBackedByMPT"` + SessionPoolSize int `yaml:"SessionPoolSize"` StartWhenSynchronized bool `yaml:"StartWhenSynchronized"` TLSConfig TLSConfig `yaml:"TLSConfig"` } diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index fd8d5e3c8..39ed8a909 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -143,6 +143,9 @@ const ( // Maximum number of elements for get*transfers requests. maxTransfersLimit = 1000 + + // defaultSessionPoolSize is the number of concurrently running iterator sessions. + defaultSessionPoolSize = 20 ) var rpcHandlers = map[string]func(*Server, request.Params) (interface{}, *response.Error){ @@ -225,9 +228,15 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S orc.SetBroadcaster(broadcaster.New(orc.MainCfg, log)) } protoCfg := chain.GetConfig() - if conf.SessionEnabled && conf.SessionExpirationTime <= 0 { - conf.SessionExpirationTime = protoCfg.SecondsPerBlock - log.Info("SessionExpirationTime is not set or wrong, setting default value", zap.Int("SessionExpirationTime", protoCfg.SecondsPerBlock)) + if conf.SessionEnabled { + if conf.SessionExpirationTime <= 0 { + conf.SessionExpirationTime = protoCfg.SecondsPerBlock + log.Info("SessionExpirationTime is not set or wrong, setting default value", zap.Int("SessionExpirationTime", protoCfg.SecondsPerBlock)) + } + if conf.SessionPoolSize <= 0 { + conf.SessionPoolSize = defaultSessionPoolSize + log.Info("SessionPoolSize is not set or wrong, setting default value", zap.Int("SessionPoolSize", defaultSessionPoolSize)) + } } return Server{ Server: httpServer, @@ -1998,11 +2007,14 @@ func (s *Server) runScriptInVM(t trigger.Type, script []byte, contractScriptHash } var registerIterator result.RegisterIterator if s.config.SessionEnabled { - registerIterator = func(sessionID string, item stackitem.Item, stackIndex int, finalize func()) uuid.UUID { + registerIterator = func(sessionID string, item stackitem.Item, stackIndex int, finalize func()) (uuid.UUID, error) { iterID := uuid.New() s.sessionsLock.Lock() sess, ok := s.sessions[sessionID] if !ok { + if len(s.sessions) >= s.config.SessionPoolSize { + return uuid.UUID{}, errors.New("max capacity reached") + } timer := time.AfterFunc(time.Second*time.Duration(s.config.SessionExpirationTime), func() { s.sessionsLock.Lock() defer s.sessionsLock.Unlock() @@ -2047,7 +2059,7 @@ func (s *Server) runScriptInVM(t trigger.Type, script []byte, contractScriptHash }) s.sessions[sessionID] = sess s.sessionsLock.Unlock() - return iterID + return iterID, nil } } return result.NewInvoke(ic, script, faultException, registerIterator, s.config.MaxIteratorResultItems), nil