rpc/server: add websockets support via '/ws' URL

This commit is contained in:
Roman Khimov 2020-04-29 15:25:58 +03:00
parent 8cec6694ae
commit ec62edac68
4 changed files with 108 additions and 4 deletions

1
go.mod
View file

@ -6,6 +6,7 @@ require (
github.com/dgraph-io/badger/v2 v2.0.3 github.com/dgraph-io/badger/v2 v2.0.3
github.com/go-redis/redis v6.10.2+incompatible github.com/go-redis/redis v6.10.2+incompatible
github.com/go-yaml/yaml v2.1.0+incompatible github.com/go-yaml/yaml v2.1.0+incompatible
github.com/gorilla/websocket v1.4.2
github.com/mr-tron/base58 v1.1.2 github.com/mr-tron/base58 v1.1.2
github.com/nspcc-dev/dbft v0.0.0-20200303183127-36d3da79c682 github.com/nspcc-dev/dbft v0.0.0-20200303183127-36d3da79c682
github.com/nspcc-dev/rfc6979 v0.2.0 github.com/nspcc-dev/rfc6979 v0.2.0

2
go.sum
View file

@ -85,6 +85,8 @@ github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=

View file

@ -9,12 +9,12 @@ import (
"net" "net"
"net/http" "net/http"
"strconv" "strconv"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/rpc"
"github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/hash"
@ -22,6 +22,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network" "github.com/nspcc-dev/neo-go/pkg/network"
"github.com/nspcc-dev/neo-go/pkg/rpc"
"github.com/nspcc-dev/neo-go/pkg/rpc/request" "github.com/nspcc-dev/neo-go/pkg/rpc/request"
"github.com/nspcc-dev/neo-go/pkg/rpc/response" "github.com/nspcc-dev/neo-go/pkg/rpc/response"
"github.com/nspcc-dev/neo-go/pkg/rpc/response/result" "github.com/nspcc-dev/neo-go/pkg/rpc/response/result"
@ -44,6 +45,20 @@ type (
} }
) )
const (
// Message limit for receiving side.
wsReadLimit = 4096
// Disconnection timeout.
wsPongLimit = 60 * time.Second
// Ping period for connection liveness check.
wsPingPeriod = wsPongLimit / 2
// Write deadline.
wsWriteLimit = wsPingPeriod / 2
)
var rpcHandlers = map[string]func(*Server, request.Params) (interface{}, *response.Error){ var rpcHandlers = map[string]func(*Server, request.Params) (interface{}, *response.Error){
"getaccountstate": (*Server).getAccountState, "getaccountstate": (*Server).getAccountState,
"getapplicationlog": (*Server).getApplicationLog, "getapplicationlog": (*Server).getApplicationLog,
@ -81,6 +96,10 @@ var invalidBlockHeightError = func(index int, height int) *response.Error {
return response.NewRPCError(fmt.Sprintf("Param at index %d should be greater than or equal to 0 and less then or equal to current block height, got: %d", index, height), "", nil) return response.NewRPCError(fmt.Sprintf("Param at index %d should be greater than or equal to 0 and less then or equal to current block height, got: %d", index, height), "", nil)
} }
// upgrader is a no-op websocket.Upgrader that reuses HTTP server buffers and
// doesn't set any Error function.
var upgrader = websocket.Upgrader{}
// New creates a new Server struct. // New creates a new Server struct.
func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.Server, log *zap.Logger) Server { func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.Server, log *zap.Logger) Server {
httpServer := &http.Server{ httpServer := &http.Server{
@ -150,6 +169,18 @@ func (s *Server) Shutdown() error {
} }
func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) { func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) {
if httpRequest.URL.Path == "/ws" && httpRequest.Method == "GET" {
ws, err := upgrader.Upgrade(w, httpRequest, nil)
if err != nil {
s.log.Info("websocket connection upgrade failed", zap.Error(err))
return
}
resChan := make(chan response.Raw)
go s.handleWsWrites(ws, resChan)
s.handleWsReads(ws, resChan)
return
}
req := request.NewIn() req := request.NewIn()
if httpRequest.Method != "POST" { if httpRequest.Method != "POST" {
@ -193,6 +224,49 @@ func (s *Server) handleRequest(req *request.In) response.Raw {
return s.packResponseToRaw(req, res, resErr) return s.packResponseToRaw(req, res, resErr)
} }
func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan response.Raw) {
pingTicker := time.NewTicker(wsPingPeriod)
defer ws.Close()
defer pingTicker.Stop()
for {
select {
case res, ok := <-resChan:
if !ok {
return
}
ws.SetWriteDeadline(time.Now().Add(wsWriteLimit))
if err := ws.WriteJSON(res); err != nil {
return
}
case <-pingTicker.C:
ws.SetWriteDeadline(time.Now().Add(wsWriteLimit))
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
return
}
}
}
}
func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- response.Raw) {
ws.SetReadLimit(wsReadLimit)
ws.SetReadDeadline(time.Now().Add(wsPongLimit))
ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(wsPongLimit)); return nil })
for {
req := new(request.In)
err := ws.ReadJSON(req)
if err != nil {
break
}
res := s.handleRequest(req)
if res.Error != nil {
s.logRequestError(req, res.Error)
}
resChan <- res
}
close(resChan)
ws.Close()
}
func (s *Server) getBestBlockHash(_ request.Params) (interface{}, *response.Error) { func (s *Server) getBestBlockHash(_ request.Params) (interface{}, *response.Error) {
return "0x" + s.chain.CurrentBlockHash().StringLE(), nil return "0x" + s.chain.CurrentBlockHash().StringLE(), nil
} }

View file

@ -14,6 +14,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
@ -814,6 +815,19 @@ var rpcTestCases = map[string][]rpcTestCase{
} }
func TestRPC(t *testing.T) { func TestRPC(t *testing.T) {
t.Run("http", func(t *testing.T) {
testRPCProtocol(t, doRPCCallOverHTTP)
})
t.Run("websocket", func(t *testing.T) {
testRPCProtocol(t, doRPCCallOverWS)
})
}
// testRPCProtocol runs a full set of tests using given callback to make actual
// calls. Some tests change the chain state, thus we reinitialize the chain from
// scratch here.
func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []byte) {
chain, httpSrv := initServerWithInMemoryChain(t) chain, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()
@ -1082,7 +1096,20 @@ func checkErrGetResult(t *testing.T, body []byte, expectingFail bool) json.RawMe
return resp.Result return resp.Result
} }
func doRPCCall(rpcCall string, url string, t *testing.T) []byte { func doRPCCallOverWS(rpcCall string, url string, t *testing.T) []byte {
dialer := websocket.Dialer{HandshakeTimeout: time.Second}
url = "ws" + strings.TrimPrefix(url, "http")
c, _, err := dialer.Dial(url+"/ws", nil)
require.NoError(t, err)
c.SetWriteDeadline(time.Now().Add(time.Second))
require.NoError(t, c.WriteMessage(1, []byte(rpcCall)))
c.SetReadDeadline(time.Now().Add(time.Second))
_, body, err := c.ReadMessage()
require.NoError(t, err)
return bytes.TrimSpace(body)
}
func doRPCCallOverHTTP(rpcCall string, url string, t *testing.T) []byte {
cl := http.Client{Timeout: time.Second} cl := http.Client{Timeout: time.Second}
resp, err := cl.Post(url, "application/json", strings.NewReader(rpcCall)) resp, err := cl.Post(url, "application/json", strings.NewReader(rpcCall))
require.NoErrorf(t, err, "could not make a POST request") require.NoErrorf(t, err, "could not make a POST request")