From af17bbfeababc6148cc5a58bcb67114f97357482 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 31 Aug 2020 18:27:32 +0300 Subject: [PATCH] rpc/server: encode answers more efficiently We're at the point where even this code can clearly be seen in profiles. We can save on some buffers (and CPU cycles) by encoding the answer once. Another ~2% TPS for single node. --- pkg/rpc/response/types.go | 7 +++++++ pkg/rpc/server/server.go | 32 ++++++++++++-------------------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/pkg/rpc/response/types.go b/pkg/rpc/response/types.go index 136082768..1cf09bddc 100644 --- a/pkg/rpc/response/types.go +++ b/pkg/rpc/response/types.go @@ -24,6 +24,13 @@ type Raw struct { Result json.RawMessage `json:"result,omitempty"` } +// Abstract represents abstract JSON-RPC 2.0 response, it differs from Raw in +// that Result field is an interface here. +type Abstract struct { + HeaderAndError + Result interface{} `json:"result,omitempty"` +} + // Notification is a type used to represent wire format of events, they're // special in that they look like requests but they don't have IDs and their // "method" is actually an event name. diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index eb6828f02..3617fcebd 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -230,7 +230,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ s.log.Info("websocket connection upgrade failed", zap.Error(err)) return } - resChan := make(chan response.Raw) + resChan := make(chan response.Abstract) subChan := make(chan *websocket.PreparedMessage, notificationBufSize) subscr := &subscriber{writer: subChan, ws: ws} s.subsLock.Lock() @@ -262,13 +262,13 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ s.writeHTTPServerResponse(req, w, resp) } -func (s *Server) handleRequest(req *request.In, sub *subscriber) response.Raw { +func (s *Server) handleRequest(req *request.In, sub *subscriber) response.Abstract { var res interface{} var resErr *response.Error reqParams, err := req.Params() if err != nil { - return s.packResponseToRaw(req, nil, response.NewInvalidParamsError("Problem parsing request parameters", err)) + return s.packResponse(req, nil, response.NewInvalidParamsError("Problem parsing request parameters", err)) } s.log.Debug("processing rpc request", @@ -287,10 +287,10 @@ func (s *Server) handleRequest(req *request.In, sub *subscriber) response.Raw { res, resErr = handler(s, *reqParams, sub) } } - return s.packResponseToRaw(req, res, resErr) + return s.packResponse(req, res, resErr) } -func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan response.Raw, subChan <-chan *websocket.PreparedMessage) { +func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan response.Abstract, subChan <-chan *websocket.PreparedMessage) { pingTicker := time.NewTicker(wsPingPeriod) eventloop: for { @@ -337,7 +337,7 @@ drainloop: } } -func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- response.Raw, subscr *subscriber) { +func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- response.Abstract, subscr *subscriber) { ws.SetReadLimit(wsReadLimit) ws.SetReadDeadline(time.Now().Add(wsPongLimit)) ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(wsPongLimit)); return nil }) @@ -712,7 +712,7 @@ func (s *Server) getStorage(ps request.Params) (interface{}, *response.Error) { item := s.chain.GetStorageItem(id, key) if item == nil { - return nil, nil + return "", nil } return hex.EncodeToString(item.Value), nil @@ -1250,8 +1250,8 @@ func (s *Server) blockHeightFromParam(param *request.Param) (int, *response.Erro return num, nil } -func (s *Server) packResponseToRaw(r *request.In, result interface{}, respErr *response.Error) response.Raw { - resp := response.Raw{ +func (s *Server) packResponse(r *request.In, result interface{}, respErr *response.Error) response.Abstract { + resp := response.Abstract{ HeaderAndError: response.HeaderAndError{ Header: response.Header{ JSONRPC: r.JSONRPC, @@ -1262,15 +1262,7 @@ func (s *Server) packResponseToRaw(r *request.In, result interface{}, respErr *r if respErr != nil { resp.Error = respErr } else { - resJSON, err := json.Marshal(result) - if err != nil { - s.log.Error("failed to marshal result", - zap.Error(err), - zap.String("method", r.Method)) - resp.Error = response.NewInternalServerError("failed to encode result", err) - } else { - resp.Result = resJSON - } + resp.Result = result } return resp } @@ -1292,11 +1284,11 @@ func (s *Server) logRequestError(r *request.In, jsonErr *response.Error) { // writeHTTPErrorResponse writes an error response to the ResponseWriter. func (s *Server) writeHTTPErrorResponse(r *request.In, w http.ResponseWriter, jsonErr *response.Error) { - resp := s.packResponseToRaw(r, nil, jsonErr) + resp := s.packResponse(r, nil, jsonErr) s.writeHTTPServerResponse(r, w, resp) } -func (s *Server) writeHTTPServerResponse(r *request.In, w http.ResponseWriter, resp response.Raw) { +func (s *Server) writeHTTPServerResponse(r *request.In, w http.ResponseWriter, resp response.Abstract) { // Errors can happen in many places and we can only catch ALL of them here. if resp.Error != nil { s.logRequestError(r, resp.Error)