neoneo-go/pkg/rpcclient/wsclient.go
Anna Shaleva 6dbae7edc4 rpcclient: fix WS-client unsubscription process
Do not block subscribers until the unsubscription request to RPC server
is completed. Otherwise, another notification may be received from the
RPC server which will block the unsubscription process.

At the same time, fix event-based waiter. We must not block the receiver
channel during unsubscription because there's a chance that subsequent
event will be sent by the server. We need to read this event in order not
to block the WSClient's readloop.
2022-11-16 23:44:30 +03:00

975 lines
31 KiB
Go

package rpcclient
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/neorpc/rpcevent"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/atomic"
)
// WSClient is a websocket-enabled RPC client that can be used with appropriate
// servers. It's supposed to be faster than Client because it has persistent
// connection to the server and at the same time it exposes some functionality
// that is only provided via websockets (like event subscription mechanism).
// WSClient is thread-safe and can be used from multiple goroutines to perform
// RPC requests.
type WSClient struct {
Client
// Notifications is a channel that is used to send events received from
// the server. Client's code is supposed to be reading from this channel if
// it wants to use subscription mechanism. Failing to do so will cause
// WSClient to block even regular requests. This channel is not buffered.
// In case of protocol error or upon connection closure, this channel will
// be closed, so make sure to handle this.
//
// Deprecated: please, use custom channels with ReceiveBlocks, ReceiveTransactions,
// ReceiveExecutionNotifications, ReceiveExecutions, ReceiveNotaryRequests
// methods to subscribe for notifications. This field will be removed in future
// versions.
Notifications chan Notification
ws *websocket.Conn
done chan struct{}
requests chan *neorpc.Request
shutdown chan struct{}
closeCalled atomic.Bool
closeErrLock sync.RWMutex
closeErr error
subscriptionsLock sync.RWMutex
subscriptions map[string]notificationReceiver
// receivers is a mapping from receiver channel to a set of corresponding subscription IDs.
// It must be accessed with subscriptionsLock taken. Its keys must be used to deliver
// notifications, if channel is not in the receivers list and corresponding subscription
// still exists, notification must not be sent.
receivers map[interface{}][]string
respLock sync.RWMutex
respChannels map[uint64]chan *neorpc.Response
}
// notificationReceiver is an interface aimed to provide WS subscriber functionality
// for different types of subscriptions.
type notificationReceiver interface {
// Comparator provides notification filtering functionality.
rpcevent.Comparator
// Receiver returns notification receiver channel.
Receiver() interface{}
// TrySend checks whether notification passes receiver filter and sends it
// to the underlying channel if so.
TrySend(ntf Notification) bool
// Close closes underlying receiver channel.
Close()
}
// blockReceiver stores information about block events subscriber.
type blockReceiver struct {
filter *neorpc.BlockFilter
ch chan<- *block.Block
}
// EventID implements neorpc.Comparator interface.
func (r *blockReceiver) EventID() neorpc.EventID {
return neorpc.BlockEventID
}
// Filter implements neorpc.Comparator interface.
func (r *blockReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *blockReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *blockReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*block.Block)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *blockReceiver) Close() {
close(r.ch)
}
// txReceiver stores information about transaction events subscriber.
type txReceiver struct {
filter *neorpc.TxFilter
ch chan<- *transaction.Transaction
}
// EventID implements neorpc.Comparator interface.
func (r *txReceiver) EventID() neorpc.EventID {
return neorpc.TransactionEventID
}
// Filter implements neorpc.Comparator interface.
func (r *txReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *txReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *txReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*transaction.Transaction)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *txReceiver) Close() {
close(r.ch)
}
// executionNotificationReceiver stores information about execution notifications subscriber.
type executionNotificationReceiver struct {
filter *neorpc.NotificationFilter
ch chan<- *state.ContainedNotificationEvent
}
// EventID implements neorpc.Comparator interface.
func (r *executionNotificationReceiver) EventID() neorpc.EventID {
return neorpc.NotificationEventID
}
// Filter implements neorpc.Comparator interface.
func (r *executionNotificationReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *executionNotificationReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *executionNotificationReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*state.ContainedNotificationEvent)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *executionNotificationReceiver) Close() {
close(r.ch)
}
// executionReceiver stores information about application execution results subscriber.
type executionReceiver struct {
filter *neorpc.ExecutionFilter
ch chan<- *state.AppExecResult
}
// EventID implements neorpc.Comparator interface.
func (r *executionReceiver) EventID() neorpc.EventID {
return neorpc.ExecutionEventID
}
// Filter implements neorpc.Comparator interface.
func (r *executionReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *executionReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *executionReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*state.AppExecResult)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *executionReceiver) Close() {
close(r.ch)
}
// notaryRequestReceiver stores information about notary requests subscriber.
type notaryRequestReceiver struct {
filter *neorpc.TxFilter
ch chan<- *result.NotaryRequestEvent
}
// EventID implements neorpc.Comparator interface.
func (r *notaryRequestReceiver) EventID() neorpc.EventID {
return neorpc.NotaryRequestEventID
}
// Filter implements neorpc.Comparator interface.
func (r *notaryRequestReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *notaryRequestReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *notaryRequestReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*result.NotaryRequestEvent)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *notaryRequestReceiver) Close() {
close(r.ch)
}
// naiveReceiver is a structure leaved for deprecated single channel based notifications
// delivering.
//
// Deprecated: this receiver must be removed after outdated subscriptions API removal.
type naiveReceiver struct {
eventID neorpc.EventID
filter interface{}
ch chan<- Notification
}
// EventID implements neorpc.Comparator interface.
func (r *naiveReceiver) EventID() neorpc.EventID {
return r.eventID
}
// Filter implements neorpc.Comparator interface.
func (r *naiveReceiver) Filter() interface{} {
return r.filter
}
// Receiver implements notificationReceiver interface.
func (r *naiveReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *naiveReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *naiveReceiver) Close() {
r.ch <- Notification{
Type: neorpc.MissedEventID, // backwards-compatible behaviour
}
}
// Notification represents a server-generated notification for client subscriptions.
// Value can be one of *block.Block, *state.AppExecResult, *state.ContainedNotificationEvent
// *transaction.Transaction or *subscriptions.NotaryRequestEvent based on Type.
type Notification struct {
Type neorpc.EventID
Value interface{}
}
// EventID implements Container interface and returns notification ID.
func (n Notification) EventID() neorpc.EventID {
return n.Type
}
// EventPayload implements Container interface and returns notification
// object.
func (n Notification) EventPayload() interface{} {
return n.Value
}
// requestResponse is a combined type for request and response since we can get
// any of them here.
type requestResponse struct {
neorpc.Response
Method string `json:"method"`
RawParams []json.RawMessage `json:"params,omitempty"`
}
const (
// Message limit for receiving side.
wsReadLimit = 10 * 1024 * 1024
// Disconnection timeout.
wsPongLimit = 60 * time.Second
// Ping period for connection liveness check.
wsPingPeriod = wsPongLimit / 2
// Write deadline.
wsWriteLimit = wsPingPeriod / 2
)
// ErrNilNotificationReceiver is returned when notification receiver channel is nil.
var ErrNilNotificationReceiver = errors.New("nil notification receiver")
// errConnClosedByUser is a WSClient error used iff the user calls (*WSClient).Close method by himself.
var errConnClosedByUser = errors.New("connection closed by user")
// NewWS returns a new WSClient ready to use (with established websocket
// connection). You need to use websocket URL for it like `ws://1.2.3.4/ws`.
// You should call Init method to initialize the network magic the client is
// operating on.
func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error) {
dialer := websocket.Dialer{HandshakeTimeout: opts.DialTimeout}
ws, resp, err := dialer.DialContext(ctx, endpoint, nil)
if resp != nil && resp.Body != nil { // Can be non-nil even with error returned.
defer resp.Body.Close() // Not exactly required by websocket, but let's do this for bodyclose checker.
}
if err != nil {
return nil, err
}
wsc := &WSClient{
Client: Client{},
Notifications: make(chan Notification),
ws: ws,
shutdown: make(chan struct{}),
done: make(chan struct{}),
closeCalled: *atomic.NewBool(false),
respChannels: make(map[uint64]chan *neorpc.Response),
requests: make(chan *neorpc.Request),
subscriptions: make(map[string]notificationReceiver),
receivers: make(map[interface{}][]string),
}
err = initClient(ctx, &wsc.Client, endpoint, opts)
if err != nil {
return nil, err
}
wsc.Client.cli = nil
go wsc.wsReader()
go wsc.wsWriter()
wsc.requestF = wsc.makeWsRequest
return wsc, nil
}
// Close closes connection to the remote side rendering this client instance
// unusable.
func (c *WSClient) Close() {
if c.closeCalled.CAS(false, true) {
c.setCloseErr(errConnClosedByUser)
// Closing shutdown channel sends a signal to wsWriter to break out of the
// loop. In doing so it does ws.Close() closing the network connection
// which in turn makes wsReader receive an err from ws.ReadJSON() and also
// break out of the loop closing c.done channel in its shutdown sequence.
close(c.shutdown)
// Call to cancel will send signal to all users of Context().
c.Client.ctxCancel()
}
<-c.done
}
func (c *WSClient) wsReader() {
c.ws.SetReadLimit(wsReadLimit)
c.ws.SetPongHandler(func(string) error {
err := c.ws.SetReadDeadline(time.Now().Add(wsPongLimit))
if err != nil {
c.setCloseErr(fmt.Errorf("failed to set pong read deadline: %w", err))
}
return err
})
var connCloseErr error
readloop:
for {
rr := new(requestResponse)
err := c.ws.SetReadDeadline(time.Now().Add(wsPongLimit))
if err != nil {
connCloseErr = fmt.Errorf("failed to set response read deadline: %w", err)
break readloop
}
err = c.ws.ReadJSON(rr)
if err != nil {
// Timeout/connection loss/malformed response.
connCloseErr = fmt.Errorf("failed to read JSON response (timeout/connection loss/malformed response): %w", err)
break readloop
}
if rr.ID == nil && rr.Method != "" {
event, err := neorpc.GetEventIDFromString(rr.Method)
if err != nil {
// Bad event received.
connCloseErr = fmt.Errorf("failed to perse event ID from string %s: %w", rr.Method, err)
break readloop
}
if event != neorpc.MissedEventID && len(rr.RawParams) != 1 {
// Bad event received.
connCloseErr = fmt.Errorf("bad event received: %s / %d", event, len(rr.RawParams))
break readloop
}
var val interface{}
switch event {
case neorpc.BlockEventID:
sr, err := c.StateRootInHeader()
if err != nil {
// Client is not initialized.
connCloseErr = fmt.Errorf("failed to fetch StateRootInHeader: %w", err)
break readloop
}
val = block.New(sr)
case neorpc.TransactionEventID:
val = &transaction.Transaction{}
case neorpc.NotificationEventID:
val = new(state.ContainedNotificationEvent)
case neorpc.ExecutionEventID:
val = new(state.AppExecResult)
case neorpc.NotaryRequestEventID:
val = new(result.NotaryRequestEvent)
case neorpc.MissedEventID:
// No value.
default:
// Bad event received.
connCloseErr = fmt.Errorf("unknown event received: %d", event)
break readloop
}
if event != neorpc.MissedEventID {
err = json.Unmarshal(rr.RawParams[0], val)
if err != nil {
// Bad event received.
connCloseErr = fmt.Errorf("failed to unmarshal event of type %s from JSON: %w", event, err)
break readloop
}
}
if event == neorpc.MissedEventID {
c.subscriptionsLock.Lock()
for rcvr, ids := range c.receivers {
c.subscriptions[ids[0]].Close()
delete(c.receivers, rcvr)
}
c.subscriptionsLock.Unlock()
continue readloop
}
c.subscriptionsLock.RLock()
ntf := Notification{Type: event, Value: val}
for _, ids := range c.receivers {
for _, id := range ids {
if c.subscriptions[id].TrySend(ntf) {
break // strictly one notification per channel
}
}
}
c.subscriptionsLock.RUnlock()
} else if rr.ID != nil && (rr.Error != nil || rr.Result != nil) {
id, err := strconv.ParseUint(string(rr.ID), 10, 64)
if err != nil {
connCloseErr = fmt.Errorf("failed to retrieve response ID from string %s: %w", string(rr.ID), err)
break readloop // Malformed response (invalid response ID).
}
ch := c.getResponseChannel(id)
if ch == nil {
connCloseErr = fmt.Errorf("unknown response channel for response %d", id)
break readloop // Unknown response (unexpected response ID).
}
ch <- &rr.Response
} else {
// Malformed response, neither valid request, nor valid response.
connCloseErr = fmt.Errorf("malformed response")
break readloop
}
}
if connCloseErr != nil {
c.setCloseErr(connCloseErr)
}
close(c.done)
c.respLock.Lock()
for _, ch := range c.respChannels {
close(ch)
}
c.respChannels = nil
c.respLock.Unlock()
close(c.Notifications)
c.Client.ctxCancel()
}
func (c *WSClient) wsWriter() {
pingTicker := time.NewTicker(wsPingPeriod)
defer c.ws.Close()
defer pingTicker.Stop()
var connCloseErr error
writeloop:
for {
select {
case <-c.shutdown:
return
case <-c.done:
return
case req, ok := <-c.requests:
if !ok {
return
}
if err := c.ws.SetWriteDeadline(time.Now().Add(c.opts.RequestTimeout)); err != nil {
connCloseErr = fmt.Errorf("failed to set request write deadline: %w", err)
break writeloop
}
if err := c.ws.WriteJSON(req); err != nil {
connCloseErr = fmt.Errorf("failed to write JSON request (%s / %d): %w", req.Method, len(req.Params), err)
break writeloop
}
case <-pingTicker.C:
if err := c.ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)); err != nil {
connCloseErr = fmt.Errorf("failed to set ping write deadline: %w", err)
break writeloop
}
if err := c.ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
connCloseErr = fmt.Errorf("failed to write ping message: %w", err)
break writeloop
}
}
}
if connCloseErr != nil {
c.setCloseErr(connCloseErr)
}
}
func (c *WSClient) unregisterRespChannel(id uint64) {
c.respLock.Lock()
defer c.respLock.Unlock()
if ch, ok := c.respChannels[id]; ok {
delete(c.respChannels, id)
close(ch)
}
}
func (c *WSClient) getResponseChannel(id uint64) chan *neorpc.Response {
c.respLock.RLock()
defer c.respLock.RUnlock()
return c.respChannels[id]
}
func (c *WSClient) makeWsRequest(r *neorpc.Request) (*neorpc.Response, error) {
ch := make(chan *neorpc.Response)
c.respLock.Lock()
select {
case <-c.done:
c.respLock.Unlock()
return nil, errors.New("connection lost before registering response channel")
default:
c.respChannels[r.ID] = ch
c.respLock.Unlock()
}
select {
case <-c.done:
return nil, errors.New("connection lost before sending the request")
case c.requests <- r:
}
select {
case <-c.done:
return nil, errors.New("connection lost while waiting for the response")
case resp := <-ch:
c.unregisterRespChannel(r.ID)
return resp, nil
}
}
func (c *WSClient) performSubscription(params []interface{}, rcvr notificationReceiver) (string, error) {
var resp string
if err := c.performRequest("subscribe", params, &resp); err != nil {
return "", err
}
c.subscriptionsLock.Lock()
defer c.subscriptionsLock.Unlock()
c.subscriptions[resp] = rcvr
ch := rcvr.Receiver()
c.receivers[ch] = append(c.receivers[ch], resp)
return resp, nil
}
// SubscribeForNewBlocks adds subscription for new block events to this instance
// of the client. It can be filtered by primary consensus node index, nil value doesn't
// add any filters.
//
// Deprecated: please, use ReceiveBlocks. This method will be removed in future versions.
func (c *WSClient) SubscribeForNewBlocks(primary *int) (string, error) {
var flt *neorpc.BlockFilter
if primary != nil {
flt = &neorpc.BlockFilter{Primary: primary}
}
params := []interface{}{"block_added"}
if flt != nil {
params = append(params, *flt)
}
r := &naiveReceiver{
eventID: neorpc.BlockEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, r)
}
// ReceiveBlocks registers provided channel as a receiver for the new block events.
// Events can be filtered by the given BlockFilter, nil value doesn't add any filter.
// The receiver channel must be properly read and drained after usage in order not
// to block other notification receivers. If multiple subscriptions share the same
// receiver channel, then matching notification is only sent once per channel. The
// receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"block_added"}
if flt != nil {
params = append(params, *flt)
}
r := &blockReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// SubscribeForNewTransactions adds subscription for new transaction events to
// this instance of the client. It can be filtered by the sender and/or the signer, nil
// value is treated as missing filter.
//
// Deprecated: please, use ReceiveTransactions. This method will be removed in future versions.
func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *util.Uint160) (string, error) {
var flt *neorpc.TxFilter
if sender != nil || signer != nil {
flt = &neorpc.TxFilter{Sender: sender, Signer: signer}
}
params := []interface{}{"transaction_added"}
if flt != nil {
params = append(params, *flt)
}
r := &naiveReceiver{
eventID: neorpc.TransactionEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, r)
}
// ReceiveTransactions registers provided channel as a receiver for new transaction
// events. Events can be filtered by the given TxFilter, nil value doesn't add any
// filter. The receiver channel must be properly read and drained after usage in
// order not to block other notification receivers. If multiple subscriptions share
// the same receiver channel, then matching notification is only sent once per channel.
// The receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveTransactions(flt *neorpc.TxFilter, rcvr chan<- *transaction.Transaction) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"transaction_added"}
if flt != nil {
params = append(params, *flt)
}
r := &txReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// SubscribeForExecutionNotifications adds subscription for notifications
// generated during transaction execution to this instance of the client. It can be
// filtered by the contract's hash (that emits notifications), nil value puts no such
// restrictions.
//
// Deprecated: please, use ReceiveExecutionNotifications. This method will be removed in future versions.
func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160, name *string) (string, error) {
var flt *neorpc.NotificationFilter
if contract != nil || name != nil {
flt = &neorpc.NotificationFilter{Contract: contract, Name: name}
}
params := []interface{}{"notification_from_execution"}
if flt != nil {
params = append(params, *flt)
}
r := &naiveReceiver{
eventID: neorpc.NotificationEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, r)
}
// ReceiveExecutionNotifications registers provided channel as a receiver for execution
// events. Events can be filtered by the given NotificationFilter, nil value doesn't add
// any filter. The receiver channel must be properly read and drained after usage in
// order not to block other notification receivers. If multiple subscriptions share the
// same receiver channel, then matching notification is only sent once per channel. The
// receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"notification_from_execution"}
if flt != nil {
params = append(params, *flt)
}
r := &executionNotificationReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// SubscribeForTransactionExecutions adds subscription for application execution
// results generated during transaction execution to this instance of the client. It can
// be filtered by state (HALT/FAULT) to check for successful or failing
// transactions, nil value means no filtering.
//
// Deprecated: please, use ReceiveExecutions. This method will be removed in future versions.
func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) {
var flt *neorpc.ExecutionFilter
if state != nil {
flt = &neorpc.ExecutionFilter{State: state}
}
params := []interface{}{"transaction_executed"}
if flt != nil {
if flt.State != nil {
if *flt.State != "HALT" && *flt.State != "FAULT" {
return "", errors.New("bad state parameter")
}
}
params = append(params, *flt)
}
r := &naiveReceiver{
eventID: neorpc.ExecutionEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, r)
}
// ReceiveExecutions registers provided channel as a receiver for
// application execution result events generated during transaction execution.
// Events can be filtered by the given ExecutionFilter, nil value doesn't add any filter.
// The receiver channel must be properly read and drained after usage in order not
// to block other notification receivers. If multiple subscriptions share the same
// receiver channel, then matching notification is only sent once per channel. The
// receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"transaction_executed"}
if flt != nil {
if flt.State != nil {
if *flt.State != "HALT" && *flt.State != "FAULT" {
return "", errors.New("bad state parameter")
}
}
params = append(params, *flt)
}
r := &executionReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// SubscribeForNotaryRequests adds subscription for notary request payloads
// addition or removal events to this instance of client. It can be filtered by
// request sender's hash, or main tx signer's hash, nil value puts no such
// restrictions.
//
// Deprecated: please, use ReceiveNotaryRequests. This method will be removed in future versions.
func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) {
var flt *neorpc.TxFilter
if sender != nil || mainSigner != nil {
flt = &neorpc.TxFilter{Sender: sender, Signer: mainSigner}
}
params := []interface{}{"notary_request_event"}
if flt != nil {
params = append(params, *flt)
}
r := &naiveReceiver{
eventID: neorpc.NotaryRequestEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, r)
}
// ReceiveNotaryRequests registers provided channel as a receiver for notary request
// payload addition or removal events. Events can be filtered by the given TxFilter
// where sender corresponds to notary request sender (the second fallback transaction
// signer) and signer corresponds to main transaction signers. nil value doesn't add
// any filter. The receiver channel must be properly read and drained after usage in
// order not to block other notification receivers. If multiple subscriptions share
// the same receiver channel, then matching notification is only sent once per channel.
// The receiver channel will be closed by the WSClient immediately after MissedEvent
// is received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveNotaryRequests(flt *neorpc.TxFilter, rcvr chan<- *result.NotaryRequestEvent) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"notary_request_event"}
if flt != nil {
params = append(params, *flt)
}
r := &notaryRequestReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// Unsubscribe removes subscription for the given event stream. It will return an
// error in case if there's no subscription with the provided ID. Call to Unsubscribe
// doesn't block notifications receive process for given subscriber, thus, ensure
// that subscriber channel is properly drained while unsubscription is being
// performed. You may probably need to run unsubscription process in a separate
// routine (in parallel with notification receiver routine) to avoid Client's
// notification dispatcher blocking.
func (c *WSClient) Unsubscribe(id string) error {
return c.performUnsubscription(id)
}
// UnsubscribeAll removes all active subscriptions of the current client. It copies
// the list of subscribers in order not to hold the lock for the whole execution
// time and tries to unsubscribe from us many feeds as possible returning the
// chunk of unsubscription errors afterwards. Call to UnsubscribeAll doesn't block
// notifications receive process for given subscribers, thus, ensure that subscribers
// channels are properly drained while unsubscription is being performed. You may
// probably need to run unsubscription process in a separate routine (in parallel
// with notification receiver routines) to avoid Client's notification dispatcher
// blocking.
func (c *WSClient) UnsubscribeAll() error {
c.subscriptionsLock.Lock()
subs := make([]string, 0, len(c.subscriptions))
for id := range c.subscriptions {
subs = append(subs, id)
}
c.subscriptionsLock.Unlock()
var resErr error
for _, id := range subs {
err := c.performUnsubscription(id)
if err != nil {
errFmt := "failed to unsubscribe from feed %d: %v"
errArgs := []interface{}{err}
if resErr != nil {
errFmt = "%w; " + errFmt
errArgs = append([]interface{}{resErr}, errArgs...)
}
resErr = fmt.Errorf(errFmt, errArgs...)
}
}
return resErr
}
// performUnsubscription is internal method that removes subscription with the given
// ID from the list of subscriptions and receivers. It takes the subscriptions lock
// after WS RPC unsubscription request is completed. Until then the subscriber channel
// may still receive WS notifications.
func (c *WSClient) performUnsubscription(id string) error {
var resp bool
if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil {
return err
}
if !resp {
return errors.New("unsubscribe method returned false result")
}
c.subscriptionsLock.Lock()
defer c.subscriptionsLock.Unlock()
rcvr, ok := c.subscriptions[id]
if !ok {
return errors.New("no subscription with this ID")
}
ch := rcvr.Receiver()
ids := c.receivers[ch]
for i, rcvrID := range ids {
if rcvrID == id {
ids = append(ids[:i], ids[i+1:]...)
break
}
}
if len(ids) == 0 {
delete(c.receivers, ch)
} else {
c.receivers[ch] = ids
}
delete(c.subscriptions, id)
return nil
}
// setCloseErr is a thread-safe method setting closeErr in case if it's not yet set.
func (c *WSClient) setCloseErr(err error) {
c.closeErrLock.Lock()
defer c.closeErrLock.Unlock()
if c.closeErr == nil {
c.closeErr = err
}
}
// GetError returns the reason of WS connection closing. It returns nil in case if connection
// was closed by the use via Close() method calling.
func (c *WSClient) GetError() error {
c.closeErrLock.RLock()
defer c.closeErrLock.RUnlock()
if c.closeErr != nil && errors.Is(c.closeErr, errConnClosedByUser) {
return nil
}
return c.closeErr
}
// Context returns WSClient Cancel context that will be terminated on Client shutdown.
func (c *WSClient) Context() context.Context {
return c.Client.ctx
}