package client import ( "context" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) // Close closes connection to the remote side making // this client instance unusable. Closes notification // channel returned from Client.NotificationChannel(), // Removes all subscription. func (c *Client) Close() { // closing should be done via the channel // to prevent switching to another RPC node // in the notification loop close(c.closeChan) } // SubscribeForExecutionNotifications adds subscription for notifications // generated during contract transaction execution to this instance of client. // // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error { c.switchLock.Lock() defer c.switchLock.Unlock() if c.inactive { return ErrConnectionLost } _, subscribed := c.subscribedEvents[contract] if subscribed { // no need to subscribe one more time return nil } id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv) if err != nil { return err } c.subscribedEvents[contract] = id return nil } // SubscribeForNewBlocks adds subscription for new block events to this // instance of client. // // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) SubscribeForNewBlocks() error { c.switchLock.Lock() defer c.switchLock.Unlock() if c.inactive { return ErrConnectionLost } if c.subscribedToBlocks { // no need to subscribe one more time return nil } _, err := c.client.ReceiveBlocks(nil, c.blockRcv) if err != nil { return err } c.subscribedToBlocks = true return nil } // SubscribeForNotaryRequests adds subscription for notary request payloads // addition or removal events to this instance of client. Passed txSigner is // used as filter: subscription is only for the notary requests that must be // signed by txSigner. // // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error { if c.notary == nil { panic(notaryNotEnabledPanicMsg) } c.switchLock.Lock() defer c.switchLock.Unlock() if c.inactive { return ErrConnectionLost } _, subscribed := c.subscribedNotaryEvents[txSigner] if subscribed { // no need to subscribe one more time return nil } id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv) if err != nil { return err } c.subscribedNotaryEvents[txSigner] = id return nil } // UnsubscribeContract removes subscription for given contract event stream. // // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) UnsubscribeContract(contract util.Uint160) error { c.switchLock.Lock() defer c.switchLock.Unlock() if c.inactive { return ErrConnectionLost } _, subscribed := c.subscribedEvents[contract] if !subscribed { // no need to unsubscribe contract // without subscription return nil } err := c.client.Unsubscribe(c.subscribedEvents[contract]) if err != nil { return err } delete(c.subscribedEvents, contract) return nil } // UnsubscribeNotaryRequest removes subscription for given notary requests // signer. // // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) UnsubscribeNotaryRequest(signer util.Uint160) error { if c.notary == nil { panic(notaryNotEnabledPanicMsg) } c.switchLock.Lock() defer c.switchLock.Unlock() if c.inactive { return ErrConnectionLost } _, subscribed := c.subscribedNotaryEvents[signer] if !subscribed { // no need to unsubscribe signer's // requests without subscription return nil } err := c.client.Unsubscribe(c.subscribedNotaryEvents[signer]) if err != nil { return err } delete(c.subscribedNotaryEvents, signer) return nil } // UnsubscribeAll removes all active subscriptions of current client. // // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. func (c *Client) UnsubscribeAll() error { c.switchLock.Lock() defer c.switchLock.Unlock() if c.inactive { return ErrConnectionLost } // no need to unsubscribe if there are // no active subscriptions if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 && !c.subscribedToBlocks { return nil } err := c.client.UnsubscribeAll() if err != nil { return err } c.subscribedEvents = make(map[util.Uint160]string) c.subscribedNotaryEvents = make(map[util.Uint160]string) c.subscribedToBlocks = false return nil } // subsInfo includes channels for ws notifications; // cached subscription information. type subsInfo struct { blockRcv chan *block.Block notificationRcv chan *state.ContainedNotificationEvent notaryReqRcv chan *result.NotaryRequestEvent subscribedToBlocks bool subscribedEvents map[util.Uint160]string subscribedNotaryEvents map[util.Uint160]string } // restoreSubscriptions restores subscriptions according to cached // information about them. // // If it is NOT a background operation switchLock MUST be held. // Returns a pair: the second is a restoration status and the first // one contains subscription information applied to the passed cli // and receivers for the updated subscriptions. // Does not change Client instance. func (c *Client) restoreSubscriptions(ctx context.Context, cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) { var ( err error id string ) stopCh := make(chan struct{}) defer close(stopCh) blockRcv := make(chan *block.Block) notificationRcv := make(chan *state.ContainedNotificationEvent) notaryReqRcv := make(chan *result.NotaryRequestEvent) c.startListen(ctx, stopCh, blockRcv, notificationRcv, notaryReqRcv, background) if background { c.switchLock.RLock() defer c.switchLock.RUnlock() } si.subscribedToBlocks = c.subscribedToBlocks si.subscribedEvents = copySubsMap(c.subscribedEvents) si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents) si.blockRcv = blockRcv si.notificationRcv = notificationRcv si.notaryReqRcv = notaryReqRcv // new block events restoration if si.subscribedToBlocks { _, err = cli.ReceiveBlocks(nil, blockRcv) if err != nil { c.logger.Error("could not restore block subscription after RPC switch", zap.String("endpoint", endpoint), zap.Error(err), ) return } } // notification events restoration for contract := range si.subscribedEvents { contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890 id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv) if err != nil { c.logger.Error("could not restore notification subscription after RPC switch", zap.String("endpoint", endpoint), zap.Error(err), ) return } si.subscribedEvents[contract] = id } // notary notification events restoration if c.notary != nil { for signer := range si.subscribedNotaryEvents { signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890 id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv) if err != nil { c.logger.Error("could not restore notary notification subscription after RPC switch", zap.String("endpoint", endpoint), zap.Error(err), ) return } si.subscribedNotaryEvents[signer] = id } } return si, true } func (c *Client) startListen(ctx context.Context, stopCh <-chan struct{}, blockRcv <-chan *block.Block, notificationRcv <-chan *state.ContainedNotificationEvent, notaryReqRcv <-chan *result.NotaryRequestEvent, background bool) { // neo-go WS client says to _always_ read notifications // from its channel. Subscribing to any notification // while not reading them in another goroutine may // lead to a dead-lock, thus that async side notification // listening while restoring subscriptions go func() { var e any var ok bool for { select { case <-stopCh: return case e, ok = <-blockRcv: case e, ok = <-notificationRcv: case e, ok = <-notaryReqRcv: } if !ok { return } if background { // background client (test) switch, no need to send // any notification, just preventing dead-lock continue } c.routeEvent(ctx, e) } }() } func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string { newM := make(map[util.Uint160]string, len(m)) for k, v := range m { newM[k] = v } return newM }