rpc: restrict the amount of concurrently running iterator sessions

This commit is contained in:
Anna Shaleva 2022-07-08 15:42:27 +03:00
parent 8f73ce08c8
commit 445cca114a
4 changed files with 30 additions and 7 deletions

View file

@ -139,6 +139,7 @@ RPC:
SessionEnabled: false SessionEnabled: false
SessionExpirationTime: 15 SessionExpirationTime: 15
SessionBackedByMPT: false SessionBackedByMPT: false
SessionPoolSize: 20
StartWhenSynchronized: false StartWhenSynchronized: false
TLSConfig: TLSConfig:
Address: "" Address: ""
@ -190,6 +191,10 @@ where:
recommended to enable `SessionBackedByMPT` needlessly. `SessionBackedByMPT` is recommended to enable `SessionBackedByMPT` needlessly. `SessionBackedByMPT` is
set to `false` by default and is relevant only if `SessionEnabled` is set to set to `false` by default and is relevant only if `SessionEnabled` is set to
`true`. `true`.
- `SessionPoolSize` is the maximum number of concurrent iterator sessions. It is
set to `20` by default. If the subsequent session can't be added to the session
pool, then invocation result will contain corresponding error inside the
`FaultException` field.
- `StartWhenSynchronized` controls when RPC server will be started, by default - `StartWhenSynchronized` controls when RPC server will be started, by default
(`false` setting) it's started immediately and RPC is availabe during node (`false` setting) it's started immediately and RPC is availabe during node
synchronization. Setting it to `true` will make the node start RPC service only synchronization. Setting it to `true` will make the node start RPC service only

View file

@ -32,7 +32,7 @@ type Invoke struct {
} }
// RegisterIterator is a callback used to register new iterator on the server side. // RegisterIterator is a callback used to register new iterator on the server side.
type RegisterIterator func(sessionID string, item stackitem.Item, id int, finalize func()) uuid.UUID type RegisterIterator func(sessionID string, item stackitem.Item, id int, finalize func()) (uuid.UUID, error)
// InvokeDiag is an additional diagnostic data for invocation. // InvokeDiag is an additional diagnostic data for invocation.
type InvokeDiag struct { type InvokeDiag struct {
@ -136,7 +136,12 @@ arrloop:
if sessionID == "" { if sessionID == "" {
sessionID = uuid.NewString() sessionID = uuid.NewString()
} }
iteratorID := r.registerIterator(sessionID, r.Stack[i], i, r.finalize) iteratorID, err := r.registerIterator(sessionID, r.Stack[i], i, r.finalize)
if err != nil {
// Call finalizer immediately, there can't be race between server and marshaller because session wasn't added to server's session pool.
r.Finalize()
return nil, fmt.Errorf("failed to register iterator session: %w", err)
}
data, err = json.Marshal(iteratorAux{ data, err = json.Marshal(iteratorAux{
Type: stackitem.InteropT.String(), Type: stackitem.InteropT.String(),
Interface: iteratorInterfaceName, Interface: iteratorInterfaceName,

View file

@ -20,6 +20,7 @@ type (
SessionEnabled bool `yaml:"SessionEnabled"` SessionEnabled bool `yaml:"SessionEnabled"`
SessionExpirationTime int `yaml:"SessionExpirationTime"` SessionExpirationTime int `yaml:"SessionExpirationTime"`
SessionBackedByMPT bool `yaml:"SessionBackedByMPT"` SessionBackedByMPT bool `yaml:"SessionBackedByMPT"`
SessionPoolSize int `yaml:"SessionPoolSize"`
StartWhenSynchronized bool `yaml:"StartWhenSynchronized"` StartWhenSynchronized bool `yaml:"StartWhenSynchronized"`
TLSConfig TLSConfig `yaml:"TLSConfig"` TLSConfig TLSConfig `yaml:"TLSConfig"`
} }

View file

@ -143,6 +143,9 @@ const (
// Maximum number of elements for get*transfers requests. // Maximum number of elements for get*transfers requests.
maxTransfersLimit = 1000 maxTransfersLimit = 1000
// defaultSessionPoolSize is the number of concurrently running iterator sessions.
defaultSessionPoolSize = 20
) )
var rpcHandlers = map[string]func(*Server, request.Params) (interface{}, *response.Error){ var rpcHandlers = map[string]func(*Server, request.Params) (interface{}, *response.Error){
@ -225,10 +228,16 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S
orc.SetBroadcaster(broadcaster.New(orc.MainCfg, log)) orc.SetBroadcaster(broadcaster.New(orc.MainCfg, log))
} }
protoCfg := chain.GetConfig() protoCfg := chain.GetConfig()
if conf.SessionEnabled && conf.SessionExpirationTime <= 0 { if conf.SessionEnabled {
if conf.SessionExpirationTime <= 0 {
conf.SessionExpirationTime = protoCfg.SecondsPerBlock conf.SessionExpirationTime = protoCfg.SecondsPerBlock
log.Info("SessionExpirationTime is not set or wrong, setting default value", zap.Int("SessionExpirationTime", protoCfg.SecondsPerBlock)) log.Info("SessionExpirationTime is not set or wrong, setting default value", zap.Int("SessionExpirationTime", protoCfg.SecondsPerBlock))
} }
if conf.SessionPoolSize <= 0 {
conf.SessionPoolSize = defaultSessionPoolSize
log.Info("SessionPoolSize is not set or wrong, setting default value", zap.Int("SessionPoolSize", defaultSessionPoolSize))
}
}
return Server{ return Server{
Server: httpServer, Server: httpServer,
chain: chain, chain: chain,
@ -1998,11 +2007,14 @@ func (s *Server) runScriptInVM(t trigger.Type, script []byte, contractScriptHash
} }
var registerIterator result.RegisterIterator var registerIterator result.RegisterIterator
if s.config.SessionEnabled { if s.config.SessionEnabled {
registerIterator = func(sessionID string, item stackitem.Item, stackIndex int, finalize func()) uuid.UUID { registerIterator = func(sessionID string, item stackitem.Item, stackIndex int, finalize func()) (uuid.UUID, error) {
iterID := uuid.New() iterID := uuid.New()
s.sessionsLock.Lock() s.sessionsLock.Lock()
sess, ok := s.sessions[sessionID] sess, ok := s.sessions[sessionID]
if !ok { if !ok {
if len(s.sessions) >= s.config.SessionPoolSize {
return uuid.UUID{}, errors.New("max capacity reached")
}
timer := time.AfterFunc(time.Second*time.Duration(s.config.SessionExpirationTime), func() { timer := time.AfterFunc(time.Second*time.Duration(s.config.SessionExpirationTime), func() {
s.sessionsLock.Lock() s.sessionsLock.Lock()
defer s.sessionsLock.Unlock() defer s.sessionsLock.Unlock()
@ -2047,7 +2059,7 @@ func (s *Server) runScriptInVM(t trigger.Type, script []byte, contractScriptHash
}) })
s.sessions[sessionID] = sess s.sessions[sessionID] = sess
s.sessionsLock.Unlock() s.sessionsLock.Unlock()
return iterID return iterID, nil
} }
} }
return result.NewInvoke(ic, script, faultException, registerIterator, s.config.MaxIteratorResultItems), nil return result.NewInvoke(ic, script, faultException, registerIterator, s.config.MaxIteratorResultItems), nil