diff --git a/ROADMAP.md b/ROADMAP.md index cd747077f..eeab55fc8 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -26,16 +26,6 @@ APIs/commands/configurations will be removed and here is a list of scheduled breaking changes. Consider changing your code/scripts/configurations if you're using anything mentioned here. -## WSClient Notifications channel and SubscribeFor* APIs - -Version 0.99.5 of NeoGo introduces a new set of subscription APIs that gives -more control to the WSClient user that can pass specific channels to be used -for specific subscriptions now. Old APIs and generic Notifications channel are -still available, but will be removed, so please convert your code to using new -Receive* APIs. - -Removal of these APIs is scheduled for May 2023 (~0.103.0 release). - ## SecondsPerBlock protocol configuration With 0.100.0 version SecondsPerBlock protocol configuration setting was diff --git a/pkg/rpcclient/local.go b/pkg/rpcclient/local.go index 345566d71..0c1a33967 100644 --- a/pkg/rpcclient/local.go +++ b/pkg/rpcclient/local.go @@ -27,8 +27,7 @@ type Internal struct { func NewInternal(ctx context.Context, register InternalHook) (*Internal, error) { c := &Internal{ WSClient: WSClient{ - Client: Client{}, - Notifications: make(chan Notification), + Client: Client{}, shutdown: make(chan struct{}), done: make(chan struct{}), @@ -67,7 +66,6 @@ eventloop: } } close(c.done) - close(c.Notifications) c.ctxCancel() // ctx is cancelled, server is notified and will finish soon. drainloop: diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index b11d611a5..c86798b91 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -16,7 +16,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/neorpc/rpcevent" - "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" ) @@ -56,20 +55,6 @@ import ( // CloseNotificationChannelIfFull option is on. type WSClient struct { Client - // Notifications is a channel that is used to send events received from - // the server. Client's code is supposed to be reading from this channel if - // it wants to use subscription mechanism. Failing to do so will cause - // WSClient to block even regular requests. This channel is not buffered. - // In case of protocol error or upon connection closure, this channel will - // be closed, so make sure to handle this. Make sure you're not changing the - // received notifications, as it may affect the functionality of other - // notification receivers. - // - // Deprecated: please, use custom channels with ReceiveBlocks, ReceiveTransactions, - // ReceiveExecutionNotifications, ReceiveExecutions, ReceiveNotaryRequests - // methods to subscribe for notifications. This field will be removed in future - // versions. - Notifications chan Notification ws *websocket.Conn wsOpts WSOptions @@ -360,56 +345,6 @@ func (r *notaryRequestReceiver) Close() { close(r.ch) } -// naiveReceiver is a structure leaved for deprecated single channel based notifications -// delivering. -// -// Deprecated: this receiver must be removed after outdated subscriptions API removal. -type naiveReceiver struct { - eventID neorpc.EventID - filter any - ch chan<- Notification -} - -// EventID implements neorpc.Comparator interface. -func (r *naiveReceiver) EventID() neorpc.EventID { - return r.eventID -} - -// Filter implements neorpc.Comparator interface. -func (r *naiveReceiver) Filter() any { - return r.filter -} - -// Receiver implements notificationReceiver interface. -func (r *naiveReceiver) Receiver() any { - return r.ch -} - -// TrySend implements notificationReceiver interface. -func (r *naiveReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) { - if rpcevent.Matches(r, ntf) { - if nonBlocking { - select { - case r.ch <- ntf: - default: - return true, true - } - } else { - r.ch <- ntf - } - - return true, false - } - return false, false -} - -// Close implements notificationReceiver interface. -func (r *naiveReceiver) Close() { - r.ch <- Notification{ - Type: neorpc.MissedEventID, // backwards-compatible behaviour - } -} - // Notification represents a server-generated notification for client subscriptions. // Value can be one of *block.Block, *state.AppExecResult, *state.ContainedNotificationEvent // *transaction.Transaction or *subscriptions.NotaryRequestEvent based on Type. @@ -485,8 +420,7 @@ func NewWS(ctx context.Context, endpoint string, opts WSOptions) (*WSClient, err return nil, err } wsc := &WSClient{ - Client: Client{}, - Notifications: make(chan Notification), + Client: Client{}, ws: ws, wsOpts: opts, @@ -634,21 +568,14 @@ readloop: } // dropSubCh closes corresponding subscriber's channel and removes it from the -// receivers map. If the channel belongs to a naive subscriber then it will be -// closed manually without call to Close(). The channel is still being kept in +// receivers map. The channel is still being kept in // the subscribers map as technically the server-side subscription still exists // and the user is responsible for unsubscription. This method must be called // under subscriptionsLock taken. It's the caller's duty to ensure dropSubCh // will be called once per channel, otherwise panic will occur. func (c *WSClient) dropSubCh(rcvrCh any, id string, ignoreCloseNotificationChannelIfFull bool) { if ignoreCloseNotificationChannelIfFull || c.wsOpts.CloseNotificationChannelIfFull { - rcvr := c.subscriptions[id] - _, ok := rcvr.(*naiveReceiver) - if ok { // naiveReceiver uses c.Notifications that should be handled separately. - close(c.Notifications) - } else { - c.subscriptions[id].Close() - } + c.subscriptions[id].Close() delete(c.receivers, rcvrCh) } } @@ -778,29 +705,6 @@ func (c *WSClient) performSubscription(params []any, rcvr notificationReceiver) return resp, nil } -// SubscribeForNewBlocks adds subscription for new block events to this instance -// of the client. It can be filtered by primary consensus node index, nil value doesn't -// add any filters. -// -// Deprecated: please, use ReceiveBlocks. This method will be removed in future versions. -func (c *WSClient) SubscribeForNewBlocks(primary *int) (string, error) { - var flt any - if primary != nil { - var f = neorpc.BlockFilter{Primary: primary} - flt = *f.Copy() - } - params := []any{"block_added"} - if flt != nil { - params = append(params, flt) - } - r := &naiveReceiver{ - eventID: neorpc.BlockEventID, - filter: flt, - ch: c.Notifications, - } - return c.performSubscription(params, r) -} - // ReceiveBlocks registers provided channel as a receiver for the new block events. // Events can be filtered by the given BlockFilter, nil value doesn't add any filter. // See WSClient comments for generic Receive* behaviour details. @@ -820,29 +724,6 @@ func (c *WSClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Blo return c.performSubscription(params, r) } -// SubscribeForNewTransactions adds subscription for new transaction events to -// this instance of the client. It can be filtered by the sender and/or the signer, nil -// value is treated as missing filter. -// -// Deprecated: please, use ReceiveTransactions. This method will be removed in future versions. -func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *util.Uint160) (string, error) { - var flt any - if sender != nil || signer != nil { - var f = neorpc.TxFilter{Sender: sender, Signer: signer} - flt = *f.Copy() - } - params := []any{"transaction_added"} - if flt != nil { - params = append(params, flt) - } - r := &naiveReceiver{ - eventID: neorpc.TransactionEventID, - filter: flt, - ch: c.Notifications, - } - return c.performSubscription(params, r) -} - // ReceiveTransactions registers provided channel as a receiver for new transaction // events. Events can be filtered by the given TxFilter, nil value doesn't add any // filter. See WSClient comments for generic Receive* behaviour details. @@ -862,30 +743,6 @@ func (c *WSClient) ReceiveTransactions(flt *neorpc.TxFilter, rcvr chan<- *transa return c.performSubscription(params, r) } -// SubscribeForExecutionNotifications adds subscription for notifications -// generated during transaction execution to this instance of the client. It can be -// filtered by the contract's hash (that emits notifications), nil value puts no such -// restrictions. -// -// Deprecated: please, use ReceiveExecutionNotifications. This method will be removed in future versions. -func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160, name *string) (string, error) { - var flt any - if contract != nil || name != nil { - var f = neorpc.NotificationFilter{Contract: contract, Name: name} - flt = *f.Copy() - } - params := []any{"notification_from_execution"} - if flt != nil { - params = append(params, flt) - } - r := &naiveReceiver{ - eventID: neorpc.NotificationEventID, - filter: flt, - ch: c.Notifications, - } - return c.performSubscription(params, r) -} - // ReceiveExecutionNotifications registers provided channel as a receiver for execution // events. Events can be filtered by the given NotificationFilter, nil value doesn't add // any filter. See WSClient comments for generic Receive* behaviour details. @@ -905,33 +762,6 @@ func (c *WSClient) ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, return c.performSubscription(params, r) } -// SubscribeForTransactionExecutions adds subscription for application execution -// results generated during transaction execution to this instance of the client. It can -// be filtered by state (HALT/FAULT) to check for successful or failing -// transactions, nil value means no filtering. -// -// Deprecated: please, use ReceiveExecutions. This method will be removed in future versions. -func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) { - var flt any - if state != nil { - if *state != "HALT" && *state != "FAULT" { - return "", errors.New("bad state parameter") - } - var f = neorpc.ExecutionFilter{State: state} - flt = *f.Copy() - } - params := []any{"transaction_executed"} - if flt != nil { - params = append(params, flt) - } - r := &naiveReceiver{ - eventID: neorpc.ExecutionEventID, - filter: flt, - ch: c.Notifications, - } - return c.performSubscription(params, r) -} - // ReceiveExecutions registers provided channel as a receiver for // application execution result events generated during transaction execution. // Events can be filtered by the given ExecutionFilter, nil value doesn't add any filter. @@ -957,30 +787,6 @@ func (c *WSClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *s return c.performSubscription(params, r) } -// SubscribeForNotaryRequests adds subscription for notary request payloads -// addition or removal events to this instance of client. It can be filtered by -// request sender's hash, or main tx signer's hash, nil value puts no such -// restrictions. -// -// Deprecated: please, use ReceiveNotaryRequests. This method will be removed in future versions. -func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) { - var flt any - if sender != nil || mainSigner != nil { - var f = neorpc.TxFilter{Sender: sender, Signer: mainSigner} - flt = *f.Copy() - } - params := []any{"notary_request_event"} - if flt != nil { - params = append(params, flt) - } - r := &naiveReceiver{ - eventID: neorpc.NotaryRequestEventID, - filter: flt, - ch: c.Notifications, - } - return c.performSubscription(params, r) -} - // ReceiveNotaryRequests registers provided channel as a receiver for notary request // payload addition or removal events. Events can be filtered by the given TxFilter // where sender corresponds to notary request sender (the second fallback transaction diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index 7f3db1d27..62ee4c1f0 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -205,31 +205,23 @@ func TestWSClientEvents(t *testing.T) { wsc.receivers[chan<- *state.AppExecResult(aerCh3)] = []string{"6"} // MissedEvent must close the channels above. - wsc.subscriptions["7"] = &naiveReceiver{eventID: neorpc.BlockEventID, ch: wsc.Notifications} - wsc.subscriptions["8"] = &naiveReceiver{eventID: neorpc.BlockEventID, ch: wsc.Notifications} // check duplicating subscriptions - wsc.subscriptions["9"] = &naiveReceiver{eventID: neorpc.ExecutionEventID, ch: wsc.Notifications} // check different events - wsc.receivers[wsc.Notifications] = []string{"7", "8", "9"} wsc.subscriptionsLock.Unlock() var ( b1Cnt, b2Cnt int aer1Cnt, aer2Cnt, aer3Cnt int ntfCnt int - defaultCount int - expectedb1Cnt, expectedb2Cnt = 1, 1 // single Block event - expectedaer1Cnt, expectedaer2Cnt, expectedaer3Cnt = 2, 2, 0 // two HALTED AERs - expectedntfCnt = 1 // single notification event - expectedDefaultCnt = 1 + 2 + 1 // single Block event + two AERs + missed event + expectedb1Cnt, expectedb2Cnt = 1, 1 // single Block event + expectedaer1Cnt, expectedaer2Cnt, expectedaer3Cnt = 2, 2, 0 // two HALTED AERs + expectedntfCnt = 1 // single notification event aer *state.AppExecResult ) for b1Cnt+b2Cnt+ aer1Cnt+aer2Cnt+aer3Cnt+ - ntfCnt+ - defaultCount != + ntfCnt != expectedb1Cnt+expectedb2Cnt+ expectedaer1Cnt+expectedaer2Cnt+expectedaer3Cnt+ - expectedntfCnt+ - expectedDefaultCnt { + expectedntfCnt { select { case _, ok = <-bCh1: if ok { @@ -256,10 +248,6 @@ func TestWSClientEvents(t *testing.T) { if ok { ntfCnt++ } - case _, ok = <-wsc.Notifications: - if ok { - defaultCount++ - } case <-time.After(time.Second): t.Fatal("timeout waiting for event") } @@ -270,7 +258,6 @@ func TestWSClientEvents(t *testing.T) { assert.Equal(t, expectedaer2Cnt, aer2Cnt) assert.Equal(t, expectedaer3Cnt, aer3Cnt) assert.Equal(t, expectedntfCnt, ntfCnt) - assert.Equal(t, expectedDefaultCnt, defaultCount) // Channels must be closed by server _, ok = <-bCh1 diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index b2ac2f9f0..d65274a2a 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -2066,89 +2066,6 @@ func TestWSClient_SubscriptionsCompat(t *testing.T) { bCount++ return b1, primary, sender, ntfName, st } - checkDeprecated := func(t *testing.T, filtered bool) { - var ( - bID, txID, ntfID, aerID string - err error - ) - b, primary, sender, ntfName, st := getData(t) - if filtered { - bID, err = c.SubscribeForNewBlocks(&primary) //nolint:staticcheck // SA1019: c.SubscribeForNewBlocks is deprecated - require.NoError(t, err) - txID, err = c.SubscribeForNewTransactions(&sender, nil) //nolint:staticcheck // SA1019: c.SubscribeForNewTransactions is deprecated - require.NoError(t, err) - ntfID, err = c.SubscribeForExecutionNotifications(nil, &ntfName) //nolint:staticcheck // SA1019: c.SubscribeForExecutionNotifications is deprecated - require.NoError(t, err) - aerID, err = c.SubscribeForTransactionExecutions(&st) //nolint:staticcheck // SA1019: c.SubscribeForTransactionExecutions is deprecated - require.NoError(t, err) - } else { - bID, err = c.SubscribeForNewBlocks(nil) //nolint:staticcheck // SA1019: c.SubscribeForNewBlocks is deprecated - require.NoError(t, err) - txID, err = c.SubscribeForNewTransactions(nil, nil) //nolint:staticcheck // SA1019: c.SubscribeForNewTransactions is deprecated - require.NoError(t, err) - ntfID, err = c.SubscribeForExecutionNotifications(nil, nil) //nolint:staticcheck // SA1019: c.SubscribeForExecutionNotifications is deprecated - require.NoError(t, err) - aerID, err = c.SubscribeForTransactionExecutions(nil) //nolint:staticcheck // SA1019: c.SubscribeForTransactionExecutions is deprecated - require.NoError(t, err) - } - - var ( - lock sync.RWMutex - received byte - exitCh = make(chan struct{}) - ) - go func() { - dispatcher: - for { - select { - case ntf := <-c.Notifications: //nolint:staticcheck // SA1019: c.Notifications is deprecated - lock.Lock() - switch ntf.Type { - case neorpc.BlockEventID: - received |= 1 - case neorpc.TransactionEventID: - received |= 1 << 1 - case neorpc.NotificationEventID: - received |= 1 << 2 - case neorpc.ExecutionEventID: - received |= 1 << 3 - } - lock.Unlock() - case <-exitCh: - break dispatcher - } - } - drainLoop: - for { - select { - case <-c.Notifications: //nolint:staticcheck // SA1019: c.Notifications is deprecated - default: - break drainLoop - } - } - }() - - // Accept the next block and wait for events. - require.NoError(t, chain.AddBlock(b)) - assert.Eventually(t, func() bool { - lock.RLock() - defer lock.RUnlock() - - return received == 1<<4-1 - }, time.Second, 100*time.Millisecond) - - require.NoError(t, c.Unsubscribe(bID)) - require.NoError(t, c.Unsubscribe(txID)) - require.NoError(t, c.Unsubscribe(ntfID)) - require.NoError(t, c.Unsubscribe(aerID)) - exitCh <- struct{}{} - } - t.Run("deprecated, filtered", func(t *testing.T) { - checkDeprecated(t, true) - }) - t.Run("deprecated, non-filtered", func(t *testing.T) { - checkDeprecated(t, false) - }) checkRelevant := func(t *testing.T, filtered bool) { b, primary, sender, ntfName, st := getData(t)