rpcsrv: provide RegisterLocal for local clients
This allows to call RPC server more effectively in the same process (bypassing TCP/HTTP). Refs. #2909.
This commit is contained in:
parent
e496084bee
commit
a55a01d456
3 changed files with 107 additions and 13 deletions
|
@ -2233,18 +2233,19 @@ func TestWSClient_WaitWithMissedEvent(t *testing.T) {
|
|||
// Accept the next block, but subscriber will get no events because it's overflown.
|
||||
require.NoError(t, chain.AddBlock(b1))
|
||||
|
||||
overEvent, err := json.Marshal(neorpc.Notification{
|
||||
overNotification := neorpc.Notification{
|
||||
JSONRPC: neorpc.JSONRPCVersion,
|
||||
Event: neorpc.MissedEventID,
|
||||
Payload: make([]interface{}, 0),
|
||||
})
|
||||
}
|
||||
overEvent, err := json.Marshal(overNotification)
|
||||
require.NoError(t, err)
|
||||
overflowMsg, err := websocket.NewPreparedMessage(websocket.TextMessage, overEvent)
|
||||
require.NoError(t, err)
|
||||
rpcSrv.subsLock.Lock()
|
||||
// Deliver overflow message -> triggers subscriber to retry with polling waiter.
|
||||
for s := range rpcSrv.subscribers {
|
||||
s.writer <- overflowMsg
|
||||
s.writer <- intEvent{overflowMsg, &overNotification}
|
||||
}
|
||||
rpcSrv.subsLock.Unlock()
|
||||
|
||||
|
|
|
@ -469,7 +469,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
|
|||
return
|
||||
}
|
||||
resChan := make(chan abstractResult) // response.abstract or response.abstractBatch
|
||||
subChan := make(chan *websocket.PreparedMessage, notificationBufSize)
|
||||
subChan := make(chan intEvent, notificationBufSize)
|
||||
subscr := &subscriber{writer: subChan, ws: ws}
|
||||
s.subsLock.Lock()
|
||||
s.subscribers[subscr] = true
|
||||
|
@ -505,6 +505,19 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
|
|||
s.writeHTTPServerResponse(req, w, resp)
|
||||
}
|
||||
|
||||
// RegisterLocal performs local client registration.
|
||||
func (s *Server) RegisterLocal(ctx context.Context, events chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error) {
|
||||
subChan := make(chan intEvent, notificationBufSize)
|
||||
subscr := &subscriber{writer: subChan}
|
||||
s.subsLock.Lock()
|
||||
s.subscribers[subscr] = true
|
||||
s.subsLock.Unlock()
|
||||
go s.handleLocalNotifications(ctx, events, subChan, subscr)
|
||||
return func(req *neorpc.Request) (*neorpc.Response, error) {
|
||||
return s.handleInternal(req, subscr)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleRequest(req *params.Request, sub *subscriber) abstractResult {
|
||||
if req.In != nil {
|
||||
req.In.Method = escapeForLog(req.In.Method) // No valid method name will be changed by it.
|
||||
|
@ -518,6 +531,51 @@ func (s *Server) handleRequest(req *params.Request, sub *subscriber) abstractRes
|
|||
return resp
|
||||
}
|
||||
|
||||
// handleInternal is an experimental interface to handle client requests directly.
|
||||
func (s *Server) handleInternal(req *neorpc.Request, sub *subscriber) (*neorpc.Response, error) {
|
||||
var (
|
||||
res interface{}
|
||||
rpcRes = &neorpc.Response{
|
||||
HeaderAndError: neorpc.HeaderAndError{
|
||||
Header: neorpc.Header{
|
||||
JSONRPC: req.JSONRPC,
|
||||
ID: json.RawMessage(strconv.FormatUint(req.ID, 10)),
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
reqParams, err := params.FromAny(req.Params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.log.Debug("processing local rpc request",
|
||||
zap.String("method", req.Method),
|
||||
zap.Stringer("params", reqParams))
|
||||
|
||||
start := time.Now()
|
||||
defer func() { addReqTimeMetric(req.Method, time.Since(start)) }()
|
||||
|
||||
rpcRes.Error = neorpc.NewMethodNotFoundError(fmt.Sprintf("method %q not supported", req.Method))
|
||||
handler, ok := rpcHandlers[req.Method]
|
||||
if ok {
|
||||
res, rpcRes.Error = handler(s, reqParams)
|
||||
} else if sub != nil {
|
||||
handler, ok := rpcWsHandlers[req.Method]
|
||||
if ok {
|
||||
res, rpcRes.Error = handler(s, reqParams, sub)
|
||||
}
|
||||
}
|
||||
if res != nil {
|
||||
b, err := json.Marshal(res)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("response can't be JSONized: %w", err)
|
||||
}
|
||||
rpcRes.Result = json.RawMessage(b)
|
||||
}
|
||||
return rpcRes, nil
|
||||
}
|
||||
|
||||
func (s *Server) handleIn(req *params.In, sub *subscriber) abstract {
|
||||
var res interface{}
|
||||
var resErr *neorpc.Error
|
||||
|
@ -547,7 +605,31 @@ func (s *Server) handleIn(req *params.In, sub *subscriber) abstract {
|
|||
return s.packResponse(req, res, resErr)
|
||||
}
|
||||
|
||||
func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan abstractResult, subChan <-chan *websocket.PreparedMessage) {
|
||||
func (s *Server) handleLocalNotifications(ctx context.Context, events chan<- neorpc.Notification, subChan <-chan intEvent, subscr *subscriber) {
|
||||
eventloop:
|
||||
for {
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
break eventloop
|
||||
case <-ctx.Done():
|
||||
break eventloop
|
||||
case ev := <-subChan:
|
||||
events <- *ev.ntf // Make a copy.
|
||||
}
|
||||
}
|
||||
close(events)
|
||||
s.dropSubscriber(subscr)
|
||||
drainloop:
|
||||
for {
|
||||
select {
|
||||
case <-subChan:
|
||||
default:
|
||||
break drainloop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan abstractResult, subChan <-chan intEvent) {
|
||||
pingTicker := time.NewTicker(wsPingPeriod)
|
||||
eventloop:
|
||||
for {
|
||||
|
@ -561,7 +643,7 @@ eventloop:
|
|||
if err := ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)); err != nil {
|
||||
break eventloop
|
||||
}
|
||||
if err := ws.WritePreparedMessage(event); err != nil {
|
||||
if err := ws.WritePreparedMessage(event.msg); err != nil {
|
||||
break eventloop
|
||||
}
|
||||
case res, ok := <-resChan:
|
||||
|
@ -621,7 +703,12 @@ requestloop:
|
|||
case resChan <- res:
|
||||
}
|
||||
}
|
||||
s.dropSubscriber(subscr)
|
||||
close(resChan)
|
||||
ws.Close()
|
||||
}
|
||||
|
||||
func (s *Server) dropSubscriber(subscr *subscriber) {
|
||||
s.subsLock.Lock()
|
||||
delete(s.subscribers, subscr)
|
||||
s.subsLock.Unlock()
|
||||
|
@ -632,8 +719,6 @@ requestloop:
|
|||
}
|
||||
}
|
||||
s.subsCounterLock.Unlock()
|
||||
close(resChan)
|
||||
ws.Close()
|
||||
}
|
||||
|
||||
func (s *Server) getBestBlockHash(_ params.Params) (interface{}, *neorpc.Error) {
|
||||
|
@ -2581,11 +2666,12 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) {
|
|||
}
|
||||
|
||||
func (s *Server) handleSubEvents() {
|
||||
b, err := json.Marshal(neorpc.Notification{
|
||||
var overflowEvent = neorpc.Notification{
|
||||
JSONRPC: neorpc.JSONRPCVersion,
|
||||
Event: neorpc.MissedEventID,
|
||||
Payload: make([]interface{}, 0),
|
||||
})
|
||||
}
|
||||
b, err := json.Marshal(overflowEvent)
|
||||
if err != nil {
|
||||
s.log.Error("fatal: failed to marshal overflow event", zap.Error(err))
|
||||
return
|
||||
|
@ -2649,12 +2735,12 @@ chloop:
|
|||
}
|
||||
}
|
||||
select {
|
||||
case sub.writer <- msg:
|
||||
case sub.writer <- intEvent{msg, &resp}:
|
||||
default:
|
||||
sub.overflown.Store(true)
|
||||
// MissedEvent is to be delivered eventually.
|
||||
go func(sub *subscriber) {
|
||||
sub.writer <- overflowMsg
|
||||
sub.writer <- intEvent{overflowMsg, &overflowEvent}
|
||||
sub.overflown.Store(false)
|
||||
}(sub)
|
||||
}
|
||||
|
|
|
@ -7,9 +7,16 @@ import (
|
|||
)
|
||||
|
||||
type (
|
||||
// intEvent is an internal event that has both a proper structure and
|
||||
// a websocket-ready message. It's used to serve websocket-based clients
|
||||
// as well as internal ones.
|
||||
intEvent struct {
|
||||
msg *websocket.PreparedMessage
|
||||
ntf *neorpc.Notification
|
||||
}
|
||||
// subscriber is an event subscriber.
|
||||
subscriber struct {
|
||||
writer chan<- *websocket.PreparedMessage
|
||||
writer chan<- intEvent
|
||||
ws *websocket.Conn
|
||||
overflown atomic.Bool
|
||||
// These work like slots as there is not a lot of them (it's
|
||||
|
|
Loading…
Reference in a new issue