forked from TrueCloudLab/neoneo-go
parent
679846c1a1
commit
6ee919747f
5 changed files with 272 additions and 58 deletions
|
@ -225,7 +225,7 @@ func (s *Server) Shutdown() error {
|
|||
}
|
||||
|
||||
func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) {
|
||||
req := request.NewIn()
|
||||
req := request.NewRequest()
|
||||
|
||||
if httpRequest.URL.Path == "/ws" && httpRequest.Method == "GET" {
|
||||
// Technically there is a race between this check and
|
||||
|
@ -237,7 +237,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
|
|||
s.subsLock.RUnlock()
|
||||
if numOfSubs >= maxSubscribers {
|
||||
s.writeHTTPErrorResponse(
|
||||
req,
|
||||
request.NewIn(),
|
||||
w,
|
||||
response.NewInternalServerError("websocket users limit reached", nil),
|
||||
)
|
||||
|
@ -248,7 +248,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
|
|||
s.log.Info("websocket connection upgrade failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
resChan := make(chan response.Abstract)
|
||||
resChan := make(chan response.AbstractResult) // response.Abstract or response.AbstractBatch
|
||||
subChan := make(chan *websocket.PreparedMessage, notificationBufSize)
|
||||
subscr := &subscriber{writer: subChan, ws: ws}
|
||||
s.subsLock.Lock()
|
||||
|
@ -261,7 +261,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
|
|||
|
||||
if httpRequest.Method != "POST" {
|
||||
s.writeHTTPErrorResponse(
|
||||
req,
|
||||
request.NewIn(),
|
||||
w,
|
||||
response.NewInvalidParamsError(
|
||||
fmt.Sprintf("Invalid method '%s', please retry with 'POST'", httpRequest.Method), nil,
|
||||
|
@ -272,7 +272,7 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
|
|||
|
||||
err := req.DecodeData(httpRequest.Body)
|
||||
if err != nil {
|
||||
s.writeHTTPErrorResponse(req, w, response.NewParseError("Problem parsing JSON-RPC request body", err))
|
||||
s.writeHTTPErrorResponse(request.NewIn(), w, response.NewParseError("Problem parsing JSON-RPC request body", err))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -280,9 +280,23 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ
|
|||
s.writeHTTPServerResponse(req, w, resp)
|
||||
}
|
||||
|
||||
func (s *Server) handleRequest(req *request.In, sub *subscriber) response.Abstract {
|
||||
func (s *Server) handleRequest(req *request.Request, sub *subscriber) response.AbstractResult {
|
||||
if req.In != nil {
|
||||
return s.handleIn(req.In, sub)
|
||||
}
|
||||
resp := make(response.AbstractBatch, len(req.Batch))
|
||||
for i, in := range req.Batch {
|
||||
resp[i] = s.handleIn(&in, sub)
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
func (s *Server) handleIn(req *request.In, sub *subscriber) response.Abstract {
|
||||
var res interface{}
|
||||
var resErr *response.Error
|
||||
if req.JSONRPC != request.JSONRPCVersion {
|
||||
return s.packResponse(req, nil, response.NewInvalidParamsError("Problem parsing JSON", fmt.Errorf("invalid version, expected 2.0 got: '%s'", req.JSONRPC)))
|
||||
}
|
||||
|
||||
reqParams, err := req.Params()
|
||||
if err != nil {
|
||||
|
@ -308,7 +322,7 @@ func (s *Server) handleRequest(req *request.In, sub *subscriber) response.Abstra
|
|||
return s.packResponse(req, res, resErr)
|
||||
}
|
||||
|
||||
func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan response.Abstract, subChan <-chan *websocket.PreparedMessage) {
|
||||
func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan response.AbstractResult, subChan <-chan *websocket.PreparedMessage) {
|
||||
pingTicker := time.NewTicker(wsPingPeriod)
|
||||
eventloop:
|
||||
for {
|
||||
|
@ -355,21 +369,21 @@ drainloop:
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- response.Abstract, subscr *subscriber) {
|
||||
func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- response.AbstractResult, subscr *subscriber) {
|
||||
ws.SetReadLimit(wsReadLimit)
|
||||
ws.SetReadDeadline(time.Now().Add(wsPongLimit))
|
||||
ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(wsPongLimit)); return nil })
|
||||
requestloop:
|
||||
for {
|
||||
req := new(request.In)
|
||||
req := request.NewRequest()
|
||||
err := ws.ReadJSON(req)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
res := s.handleRequest(req, subscr)
|
||||
if res.Error != nil {
|
||||
s.logRequestError(req, res.Error)
|
||||
}
|
||||
res.RunForErrors(func(jsonErr *response.Error) {
|
||||
s.logRequestError(req, jsonErr)
|
||||
})
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
break requestloop
|
||||
|
@ -1388,15 +1402,17 @@ func (s *Server) packResponse(r *request.In, result interface{}, respErr *respon
|
|||
}
|
||||
|
||||
// logRequestError is a request error logger.
|
||||
func (s *Server) logRequestError(r *request.In, jsonErr *response.Error) {
|
||||
func (s *Server) logRequestError(r *request.Request, jsonErr *response.Error) {
|
||||
logFields := []zap.Field{
|
||||
zap.Error(jsonErr.Cause),
|
||||
zap.String("method", r.Method),
|
||||
}
|
||||
|
||||
params, err := r.Params()
|
||||
if err == nil {
|
||||
logFields = append(logFields, zap.Any("params", params))
|
||||
if r.In != nil {
|
||||
logFields = append(logFields, zap.String("method", r.In.Method))
|
||||
params, err := r.In.Params()
|
||||
if err == nil {
|
||||
logFields = append(logFields, zap.Any("params", params))
|
||||
}
|
||||
}
|
||||
|
||||
s.log.Error("Error encountered with rpc request", logFields...)
|
||||
|
@ -1405,14 +1421,19 @@ func (s *Server) logRequestError(r *request.In, jsonErr *response.Error) {
|
|||
// writeHTTPErrorResponse writes an error response to the ResponseWriter.
|
||||
func (s *Server) writeHTTPErrorResponse(r *request.In, w http.ResponseWriter, jsonErr *response.Error) {
|
||||
resp := s.packResponse(r, nil, jsonErr)
|
||||
s.writeHTTPServerResponse(r, w, resp)
|
||||
s.writeHTTPServerResponse(&request.Request{In: r}, w, resp)
|
||||
}
|
||||
|
||||
func (s *Server) writeHTTPServerResponse(r *request.In, w http.ResponseWriter, resp response.Abstract) {
|
||||
func (s *Server) writeHTTPServerResponse(r *request.Request, w http.ResponseWriter, resp response.AbstractResult) {
|
||||
// Errors can happen in many places and we can only catch ALL of them here.
|
||||
if resp.Error != nil {
|
||||
s.logRequestError(r, resp.Error)
|
||||
w.WriteHeader(resp.Error.HTTPCode)
|
||||
resp.RunForErrors(func(jsonErr *response.Error) {
|
||||
s.logRequestError(r, jsonErr)
|
||||
})
|
||||
if r.In != nil {
|
||||
resp := resp.(response.Abstract)
|
||||
if resp.Error != nil {
|
||||
w.WriteHeader(resp.Error.HTTPCode)
|
||||
}
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
if s.config.EnableCORSWorkaround {
|
||||
|
@ -1424,9 +1445,15 @@ func (s *Server) writeHTTPServerResponse(r *request.In, w http.ResponseWriter, r
|
|||
err := encoder.Encode(resp)
|
||||
|
||||
if err != nil {
|
||||
s.log.Error("Error encountered while encoding response",
|
||||
zap.String("err", err.Error()),
|
||||
zap.String("method", r.Method))
|
||||
switch {
|
||||
case r.In != nil:
|
||||
s.log.Error("Error encountered while encoding response",
|
||||
zap.String("err", err.Error()),
|
||||
zap.String("method", r.In.Method))
|
||||
case r.Batch != nil:
|
||||
s.log.Error("Error encountered while encoding batch response",
|
||||
zap.String("err", err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue