diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index e0a075278..372333efa 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -2233,18 +2233,19 @@ func TestWSClient_WaitWithMissedEvent(t *testing.T) { // Accept the next block, but subscriber will get no events because it's overflown. require.NoError(t, chain.AddBlock(b1)) - overEvent, err := json.Marshal(neorpc.Notification{ + overNotification := neorpc.Notification{ JSONRPC: neorpc.JSONRPCVersion, Event: neorpc.MissedEventID, Payload: make([]interface{}, 0), - }) + } + overEvent, err := json.Marshal(overNotification) require.NoError(t, err) overflowMsg, err := websocket.NewPreparedMessage(websocket.TextMessage, overEvent) require.NoError(t, err) rpcSrv.subsLock.Lock() // Deliver overflow message -> triggers subscriber to retry with polling waiter. for s := range rpcSrv.subscribers { - s.writer <- overflowMsg + s.writer <- intEvent{overflowMsg, &overNotification} } rpcSrv.subsLock.Unlock() diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index e2a5d2a58..1899a0ee1 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -469,7 +469,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ return } resChan := make(chan abstractResult) // response.abstract or response.abstractBatch - subChan := make(chan *websocket.PreparedMessage, notificationBufSize) + subChan := make(chan intEvent, notificationBufSize) subscr := &subscriber{writer: subChan, ws: ws} s.subsLock.Lock() s.subscribers[subscr] = true @@ -505,6 +505,19 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ s.writeHTTPServerResponse(req, w, resp) } +// RegisterLocal performs local client registration. +func (s *Server) RegisterLocal(ctx context.Context, events chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error) { + subChan := make(chan intEvent, notificationBufSize) + subscr := &subscriber{writer: subChan} + s.subsLock.Lock() + s.subscribers[subscr] = true + s.subsLock.Unlock() + go s.handleLocalNotifications(ctx, events, subChan, subscr) + return func(req *neorpc.Request) (*neorpc.Response, error) { + return s.handleInternal(req, subscr) + } +} + func (s *Server) handleRequest(req *params.Request, sub *subscriber) abstractResult { if req.In != nil { req.In.Method = escapeForLog(req.In.Method) // No valid method name will be changed by it. @@ -518,6 +531,51 @@ func (s *Server) handleRequest(req *params.Request, sub *subscriber) abstractRes return resp } +// handleInternal is an experimental interface to handle client requests directly. +func (s *Server) handleInternal(req *neorpc.Request, sub *subscriber) (*neorpc.Response, error) { + var ( + res interface{} + rpcRes = &neorpc.Response{ + HeaderAndError: neorpc.HeaderAndError{ + Header: neorpc.Header{ + JSONRPC: req.JSONRPC, + ID: json.RawMessage(strconv.FormatUint(req.ID, 10)), + }, + }, + } + ) + reqParams, err := params.FromAny(req.Params) + if err != nil { + return nil, err + } + + s.log.Debug("processing local rpc request", + zap.String("method", req.Method), + zap.Stringer("params", reqParams)) + + start := time.Now() + defer func() { addReqTimeMetric(req.Method, time.Since(start)) }() + + rpcRes.Error = neorpc.NewMethodNotFoundError(fmt.Sprintf("method %q not supported", req.Method)) + handler, ok := rpcHandlers[req.Method] + if ok { + res, rpcRes.Error = handler(s, reqParams) + } else if sub != nil { + handler, ok := rpcWsHandlers[req.Method] + if ok { + res, rpcRes.Error = handler(s, reqParams, sub) + } + } + if res != nil { + b, err := json.Marshal(res) + if err != nil { + return nil, fmt.Errorf("response can't be JSONized: %w", err) + } + rpcRes.Result = json.RawMessage(b) + } + return rpcRes, nil +} + func (s *Server) handleIn(req *params.In, sub *subscriber) abstract { var res interface{} var resErr *neorpc.Error @@ -547,7 +605,31 @@ func (s *Server) handleIn(req *params.In, sub *subscriber) abstract { return s.packResponse(req, res, resErr) } -func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan abstractResult, subChan <-chan *websocket.PreparedMessage) { +func (s *Server) handleLocalNotifications(ctx context.Context, events chan<- neorpc.Notification, subChan <-chan intEvent, subscr *subscriber) { +eventloop: + for { + select { + case <-s.shutdown: + break eventloop + case <-ctx.Done(): + break eventloop + case ev := <-subChan: + events <- *ev.ntf // Make a copy. + } + } + close(events) + s.dropSubscriber(subscr) +drainloop: + for { + select { + case <-subChan: + default: + break drainloop + } + } +} + +func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan abstractResult, subChan <-chan intEvent) { pingTicker := time.NewTicker(wsPingPeriod) eventloop: for { @@ -561,7 +643,7 @@ eventloop: if err := ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)); err != nil { break eventloop } - if err := ws.WritePreparedMessage(event); err != nil { + if err := ws.WritePreparedMessage(event.msg); err != nil { break eventloop } case res, ok := <-resChan: @@ -621,7 +703,12 @@ requestloop: case resChan <- res: } } + s.dropSubscriber(subscr) + close(resChan) + ws.Close() +} +func (s *Server) dropSubscriber(subscr *subscriber) { s.subsLock.Lock() delete(s.subscribers, subscr) s.subsLock.Unlock() @@ -632,8 +719,6 @@ requestloop: } } s.subsCounterLock.Unlock() - close(resChan) - ws.Close() } func (s *Server) getBestBlockHash(_ params.Params) (interface{}, *neorpc.Error) { @@ -2581,11 +2666,12 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) { } func (s *Server) handleSubEvents() { - b, err := json.Marshal(neorpc.Notification{ + var overflowEvent = neorpc.Notification{ JSONRPC: neorpc.JSONRPCVersion, Event: neorpc.MissedEventID, Payload: make([]interface{}, 0), - }) + } + b, err := json.Marshal(overflowEvent) if err != nil { s.log.Error("fatal: failed to marshal overflow event", zap.Error(err)) return @@ -2649,12 +2735,12 @@ chloop: } } select { - case sub.writer <- msg: + case sub.writer <- intEvent{msg, &resp}: default: sub.overflown.Store(true) // MissedEvent is to be delivered eventually. go func(sub *subscriber) { - sub.writer <- overflowMsg + sub.writer <- intEvent{overflowMsg, &overflowEvent} sub.overflown.Store(false) }(sub) } diff --git a/pkg/services/rpcsrv/subscription.go b/pkg/services/rpcsrv/subscription.go index 85e9a7036..306c27f26 100644 --- a/pkg/services/rpcsrv/subscription.go +++ b/pkg/services/rpcsrv/subscription.go @@ -7,9 +7,16 @@ import ( ) type ( + // intEvent is an internal event that has both a proper structure and + // a websocket-ready message. It's used to serve websocket-based clients + // as well as internal ones. + intEvent struct { + msg *websocket.PreparedMessage + ntf *neorpc.Notification + } // subscriber is an event subscriber. subscriber struct { - writer chan<- *websocket.PreparedMessage + writer chan<- intEvent ws *websocket.Conn overflown atomic.Bool // These work like slots as there is not a lot of them (it's