From 24b46c6041eb8b9836f4fb63a89d84b7cdd6b0c2 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 29 Apr 2020 15:25:58 +0300 Subject: [PATCH] rpc/server: add websockets support via '/ws' URL --- go.mod | 1 + go.sum | 2 + pkg/rpc/server/server.go | 78 ++++++++++++++++++++++++++++++++++- pkg/rpc/server/server_test.go | 29 ++++++++++++- 4 files changed, 107 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 0d4eb0ec8..570f75872 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/dgraph-io/badger/v2 v2.0.3 github.com/go-redis/redis v6.10.2+incompatible github.com/go-yaml/yaml v2.1.0+incompatible + github.com/gorilla/websocket v1.4.2 github.com/mr-tron/base58 v1.1.2 github.com/nspcc-dev/dbft v0.0.0-20200303183127-36d3da79c682 github.com/nspcc-dev/rfc6979 v0.2.0 diff --git a/go.sum b/go.sum index abbecef44..9d9886a13 100644 --- a/go.sum +++ b/go.sum @@ -85,6 +85,8 @@ github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index da5f9a72b..87f43b519 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -9,9 +9,9 @@ import ( "net" "net/http" "strconv" + "time" - "github.com/nspcc-dev/neo-go/pkg/rpc" - + "github.com/gorilla/websocket" "github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" @@ -21,6 +21,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network" + "github.com/nspcc-dev/neo-go/pkg/rpc" "github.com/nspcc-dev/neo-go/pkg/rpc/request" "github.com/nspcc-dev/neo-go/pkg/rpc/response" "github.com/nspcc-dev/neo-go/pkg/rpc/response/result" @@ -43,6 +44,20 @@ type ( } ) +const ( + // Message limit for receiving side. + wsReadLimit = 4096 + + // Disconnection timeout. + wsPongLimit = 60 * time.Second + + // Ping period for connection liveness check. + wsPingPeriod = wsPongLimit / 2 + + // Write deadline. + wsWriteLimit = wsPingPeriod / 2 +) + var rpcHandlers = map[string]func(*Server, request.Params) (interface{}, *response.Error){ "getaccountstate": (*Server).getAccountState, "getapplicationlog": (*Server).getApplicationLog, @@ -80,6 +95,10 @@ var invalidBlockHeightError = func(index int, height int) *response.Error { return response.NewRPCError(fmt.Sprintf("Param at index %d should be greater than or equal to 0 and less then or equal to current block height, got: %d", index, height), "", nil) } +// upgrader is a no-op websocket.Upgrader that reuses HTTP server buffers and +// doesn't set any Error function. +var upgrader = websocket.Upgrader{} + // New creates a new Server struct. func New(chain core.Blockchainer, conf rpc.Config, coreServer *network.Server, log *zap.Logger) Server { httpServer := &http.Server{ @@ -149,6 +168,18 @@ func (s *Server) Shutdown() error { } func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) { + if httpRequest.URL.Path == "/ws" && httpRequest.Method == "GET" { + ws, err := upgrader.Upgrade(w, httpRequest, nil) + if err != nil { + s.log.Info("websocket connection upgrade failed", zap.Error(err)) + return + } + resChan := make(chan response.Raw) + go s.handleWsWrites(ws, resChan) + s.handleWsReads(ws, resChan) + return + } + req := request.NewIn() if httpRequest.Method != "POST" { @@ -192,6 +223,49 @@ func (s *Server) handleRequest(req *request.In) response.Raw { return s.packResponseToRaw(req, res, resErr) } +func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan response.Raw) { + pingTicker := time.NewTicker(wsPingPeriod) + defer ws.Close() + defer pingTicker.Stop() + for { + select { + case res, ok := <-resChan: + if !ok { + return + } + ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)) + if err := ws.WriteJSON(res); err != nil { + return + } + case <-pingTicker.C: + ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)) + if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + return + } + } + } +} + +func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- response.Raw) { + ws.SetReadLimit(wsReadLimit) + ws.SetReadDeadline(time.Now().Add(wsPongLimit)) + ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(wsPongLimit)); return nil }) + for { + req := new(request.In) + err := ws.ReadJSON(req) + if err != nil { + break + } + res := s.handleRequest(req) + if res.Error != nil { + s.logRequestError(req, res.Error) + } + resChan <- res + } + close(resChan) + ws.Close() +} + func (s *Server) getBestBlockHash(_ request.Params) (interface{}, *response.Error) { return "0x" + s.chain.CurrentBlockHash().StringLE(), nil } diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index 830bf895e..8c2fb826a 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/gorilla/websocket" "github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -882,6 +883,19 @@ var rpcTestCases = map[string][]rpcTestCase{ } func TestRPC(t *testing.T) { + t.Run("http", func(t *testing.T) { + testRPCProtocol(t, doRPCCallOverHTTP) + }) + + t.Run("websocket", func(t *testing.T) { + testRPCProtocol(t, doRPCCallOverWS) + }) +} + +// testRPCProtocol runs a full set of tests using given callback to make actual +// calls. Some tests change the chain state, thus we reinitialize the chain from +// scratch here. +func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []byte) { chain, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() @@ -1028,7 +1042,20 @@ func checkErrGetResult(t *testing.T, body []byte, expectingFail bool) json.RawMe return resp.Result } -func doRPCCall(rpcCall string, url string, t *testing.T) []byte { +func doRPCCallOverWS(rpcCall string, url string, t *testing.T) []byte { + dialer := websocket.Dialer{HandshakeTimeout: time.Second} + url = "ws" + strings.TrimPrefix(url, "http") + c, _, err := dialer.Dial(url+"/ws", nil) + require.NoError(t, err) + c.SetWriteDeadline(time.Now().Add(time.Second)) + require.NoError(t, c.WriteMessage(1, []byte(rpcCall))) + c.SetReadDeadline(time.Now().Add(time.Second)) + _, body, err := c.ReadMessage() + require.NoError(t, err) + return bytes.TrimSpace(body) +} + +func doRPCCallOverHTTP(rpcCall string, url string, t *testing.T) []byte { cl := http.Client{Timeout: time.Second} resp, err := cl.Post(url, "application/json", strings.NewReader(rpcCall)) require.NoErrorf(t, err, "could not make a POST request")