From 35fdf6f315dfc58c1bfcc8855aea9a08cc2ba461 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 11 May 2023 17:05:24 +0300 Subject: [PATCH] [#337] morph: Move subscription logic to subscriber Signed-off-by: Evgenii Stratonikov --- pkg/morph/client/client.go | 26 +-- pkg/morph/client/constructor.go | 27 +-- pkg/morph/client/multi.go | 157 +++------------- pkg/morph/client/notifications.go | 280 ++--------------------------- pkg/morph/subscriber/subscriber.go | 275 ++++++++++++++++++++-------- 5 files changed, 258 insertions(+), 507 deletions(-) diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 1c33fa5e..83231501 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -69,9 +69,6 @@ type Client struct { // on every normal call. switchLock *sync.RWMutex - notifications chan rpcclient.Notification - subsInfo // protected with switchLock - // channel for internal stop closeChan chan struct{} @@ -566,26 +563,11 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (val // NotificationChannel returns channel than receives subscribed // notification from the connected RPC node. -// Channel is closed when connection to the RPC node has been -// lost without the possibility of recovery. +// Channel is closed when connection to the RPC node is lost. func (c *Client) NotificationChannel() <-chan rpcclient.Notification { - return c.notifications -} - -// inactiveMode switches Client to an inactive mode: -// - notification channel is closed; -// - all the new RPC request would return ErrConnectionLost; -// - inactiveModeCb is called if not nil. -func (c *Client) inactiveMode() { - c.switchLock.Lock() - defer c.switchLock.Unlock() - - close(c.notifications) - c.inactive = true - - if c.cfg.inactiveModeCb != nil { - c.cfg.inactiveModeCb() - } + c.switchLock.RLock() + defer c.switchLock.RUnlock() + return c.client.Notifications //lint:ignore SA1019 waits for neo-go v0.102.0 https://github.com/nspcc-dev/neo-go/pull/2980 } func (c *Client) setActor(act *actor.Actor) { diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index 4232b349..1f2a1eb8 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -10,11 +10,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" lru "github.com/hashicorp/golang-lru/v2" - "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/util" @@ -108,21 +105,13 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er } cli := &Client{ - cache: newClientCache(), - logger: cfg.logger, - acc: acc, - accAddr: accAddr, - cfg: *cfg, - switchLock: &sync.RWMutex{}, - notifications: make(chan rpcclient.Notification), - subsInfo: subsInfo{ - blockRcv: make(chan *block.Block), - notificationRcv: make(chan *state.ContainedNotificationEvent), - notaryReqRcv: make(chan *result.NotaryRequestEvent), - subscribedEvents: make(map[util.Uint160]string), - subscribedNotaryEvents: make(map[util.Uint160]string), - }, - closeChan: make(chan struct{}), + cache: newClientCache(), + logger: cfg.logger, + acc: acc, + accAddr: accAddr, + cfg: *cfg, + switchLock: &sync.RWMutex{}, + closeChan: make(chan struct{}), } cli.endpoints.init(cfg.endpoints) @@ -162,7 +151,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er } cli.setActor(act) - go cli.notificationLoop(ctx) + go cli.closeWaiter(ctx) return cli, nil } diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index fab90b44..e006ca69 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -6,11 +6,6 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/state" - "github.com/nspcc-dev/neo-go/pkg/neorpc" - "github.com/nspcc-dev/neo-go/pkg/neorpc/result" - "github.com/nspcc-dev/neo-go/pkg/rpcclient" "go.uber.org/zap" ) @@ -34,7 +29,8 @@ func (e *endpoints) init(ee []Endpoint) { e.list = ee } -func (c *Client) switchRPC(ctx context.Context) bool { +// SwitchRPC performs reconnection and returns true if it was successful. +func (c *Client) SwitchRPC(ctx context.Context) bool { c.switchLock.Lock() defer c.switchLock.Unlock() @@ -58,20 +54,8 @@ func (c *Client) switchRPC(ctx context.Context) bool { c.logger.Info(logs.ClientConnectionToTheNewRPCNodeHasBeenEstablished, zap.String("endpoint", newEndpoint)) - subs, ok := c.restoreSubscriptions(ctx, cli, newEndpoint, false) - if !ok { - // new WS client does not allow - // restoring subscription, client - // could not work correctly => - // closing connection to RPC node - // to switch to another one - cli.Close() - continue - } - c.client = cli c.setActor(act) - c.subsInfo = subs if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() && c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority { @@ -82,97 +66,21 @@ func (c *Client) switchRPC(ctx context.Context) bool { return true } + c.inactive = true + + if c.cfg.inactiveModeCb != nil { + c.cfg.inactiveModeCb() + } return false } -func (c *Client) notificationLoop(ctx context.Context) { - var e any - var ok bool - - for { - c.switchLock.RLock() - bChan := c.blockRcv - nChan := c.notificationRcv - nrChan := c.notaryReqRcv - c.switchLock.RUnlock() - - select { - case <-ctx.Done(): - _ = c.UnsubscribeAll() - c.close() - - return - case <-c.closeChan: - _ = c.UnsubscribeAll() - c.close() - - return - case e, ok = <-bChan: - case e, ok = <-nChan: - case e, ok = <-nrChan: - } - - if ok { - c.routeEvent(ctx, e) - continue - } - - if !c.reconnect(ctx) { - return - } - } -} - -func (c *Client) routeEvent(ctx context.Context, e any) { - typedNotification := rpcclient.Notification{Value: e} - - switch e.(type) { - case *block.Block: - typedNotification.Type = neorpc.BlockEventID - case *state.ContainedNotificationEvent: - typedNotification.Type = neorpc.NotificationEventID - case *result.NotaryRequestEvent: - typedNotification.Type = neorpc.NotaryRequestEventID - } - +func (c *Client) closeWaiter(ctx context.Context) { select { - case c.notifications <- typedNotification: case <-ctx.Done(): - _ = c.UnsubscribeAll() - c.close() case <-c.closeChan: - _ = c.UnsubscribeAll() - c.close() } -} - -func (c *Client) reconnect(ctx context.Context) bool { - if closeErr := c.client.GetError(); closeErr != nil { - c.logger.Warn(logs.ClientSwitchingToTheNextRPCNode, - zap.String("reason", closeErr.Error()), - ) - } else { - // neo-go client was closed by calling `Close` - // method, that happens only when a client has - // switched to the more prioritized RPC - return true - } - - if !c.switchRPC(ctx) { - c.logger.Error(logs.ClientCouldNotEstablishConnectionToAnyRPCNode) - - // could not connect to all endpoints => - // switch client to inactive mode - c.inactiveMode() - - return false - } - - // TODO(@carpawell): call here some callback retrieved in constructor - // of the client to allow checking chain state since during switch - // process some notification could be lost - - return true + _ = c.UnsubscribeAll() + c.close() } func (c *Client) switchToMostPrioritized(ctx context.Context) { @@ -218,36 +126,28 @@ mainLoop: continue } - if subs, ok := c.restoreSubscriptions(ctx, cli, tryE, true); ok { - c.switchLock.Lock() - - // higher priority node could have been - // connected in the other goroutine - if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority { - cli.Close() - c.switchLock.Unlock() - return - } - - c.client.Close() - c.cache.invalidate() - c.client = cli - c.setActor(act) - c.subsInfo = subs - c.endpoints.curr = i + c.switchLock.Lock() + // higher priority node could have been + // connected in the other goroutine + if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority { + cli.Close() c.switchLock.Unlock() - - c.logger.Info(logs.ClientSwitchedToTheHigherPriorityRPC, - zap.String("endpoint", tryE)) - return } - c.logger.Warn(logs.ClientCouldNotRestoreSideChainSubscriptionsUsingNode, - zap.String("endpoint", tryE), - zap.Error(err), - ) + c.client.Close() + c.cache.invalidate() + c.client = cli + c.setActor(act) + c.endpoints.curr = i + + c.switchLock.Unlock() + + c.logger.Info(logs.ClientSwitchedToTheHigherPriorityRPC, + zap.String("endpoint", tryE)) + + return } } } @@ -255,6 +155,7 @@ mainLoop: // close closes notification channel and wrapped WS client. func (c *Client) close() { - close(c.notifications) + c.switchLock.RLock() + defer c.switchLock.RUnlock() c.client.Close() } diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index 69eafc65..dbca00d7 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -1,16 +1,11 @@ package client import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" - "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/util" - "go.uber.org/zap" ) // Close closes connection to the remote side making @@ -24,71 +19,46 @@ func (c *Client) Close() { close(c.closeChan) } -// SubscribeForExecutionNotifications adds subscription for notifications -// generated during contract transaction execution to this instance of client. +// ReceiveExecutionNotifications performs subscription for notifications +// generated during contract execution. Events are sent to the specified channel. // // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. -func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error { +func (c *Client) ReceiveExecutionNotifications(contract util.Uint160, ch chan<- *state.ContainedNotificationEvent) (string, error) { c.switchLock.Lock() defer c.switchLock.Unlock() if c.inactive { - return ErrConnectionLost + return "", ErrConnectionLost } - _, subscribed := c.subscribedEvents[contract] - if subscribed { - // no need to subscribe one more time - return nil - } - - id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv) - if err != nil { - return err - } - - c.subscribedEvents[contract] = id - - return nil + return c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, ch) } -// SubscribeForNewBlocks adds subscription for new block events to this -// instance of client. +// ReceiveBlocks performs subscription for new block events. Events are sent +// to the specified channel. // // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. -func (c *Client) SubscribeForNewBlocks() error { +func (c *Client) ReceiveBlocks(ch chan<- *block.Block) (string, error) { c.switchLock.Lock() defer c.switchLock.Unlock() if c.inactive { - return ErrConnectionLost + return "", ErrConnectionLost } - if c.subscribedToBlocks { - // no need to subscribe one more time - return nil - } - - _, err := c.client.ReceiveBlocks(nil, c.blockRcv) - if err != nil { - return err - } - - c.subscribedToBlocks = true - - return nil + return c.client.ReceiveBlocks(nil, ch) } -// SubscribeForNotaryRequests adds subscription for notary request payloads +// ReceiveNotaryRequests performsn subscription for notary request payloads // addition or removal events to this instance of client. Passed txSigner is // used as filter: subscription is only for the notary requests that must be -// signed by txSigner. +// signed by txSigner. Events are sent to the specified channel. // // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. -func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error { +func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160, ch chan<- *result.NotaryRequestEvent) (string, error) { if c.notary == nil { panic(notaryNotEnabledPanicMsg) } @@ -97,30 +67,17 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error { defer c.switchLock.Unlock() if c.inactive { - return ErrConnectionLost + return "", ErrConnectionLost } - _, subscribed := c.subscribedNotaryEvents[txSigner] - if subscribed { - // no need to subscribe one more time - return nil - } - - id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv) - if err != nil { - return err - } - - c.subscribedNotaryEvents[txSigner] = id - - return nil + return c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, ch) } -// UnsubscribeContract removes subscription for given contract event stream. +// Unsubscribe performs unsubscription for the given subscription ID. // // Returns ErrConnectionLost if client has not been able to establish // connection to any of passed RPC endpoints. -func (c *Client) UnsubscribeContract(contract util.Uint160) error { +func (c *Client) Unsubscribe(subID string) error { c.switchLock.Lock() defer c.switchLock.Unlock() @@ -128,55 +85,7 @@ func (c *Client) UnsubscribeContract(contract util.Uint160) error { return ErrConnectionLost } - _, subscribed := c.subscribedEvents[contract] - if !subscribed { - // no need to unsubscribe contract - // without subscription - return nil - } - - err := c.client.Unsubscribe(c.subscribedEvents[contract]) - if err != nil { - return err - } - - delete(c.subscribedEvents, contract) - - return nil -} - -// UnsubscribeNotaryRequest removes subscription for given notary requests -// signer. -// -// Returns ErrConnectionLost if client has not been able to establish -// connection to any of passed RPC endpoints. -func (c *Client) UnsubscribeNotaryRequest(signer util.Uint160) error { - if c.notary == nil { - panic(notaryNotEnabledPanicMsg) - } - - c.switchLock.Lock() - defer c.switchLock.Unlock() - - if c.inactive { - return ErrConnectionLost - } - - _, subscribed := c.subscribedNotaryEvents[signer] - if !subscribed { - // no need to unsubscribe signer's - // requests without subscription - return nil - } - - err := c.client.Unsubscribe(c.subscribedNotaryEvents[signer]) - if err != nil { - return err - } - - delete(c.subscribedNotaryEvents, signer) - - return nil + return c.client.Unsubscribe(subID) } // UnsubscribeAll removes all active subscriptions of current client. @@ -191,163 +100,10 @@ func (c *Client) UnsubscribeAll() error { return ErrConnectionLost } - // no need to unsubscribe if there are - // no active subscriptions - if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 && - !c.subscribedToBlocks { - return nil - } - err := c.client.UnsubscribeAll() if err != nil { return err } - c.subscribedEvents = make(map[util.Uint160]string) - c.subscribedNotaryEvents = make(map[util.Uint160]string) - c.subscribedToBlocks = false - return nil } - -// subsInfo includes channels for ws notifications; -// cached subscription information. -type subsInfo struct { - blockRcv chan *block.Block - notificationRcv chan *state.ContainedNotificationEvent - notaryReqRcv chan *result.NotaryRequestEvent - - subscribedToBlocks bool - subscribedEvents map[util.Uint160]string - subscribedNotaryEvents map[util.Uint160]string -} - -// restoreSubscriptions restores subscriptions according to cached -// information about them. -// -// If it is NOT a background operation switchLock MUST be held. -// Returns a pair: the second is a restoration status and the first -// one contains subscription information applied to the passed cli -// and receivers for the updated subscriptions. -// Does not change Client instance. -func (c *Client) restoreSubscriptions(ctx context.Context, cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) { - var ( - err error - id string - ) - - stopCh := make(chan struct{}) - defer close(stopCh) - - blockRcv := make(chan *block.Block) - notificationRcv := make(chan *state.ContainedNotificationEvent) - notaryReqRcv := make(chan *result.NotaryRequestEvent) - - c.startListen(ctx, stopCh, blockRcv, notificationRcv, notaryReqRcv, background) - - if background { - c.switchLock.RLock() - defer c.switchLock.RUnlock() - } - - si.subscribedToBlocks = c.subscribedToBlocks - si.subscribedEvents = copySubsMap(c.subscribedEvents) - si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents) - si.blockRcv = blockRcv - si.notificationRcv = notificationRcv - si.notaryReqRcv = notaryReqRcv - - // new block events restoration - if si.subscribedToBlocks { - _, err = cli.ReceiveBlocks(nil, blockRcv) - if err != nil { - c.logger.Error(logs.ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch, - zap.String("endpoint", endpoint), - zap.Error(err), - ) - - return - } - } - - // notification events restoration - for contract := range si.subscribedEvents { - contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890 - id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv) - if err != nil { - c.logger.Error(logs.ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch, - zap.String("endpoint", endpoint), - zap.Error(err), - ) - - return - } - - si.subscribedEvents[contract] = id - } - - // notary notification events restoration - if c.notary != nil { - for signer := range si.subscribedNotaryEvents { - signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890 - id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv) - if err != nil { - c.logger.Error(logs.ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch, - zap.String("endpoint", endpoint), - zap.Error(err), - ) - - return - } - - si.subscribedNotaryEvents[signer] = id - } - } - - return si, true -} - -func (c *Client) startListen(ctx context.Context, stopCh <-chan struct{}, blockRcv <-chan *block.Block, - notificationRcv <-chan *state.ContainedNotificationEvent, notaryReqRcv <-chan *result.NotaryRequestEvent, background bool) { - // neo-go WS client says to _always_ read notifications - // from its channel. Subscribing to any notification - // while not reading them in another goroutine may - // lead to a dead-lock, thus that async side notification - // listening while restoring subscriptions - - go func() { - var e any - var ok bool - - for { - select { - case <-stopCh: - return - case e, ok = <-blockRcv: - case e, ok = <-notificationRcv: - case e, ok = <-notaryReqRcv: - } - - if !ok { - return - } - - if background { - // background client (test) switch, no need to send - // any notification, just preventing dead-lock - continue - } - - c.routeEvent(ctx, e) - } - }() -} - -func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string { - newM := make(map[util.Uint160]string, len(m)) - for k, v := range m { - newM[k] = v - } - - return newM -} diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go index 2d478c9c..383d5840 100644 --- a/pkg/morph/subscriber/subscriber.go +++ b/pkg/morph/subscriber/subscriber.go @@ -11,8 +11,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" - "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -35,16 +35,27 @@ type ( Close() } + subChannels struct { + NotifyChan chan *state.ContainedNotificationEvent + BlockChan chan *block.Block + NotaryChan chan *result.NotaryRequestEvent + } + subscriber struct { *sync.RWMutex log *logger.Logger client *client.Client notifyChan chan *state.ContainedNotificationEvent - - blockChan chan *block.Block - + blockChan chan *block.Block notaryChan chan *result.NotaryRequestEvent + + current subChannels + + // cached subscription information + subscribedEvents map[util.Uint160]bool + subscribedNotaryEvents map[util.Uint160]bool + subscribedToNewBlocks bool } // Params is a group of Subscriber constructor parameters. @@ -75,22 +86,28 @@ func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error { s.Lock() defer s.Unlock() - notifyIDs := make(map[util.Uint160]struct{}, len(contracts)) + notifyIDs := make([]string, 0, len(contracts)) for i := range contracts { + if s.subscribedEvents[contracts[i]] { + continue + } // subscribe to contract notifications - err := s.client.SubscribeForExecutionNotifications(contracts[i]) + id, err := s.client.ReceiveExecutionNotifications(contracts[i], s.current.NotifyChan) if err != nil { // if there is some error, undo all subscriptions and return error - for hash := range notifyIDs { - _ = s.client.UnsubscribeContract(hash) + for _, id := range notifyIDs { + _ = s.client.Unsubscribe(id) } return err } // save notification id - notifyIDs[contracts[i]] = struct{}{} + notifyIDs = append(notifyIDs, id) + } + for i := range contracts { + s.subscribedEvents[contracts[i]] = true } return nil @@ -101,82 +118,34 @@ func (s *subscriber) Close() { } func (s *subscriber) BlockNotifications() error { - if err := s.client.SubscribeForNewBlocks(); err != nil { + s.Lock() + defer s.Unlock() + if s.subscribedToNewBlocks { + return nil + } + if _, err := s.client.ReceiveBlocks(s.current.BlockChan); err != nil { return fmt.Errorf("could not subscribe for new block events: %w", err) } + s.subscribedToNewBlocks = true + return nil } func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error { - if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil { + s.Lock() + defer s.Unlock() + if s.subscribedNotaryEvents[mainTXSigner] { + return nil + } + if _, err := s.client.ReceiveNotaryRequests(mainTXSigner, s.current.NotaryChan); err != nil { return fmt.Errorf("could not subscribe for notary request events: %w", err) } + s.subscribedNotaryEvents[mainTXSigner] = true return nil } -func (s *subscriber) routeNotifications(ctx context.Context) { - notificationChan := s.client.NotificationChannel() - - for { - select { - case <-ctx.Done(): - return - case notification, ok := <-notificationChan: - if !ok { - s.log.Warn(logs.SubscriberRemoteNotificationChannelHasBeenClosed) - close(s.notifyChan) - close(s.blockChan) - close(s.notaryChan) - - return - } - - switch notification.Type { - case neorpc.NotificationEventID: - notifyEvent, ok := notification.Value.(*state.ContainedNotificationEvent) - if !ok { - s.log.Error(logs.SubscriberCantCastNotifyEventValueToTheNotifyStruct, - zap.String("received type", fmt.Sprintf("%T", notification.Value)), - ) - continue - } - - s.log.Debug(logs.SubscriberNewNotificationEventFromSidechain, - zap.String("name", notifyEvent.Name), - ) - - s.notifyChan <- notifyEvent - case neorpc.BlockEventID: - b, ok := notification.Value.(*block.Block) - if !ok { - s.log.Error(logs.SubscriberCantCastBlockEventValueToBlock, - zap.String("received type", fmt.Sprintf("%T", notification.Value)), - ) - continue - } - - s.blockChan <- b - case neorpc.NotaryRequestEventID: - notaryRequest, ok := notification.Value.(*result.NotaryRequestEvent) - if !ok { - s.log.Error(logs.SubscriberCantCastNotifyEventValueToTheNotaryRequestStruct, - zap.String("received type", fmt.Sprintf("%T", notification.Value)), - ) - continue - } - - s.notaryChan <- notaryRequest - default: - s.log.Debug(logs.SubscriberUnsupportedNotificationFromTheChain, - zap.Uint8("type", uint8(notification.Type)), - ) - } - } - } -} - // New is a constructs Neo:Morph event listener and returns Subscriber interface. func New(ctx context.Context, p *Params) (Subscriber, error) { switch { @@ -200,16 +169,170 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { notifyChan: make(chan *state.ContainedNotificationEvent), blockChan: make(chan *block.Block), notaryChan: make(chan *result.NotaryRequestEvent), - } - // Worker listens all events from neo-go websocket and puts them - // into corresponding channel. It may be notifications, transactions, - // new blocks. For now only notifications. + current: newSubChannels(), + + subscribedEvents: make(map[util.Uint160]bool), + subscribedNotaryEvents: make(map[util.Uint160]bool), + } + // Worker listens all events from temporary NeoGo channel and puts them + // into corresponding permanent channels. go sub.routeNotifications(ctx) return sub, nil } +func (s *subscriber) routeNotifications(ctx context.Context) { + var ( + // TODO: not needed after nspcc-dev/neo-go#2980. + cliCh = s.client.NotificationChannel() + restoreCh = make(chan bool) + restoreInProgress bool + ) + +routeloop: + for { + var connLost bool + s.RLock() + curr := s.current + s.RUnlock() + select { + case <-ctx.Done(): + break routeloop + case ev, ok := <-curr.NotifyChan: + if ok { + s.notifyChan <- ev + } else { + connLost = true + } + case ev, ok := <-curr.BlockChan: + if ok { + s.blockChan <- ev + } else { + connLost = true + } + case ev, ok := <-curr.NotaryChan: + if ok { + s.notaryChan <- ev + } else { + connLost = true + } + case _, ok := <-cliCh: + connLost = !ok + case ok := <-restoreCh: + restoreInProgress = false + if !ok { + connLost = true + } + } + if connLost { + if !restoreInProgress { + restoreInProgress, cliCh = s.switchEndpoint(ctx, restoreCh) + if !restoreInProgress { + break routeloop + } + curr.drain() + } else { // Avoid getting additional !ok events. + s.Lock() + s.current.NotifyChan = nil + s.current.BlockChan = nil + s.current.NotaryChan = nil + s.Unlock() + } + } + } + close(s.notifyChan) + close(s.blockChan) + close(s.notaryChan) +} + +func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (bool, <-chan rpcclient.Notification) { + s.log.Info("RPC connection lost, attempting reconnect") + if !s.client.SwitchRPC(ctx) { + s.log.Error("can't switch RPC node") + return false, nil + } + + cliCh := s.client.NotificationChannel() + + s.Lock() + chs := newSubChannels() + go func() { + finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan) + }() + s.current = chs + s.Unlock() + + return true, cliCh +} + +func newSubChannels() subChannels { + return subChannels{ + NotifyChan: make(chan *state.ContainedNotificationEvent), + BlockChan: make(chan *block.Block), + NotaryChan: make(chan *result.NotaryRequestEvent), + } +} + +func (s *subChannels) drain() { +drainloop: + for { + select { + case _, ok := <-s.NotifyChan: + if !ok { + s.NotifyChan = nil + } + case _, ok := <-s.BlockChan: + if !ok { + s.BlockChan = nil + } + case _, ok := <-s.NotaryChan: + if !ok { + s.NotaryChan = nil + } + default: + break drainloop + } + } +} + +// restoreSubscriptions restores subscriptions according to +// cached information about them. +func (s *subscriber) restoreSubscriptions(notifCh chan<- *state.ContainedNotificationEvent, + blCh chan<- *block.Block, notaryCh chan<- *result.NotaryRequestEvent) bool { + var err error + + // new block events restoration + if s.subscribedToNewBlocks { + _, err = s.client.ReceiveBlocks(blCh) + if err != nil { + s.log.Error(logs.ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch, zap.Error(err)) + return false + } + } + + // notification events restoration + for contract := range s.subscribedEvents { + contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890 + _, err = s.client.ReceiveExecutionNotifications(contract, notifCh) + if err != nil { + s.log.Error(logs.ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch, zap.Error(err)) + return false + } + } + + // notary notification events restoration + for signer := range s.subscribedNotaryEvents { + signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890 + _, err = s.client.ReceiveNotaryRequests(signer, notaryCh) + if err != nil { + s.log.Error(logs.ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch, zap.Error(err)) + return false + } + } + return true +} + // awaitHeight checks if remote client has least expected block height and // returns error if it is not reached that height after timeout duration. // This function is required to avoid connections to unsynced RPC nodes, because