rpc/client: add notifications support for WSClient

It differs from #895 design in that we have Notifications channel always
exposed as WSClient field, probably it simplifies things a little.
This commit is contained in:
Roman Khimov 2020-05-12 11:18:44 +03:00
parent fc22a46a4c
commit bef14977a2
2 changed files with 261 additions and 12 deletions

View file

@ -7,8 +7,11 @@ import (
"time"
"github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/rpc/request"
"github.com/nspcc-dev/neo-go/pkg/rpc/response"
"github.com/nspcc-dev/neo-go/pkg/rpc/response/result"
)
// WSClient is a websocket-enabled RPC client that can be used with appropriate
@ -17,12 +20,28 @@ import (
// that is only provided via websockets (like event subscription mechanism).
type WSClient struct {
Client
// Notifications is a channel that is used to send events received from
// server. Client's code is supposed to be reading from this channel if
// it wants to use subscription mechanism, failing to do so will cause
// WSClient to block even regular requests. This channel is not buffered.
// In case of protocol error or upon connection closure this channel will
// be closed, so make sure to handle this.
Notifications chan Notification
ws *websocket.Conn
done chan struct{}
notifications chan *request.In
responses chan *response.Raw
requests chan *request.Raw
shutdown chan struct{}
subscriptions map[string]bool
}
// Notification represents server-generated notification for client subscriptions.
// Value can be one of block.Block, result.ApplicationLog, result.NotificationEvent
// or transaction.Transaction based on Type.
type Notification struct {
Type response.EventID
Value interface{}
}
// requestResponse is a combined type for request and response since we can get
@ -59,12 +78,15 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error
return nil, err
}
wsc := &WSClient{
Client: *cl,
ws: ws,
shutdown: make(chan struct{}),
done: make(chan struct{}),
responses: make(chan *response.Raw),
requests: make(chan *request.Raw),
Client: *cl,
Notifications: make(chan Notification),
ws: ws,
shutdown: make(chan struct{}),
done: make(chan struct{}),
responses: make(chan *response.Raw),
requests: make(chan *request.Raw),
subscriptions: make(map[string]bool),
}
go wsc.wsReader()
go wsc.wsWriter()
@ -86,6 +108,7 @@ func (c *WSClient) Close() {
func (c *WSClient) wsReader() {
c.ws.SetReadLimit(wsReadLimit)
c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(wsPongLimit)); return nil })
readloop:
for {
rr := new(requestResponse)
c.ws.SetReadDeadline(time.Now().Add(wsPongLimit))
@ -95,9 +118,37 @@ func (c *WSClient) wsReader() {
break
}
if rr.RawID == nil && rr.Method != "" {
if c.notifications != nil {
c.notifications <- &rr.In
event, err := response.GetEventIDFromString(rr.Method)
if err != nil {
// Bad event received.
break
}
var slice []json.RawMessage
err = json.Unmarshal(rr.RawParams, &slice)
if err != nil || len(slice) != 1 {
// Bad event received.
break
}
var val interface{}
switch event {
case response.BlockEventID:
val = new(block.Block)
case response.TransactionEventID:
val = new(transaction.Transaction)
case response.NotificationEventID:
val = new(result.NotificationEvent)
case response.ExecutionEventID:
val = new(result.ApplicationLog)
default:
// Bad event received.
break readloop
}
err = json.Unmarshal(slice[0], val)
if err != nil || len(slice) != 1 {
// Bad event received.
break
}
c.Notifications <- Notification{event, val}
} else if rr.RawID != nil && (rr.Error != nil || rr.Result != nil) {
resp := new(response.Raw)
resp.ID = rr.RawID
@ -112,9 +163,7 @@ func (c *WSClient) wsReader() {
}
close(c.done)
close(c.responses)
if c.notifications != nil {
close(c.notifications)
}
close(c.Notifications)
}
func (c *WSClient) wsWriter() {
@ -158,3 +207,73 @@ func (c *WSClient) makeWsRequest(r *request.Raw) (*response.Raw, error) {
return resp, nil
}
}
func (c *WSClient) performSubscription(params request.RawParams) (string, error) {
var resp string
if err := c.performRequest("subscribe", params, &resp); err != nil {
return "", err
}
c.subscriptions[resp] = true
return resp, nil
}
func (c *WSClient) performUnsubscription(id string) error {
var resp bool
if !c.subscriptions[id] {
return errors.New("no subscription with this ID")
}
if err := c.performRequest("unsubscribe", request.NewRawParams(id), &resp); err != nil {
return err
}
if !resp {
return errors.New("unsubscribe method returned false result")
}
delete(c.subscriptions, id)
return nil
}
// SubscribeForNewBlocks adds subscription for new block events to this instance
// of client.
func (c *WSClient) SubscribeForNewBlocks() (string, error) {
params := request.NewRawParams("block_added")
return c.performSubscription(params)
}
// SubscribeForNewTransactions adds subscription for new transaction events to
// this instance of client.
func (c *WSClient) SubscribeForNewTransactions() (string, error) {
params := request.NewRawParams("transaction_added")
return c.performSubscription(params)
}
// SubscribeForExecutionNotifications adds subscription for notifications
// generated during transaction execution to this instance of client.
func (c *WSClient) SubscribeForExecutionNotifications() (string, error) {
params := request.NewRawParams("notification_from_execution")
return c.performSubscription(params)
}
// SubscribeForTransactionExecutions adds subscription for application execution
// results generated during transaction execution to this instance of client.
func (c *WSClient) SubscribeForTransactionExecutions() (string, error) {
params := request.NewRawParams("transaction_executed")
return c.performSubscription(params)
}
// Unsubscribe removes subscription for given event stream.
func (c *WSClient) Unsubscribe(id string) error {
return c.performUnsubscription(id)
}
// UnsubscribeAll removes all active subscriptions of current client.
func (c *WSClient) UnsubscribeAll() error {
for id := range c.subscriptions {
err := c.performUnsubscription(id)
if err != nil {
return err
}
}
return nil
}