[#337] morph: Move subscription logic to subscriber

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2023-05-11 17:05:24 +03:00 committed by Evgenii Stratonikov
parent a5f118a987
commit 35fdf6f315
5 changed files with 258 additions and 507 deletions

View file

@ -69,9 +69,6 @@ type Client struct {
// on every normal call. // on every normal call.
switchLock *sync.RWMutex switchLock *sync.RWMutex
notifications chan rpcclient.Notification
subsInfo // protected with switchLock
// channel for internal stop // channel for internal stop
closeChan chan struct{} closeChan chan struct{}
@ -566,26 +563,11 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (val
// NotificationChannel returns channel than receives subscribed // NotificationChannel returns channel than receives subscribed
// notification from the connected RPC node. // notification from the connected RPC node.
// Channel is closed when connection to the RPC node has been // Channel is closed when connection to the RPC node is lost.
// lost without the possibility of recovery.
func (c *Client) NotificationChannel() <-chan rpcclient.Notification { func (c *Client) NotificationChannel() <-chan rpcclient.Notification {
return c.notifications c.switchLock.RLock()
} defer c.switchLock.RUnlock()
return c.client.Notifications //lint:ignore SA1019 waits for neo-go v0.102.0 https://github.com/nspcc-dev/neo-go/pull/2980
// inactiveMode switches Client to an inactive mode:
// - notification channel is closed;
// - all the new RPC request would return ErrConnectionLost;
// - inactiveModeCb is called if not nil.
func (c *Client) inactiveMode() {
c.switchLock.Lock()
defer c.switchLock.Unlock()
close(c.notifications)
c.inactive = true
if c.cfg.inactiveModeCb != nil {
c.cfg.inactiveModeCb()
}
} }
func (c *Client) setActor(act *actor.Actor) { func (c *Client) setActor(act *actor.Actor) {

View file

@ -10,11 +10,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
@ -108,21 +105,13 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
} }
cli := &Client{ cli := &Client{
cache: newClientCache(), cache: newClientCache(),
logger: cfg.logger, logger: cfg.logger,
acc: acc, acc: acc,
accAddr: accAddr, accAddr: accAddr,
cfg: *cfg, cfg: *cfg,
switchLock: &sync.RWMutex{}, switchLock: &sync.RWMutex{},
notifications: make(chan rpcclient.Notification), closeChan: make(chan struct{}),
subsInfo: subsInfo{
blockRcv: make(chan *block.Block),
notificationRcv: make(chan *state.ContainedNotificationEvent),
notaryReqRcv: make(chan *result.NotaryRequestEvent),
subscribedEvents: make(map[util.Uint160]string),
subscribedNotaryEvents: make(map[util.Uint160]string),
},
closeChan: make(chan struct{}),
} }
cli.endpoints.init(cfg.endpoints) cli.endpoints.init(cfg.endpoints)
@ -162,7 +151,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
} }
cli.setActor(act) cli.setActor(act)
go cli.notificationLoop(ctx) go cli.closeWaiter(ctx)
return cli, nil return cli, nil
} }

View file

@ -6,11 +6,6 @@ import (
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -34,7 +29,8 @@ func (e *endpoints) init(ee []Endpoint) {
e.list = ee e.list = ee
} }
func (c *Client) switchRPC(ctx context.Context) bool { // SwitchRPC performs reconnection and returns true if it was successful.
func (c *Client) SwitchRPC(ctx context.Context) bool {
c.switchLock.Lock() c.switchLock.Lock()
defer c.switchLock.Unlock() defer c.switchLock.Unlock()
@ -58,20 +54,8 @@ func (c *Client) switchRPC(ctx context.Context) bool {
c.logger.Info(logs.ClientConnectionToTheNewRPCNodeHasBeenEstablished, c.logger.Info(logs.ClientConnectionToTheNewRPCNodeHasBeenEstablished,
zap.String("endpoint", newEndpoint)) zap.String("endpoint", newEndpoint))
subs, ok := c.restoreSubscriptions(ctx, cli, newEndpoint, false)
if !ok {
// new WS client does not allow
// restoring subscription, client
// could not work correctly =>
// closing connection to RPC node
// to switch to another one
cli.Close()
continue
}
c.client = cli c.client = cli
c.setActor(act) c.setActor(act)
c.subsInfo = subs
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() && if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority { c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
@ -82,97 +66,21 @@ func (c *Client) switchRPC(ctx context.Context) bool {
return true return true
} }
c.inactive = true
if c.cfg.inactiveModeCb != nil {
c.cfg.inactiveModeCb()
}
return false return false
} }
func (c *Client) notificationLoop(ctx context.Context) { func (c *Client) closeWaiter(ctx context.Context) {
var e any
var ok bool
for {
c.switchLock.RLock()
bChan := c.blockRcv
nChan := c.notificationRcv
nrChan := c.notaryReqRcv
c.switchLock.RUnlock()
select {
case <-ctx.Done():
_ = c.UnsubscribeAll()
c.close()
return
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
return
case e, ok = <-bChan:
case e, ok = <-nChan:
case e, ok = <-nrChan:
}
if ok {
c.routeEvent(ctx, e)
continue
}
if !c.reconnect(ctx) {
return
}
}
}
func (c *Client) routeEvent(ctx context.Context, e any) {
typedNotification := rpcclient.Notification{Value: e}
switch e.(type) {
case *block.Block:
typedNotification.Type = neorpc.BlockEventID
case *state.ContainedNotificationEvent:
typedNotification.Type = neorpc.NotificationEventID
case *result.NotaryRequestEvent:
typedNotification.Type = neorpc.NotaryRequestEventID
}
select { select {
case c.notifications <- typedNotification:
case <-ctx.Done(): case <-ctx.Done():
_ = c.UnsubscribeAll()
c.close()
case <-c.closeChan: case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
} }
} _ = c.UnsubscribeAll()
c.close()
func (c *Client) reconnect(ctx context.Context) bool {
if closeErr := c.client.GetError(); closeErr != nil {
c.logger.Warn(logs.ClientSwitchingToTheNextRPCNode,
zap.String("reason", closeErr.Error()),
)
} else {
// neo-go client was closed by calling `Close`
// method, that happens only when a client has
// switched to the more prioritized RPC
return true
}
if !c.switchRPC(ctx) {
c.logger.Error(logs.ClientCouldNotEstablishConnectionToAnyRPCNode)
// could not connect to all endpoints =>
// switch client to inactive mode
c.inactiveMode()
return false
}
// TODO(@carpawell): call here some callback retrieved in constructor
// of the client to allow checking chain state since during switch
// process some notification could be lost
return true
} }
func (c *Client) switchToMostPrioritized(ctx context.Context) { func (c *Client) switchToMostPrioritized(ctx context.Context) {
@ -218,36 +126,28 @@ mainLoop:
continue continue
} }
if subs, ok := c.restoreSubscriptions(ctx, cli, tryE, true); ok { c.switchLock.Lock()
c.switchLock.Lock()
// higher priority node could have been
// connected in the other goroutine
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
cli.Close()
c.switchLock.Unlock()
return
}
c.client.Close()
c.cache.invalidate()
c.client = cli
c.setActor(act)
c.subsInfo = subs
c.endpoints.curr = i
// higher priority node could have been
// connected in the other goroutine
if e.Priority >= c.endpoints.list[c.endpoints.curr].Priority {
cli.Close()
c.switchLock.Unlock() c.switchLock.Unlock()
c.logger.Info(logs.ClientSwitchedToTheHigherPriorityRPC,
zap.String("endpoint", tryE))
return return
} }
c.logger.Warn(logs.ClientCouldNotRestoreSideChainSubscriptionsUsingNode, c.client.Close()
zap.String("endpoint", tryE), c.cache.invalidate()
zap.Error(err), c.client = cli
) c.setActor(act)
c.endpoints.curr = i
c.switchLock.Unlock()
c.logger.Info(logs.ClientSwitchedToTheHigherPriorityRPC,
zap.String("endpoint", tryE))
return
} }
} }
} }
@ -255,6 +155,7 @@ mainLoop:
// close closes notification channel and wrapped WS client. // close closes notification channel and wrapped WS client.
func (c *Client) close() { func (c *Client) close() {
close(c.notifications) c.switchLock.RLock()
defer c.switchLock.RUnlock()
c.client.Close() c.client.Close()
} }

View file

@ -1,16 +1,11 @@
package client package client
import ( import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
) )
// Close closes connection to the remote side making // Close closes connection to the remote side making
@ -24,71 +19,46 @@ func (c *Client) Close() {
close(c.closeChan) close(c.closeChan)
} }
// SubscribeForExecutionNotifications adds subscription for notifications // ReceiveExecutionNotifications performs subscription for notifications
// generated during contract transaction execution to this instance of client. // generated during contract execution. Events are sent to the specified channel.
// //
// Returns ErrConnectionLost if client has not been able to establish // Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints. // connection to any of passed RPC endpoints.
func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error { func (c *Client) ReceiveExecutionNotifications(contract util.Uint160, ch chan<- *state.ContainedNotificationEvent) (string, error) {
c.switchLock.Lock() c.switchLock.Lock()
defer c.switchLock.Unlock() defer c.switchLock.Unlock()
if c.inactive { if c.inactive {
return ErrConnectionLost return "", ErrConnectionLost
} }
_, subscribed := c.subscribedEvents[contract] return c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, ch)
if subscribed {
// no need to subscribe one more time
return nil
}
id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
if err != nil {
return err
}
c.subscribedEvents[contract] = id
return nil
} }
// SubscribeForNewBlocks adds subscription for new block events to this // ReceiveBlocks performs subscription for new block events. Events are sent
// instance of client. // to the specified channel.
// //
// Returns ErrConnectionLost if client has not been able to establish // Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints. // connection to any of passed RPC endpoints.
func (c *Client) SubscribeForNewBlocks() error { func (c *Client) ReceiveBlocks(ch chan<- *block.Block) (string, error) {
c.switchLock.Lock() c.switchLock.Lock()
defer c.switchLock.Unlock() defer c.switchLock.Unlock()
if c.inactive { if c.inactive {
return ErrConnectionLost return "", ErrConnectionLost
} }
if c.subscribedToBlocks { return c.client.ReceiveBlocks(nil, ch)
// no need to subscribe one more time
return nil
}
_, err := c.client.ReceiveBlocks(nil, c.blockRcv)
if err != nil {
return err
}
c.subscribedToBlocks = true
return nil
} }
// SubscribeForNotaryRequests adds subscription for notary request payloads // ReceiveNotaryRequests performsn subscription for notary request payloads
// addition or removal events to this instance of client. Passed txSigner is // addition or removal events to this instance of client. Passed txSigner is
// used as filter: subscription is only for the notary requests that must be // used as filter: subscription is only for the notary requests that must be
// signed by txSigner. // signed by txSigner. Events are sent to the specified channel.
// //
// Returns ErrConnectionLost if client has not been able to establish // Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints. // connection to any of passed RPC endpoints.
func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error { func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160, ch chan<- *result.NotaryRequestEvent) (string, error) {
if c.notary == nil { if c.notary == nil {
panic(notaryNotEnabledPanicMsg) panic(notaryNotEnabledPanicMsg)
} }
@ -97,30 +67,17 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
defer c.switchLock.Unlock() defer c.switchLock.Unlock()
if c.inactive { if c.inactive {
return ErrConnectionLost return "", ErrConnectionLost
} }
_, subscribed := c.subscribedNotaryEvents[txSigner] return c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, ch)
if subscribed {
// no need to subscribe one more time
return nil
}
id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
if err != nil {
return err
}
c.subscribedNotaryEvents[txSigner] = id
return nil
} }
// UnsubscribeContract removes subscription for given contract event stream. // Unsubscribe performs unsubscription for the given subscription ID.
// //
// Returns ErrConnectionLost if client has not been able to establish // Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints. // connection to any of passed RPC endpoints.
func (c *Client) UnsubscribeContract(contract util.Uint160) error { func (c *Client) Unsubscribe(subID string) error {
c.switchLock.Lock() c.switchLock.Lock()
defer c.switchLock.Unlock() defer c.switchLock.Unlock()
@ -128,55 +85,7 @@ func (c *Client) UnsubscribeContract(contract util.Uint160) error {
return ErrConnectionLost return ErrConnectionLost
} }
_, subscribed := c.subscribedEvents[contract] return c.client.Unsubscribe(subID)
if !subscribed {
// no need to unsubscribe contract
// without subscription
return nil
}
err := c.client.Unsubscribe(c.subscribedEvents[contract])
if err != nil {
return err
}
delete(c.subscribedEvents, contract)
return nil
}
// UnsubscribeNotaryRequest removes subscription for given notary requests
// signer.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) UnsubscribeNotaryRequest(signer util.Uint160) error {
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
}
_, subscribed := c.subscribedNotaryEvents[signer]
if !subscribed {
// no need to unsubscribe signer's
// requests without subscription
return nil
}
err := c.client.Unsubscribe(c.subscribedNotaryEvents[signer])
if err != nil {
return err
}
delete(c.subscribedNotaryEvents, signer)
return nil
} }
// UnsubscribeAll removes all active subscriptions of current client. // UnsubscribeAll removes all active subscriptions of current client.
@ -191,163 +100,10 @@ func (c *Client) UnsubscribeAll() error {
return ErrConnectionLost return ErrConnectionLost
} }
// no need to unsubscribe if there are
// no active subscriptions
if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 &&
!c.subscribedToBlocks {
return nil
}
err := c.client.UnsubscribeAll() err := c.client.UnsubscribeAll()
if err != nil { if err != nil {
return err return err
} }
c.subscribedEvents = make(map[util.Uint160]string)
c.subscribedNotaryEvents = make(map[util.Uint160]string)
c.subscribedToBlocks = false
return nil return nil
} }
// subsInfo includes channels for ws notifications;
// cached subscription information.
type subsInfo struct {
blockRcv chan *block.Block
notificationRcv chan *state.ContainedNotificationEvent
notaryReqRcv chan *result.NotaryRequestEvent
subscribedToBlocks bool
subscribedEvents map[util.Uint160]string
subscribedNotaryEvents map[util.Uint160]string
}
// restoreSubscriptions restores subscriptions according to cached
// information about them.
//
// If it is NOT a background operation switchLock MUST be held.
// Returns a pair: the second is a restoration status and the first
// one contains subscription information applied to the passed cli
// and receivers for the updated subscriptions.
// Does not change Client instance.
func (c *Client) restoreSubscriptions(ctx context.Context, cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
var (
err error
id string
)
stopCh := make(chan struct{})
defer close(stopCh)
blockRcv := make(chan *block.Block)
notificationRcv := make(chan *state.ContainedNotificationEvent)
notaryReqRcv := make(chan *result.NotaryRequestEvent)
c.startListen(ctx, stopCh, blockRcv, notificationRcv, notaryReqRcv, background)
if background {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
}
si.subscribedToBlocks = c.subscribedToBlocks
si.subscribedEvents = copySubsMap(c.subscribedEvents)
si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
si.blockRcv = blockRcv
si.notificationRcv = notificationRcv
si.notaryReqRcv = notaryReqRcv
// new block events restoration
if si.subscribedToBlocks {
_, err = cli.ReceiveBlocks(nil, blockRcv)
if err != nil {
c.logger.Error(logs.ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch,
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
}
// notification events restoration
for contract := range si.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
if err != nil {
c.logger.Error(logs.ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch,
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
si.subscribedEvents[contract] = id
}
// notary notification events restoration
if c.notary != nil {
for signer := range si.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
if err != nil {
c.logger.Error(logs.ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch,
zap.String("endpoint", endpoint),
zap.Error(err),
)
return
}
si.subscribedNotaryEvents[signer] = id
}
}
return si, true
}
func (c *Client) startListen(ctx context.Context, stopCh <-chan struct{}, blockRcv <-chan *block.Block,
notificationRcv <-chan *state.ContainedNotificationEvent, notaryReqRcv <-chan *result.NotaryRequestEvent, background bool) {
// neo-go WS client says to _always_ read notifications
// from its channel. Subscribing to any notification
// while not reading them in another goroutine may
// lead to a dead-lock, thus that async side notification
// listening while restoring subscriptions
go func() {
var e any
var ok bool
for {
select {
case <-stopCh:
return
case e, ok = <-blockRcv:
case e, ok = <-notificationRcv:
case e, ok = <-notaryReqRcv:
}
if !ok {
return
}
if background {
// background client (test) switch, no need to send
// any notification, just preventing dead-lock
continue
}
c.routeEvent(ctx, e)
}
}()
}
func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
newM := make(map[util.Uint160]string, len(m))
for k, v := range m {
newM[k] = v
}
return newM
}

View file

@ -11,8 +11,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -35,16 +35,27 @@ type (
Close() Close()
} }
subChannels struct {
NotifyChan chan *state.ContainedNotificationEvent
BlockChan chan *block.Block
NotaryChan chan *result.NotaryRequestEvent
}
subscriber struct { subscriber struct {
*sync.RWMutex *sync.RWMutex
log *logger.Logger log *logger.Logger
client *client.Client client *client.Client
notifyChan chan *state.ContainedNotificationEvent notifyChan chan *state.ContainedNotificationEvent
blockChan chan *block.Block
blockChan chan *block.Block
notaryChan chan *result.NotaryRequestEvent notaryChan chan *result.NotaryRequestEvent
current subChannels
// cached subscription information
subscribedEvents map[util.Uint160]bool
subscribedNotaryEvents map[util.Uint160]bool
subscribedToNewBlocks bool
} }
// Params is a group of Subscriber constructor parameters. // Params is a group of Subscriber constructor parameters.
@ -75,22 +86,28 @@ func (s *subscriber) SubscribeForNotification(contracts ...util.Uint160) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
notifyIDs := make(map[util.Uint160]struct{}, len(contracts)) notifyIDs := make([]string, 0, len(contracts))
for i := range contracts { for i := range contracts {
if s.subscribedEvents[contracts[i]] {
continue
}
// subscribe to contract notifications // subscribe to contract notifications
err := s.client.SubscribeForExecutionNotifications(contracts[i]) id, err := s.client.ReceiveExecutionNotifications(contracts[i], s.current.NotifyChan)
if err != nil { if err != nil {
// if there is some error, undo all subscriptions and return error // if there is some error, undo all subscriptions and return error
for hash := range notifyIDs { for _, id := range notifyIDs {
_ = s.client.UnsubscribeContract(hash) _ = s.client.Unsubscribe(id)
} }
return err return err
} }
// save notification id // save notification id
notifyIDs[contracts[i]] = struct{}{} notifyIDs = append(notifyIDs, id)
}
for i := range contracts {
s.subscribedEvents[contracts[i]] = true
} }
return nil return nil
@ -101,82 +118,34 @@ func (s *subscriber) Close() {
} }
func (s *subscriber) BlockNotifications() error { func (s *subscriber) BlockNotifications() error {
if err := s.client.SubscribeForNewBlocks(); err != nil { s.Lock()
defer s.Unlock()
if s.subscribedToNewBlocks {
return nil
}
if _, err := s.client.ReceiveBlocks(s.current.BlockChan); err != nil {
return fmt.Errorf("could not subscribe for new block events: %w", err) return fmt.Errorf("could not subscribe for new block events: %w", err)
} }
s.subscribedToNewBlocks = true
return nil return nil
} }
func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error { func (s *subscriber) SubscribeForNotaryRequests(mainTXSigner util.Uint160) error {
if err := s.client.SubscribeForNotaryRequests(mainTXSigner); err != nil { s.Lock()
defer s.Unlock()
if s.subscribedNotaryEvents[mainTXSigner] {
return nil
}
if _, err := s.client.ReceiveNotaryRequests(mainTXSigner, s.current.NotaryChan); err != nil {
return fmt.Errorf("could not subscribe for notary request events: %w", err) return fmt.Errorf("could not subscribe for notary request events: %w", err)
} }
s.subscribedNotaryEvents[mainTXSigner] = true
return nil return nil
} }
func (s *subscriber) routeNotifications(ctx context.Context) {
notificationChan := s.client.NotificationChannel()
for {
select {
case <-ctx.Done():
return
case notification, ok := <-notificationChan:
if !ok {
s.log.Warn(logs.SubscriberRemoteNotificationChannelHasBeenClosed)
close(s.notifyChan)
close(s.blockChan)
close(s.notaryChan)
return
}
switch notification.Type {
case neorpc.NotificationEventID:
notifyEvent, ok := notification.Value.(*state.ContainedNotificationEvent)
if !ok {
s.log.Error(logs.SubscriberCantCastNotifyEventValueToTheNotifyStruct,
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.log.Debug(logs.SubscriberNewNotificationEventFromSidechain,
zap.String("name", notifyEvent.Name),
)
s.notifyChan <- notifyEvent
case neorpc.BlockEventID:
b, ok := notification.Value.(*block.Block)
if !ok {
s.log.Error(logs.SubscriberCantCastBlockEventValueToBlock,
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.blockChan <- b
case neorpc.NotaryRequestEventID:
notaryRequest, ok := notification.Value.(*result.NotaryRequestEvent)
if !ok {
s.log.Error(logs.SubscriberCantCastNotifyEventValueToTheNotaryRequestStruct,
zap.String("received type", fmt.Sprintf("%T", notification.Value)),
)
continue
}
s.notaryChan <- notaryRequest
default:
s.log.Debug(logs.SubscriberUnsupportedNotificationFromTheChain,
zap.Uint8("type", uint8(notification.Type)),
)
}
}
}
}
// New is a constructs Neo:Morph event listener and returns Subscriber interface. // New is a constructs Neo:Morph event listener and returns Subscriber interface.
func New(ctx context.Context, p *Params) (Subscriber, error) { func New(ctx context.Context, p *Params) (Subscriber, error) {
switch { switch {
@ -200,16 +169,170 @@ func New(ctx context.Context, p *Params) (Subscriber, error) {
notifyChan: make(chan *state.ContainedNotificationEvent), notifyChan: make(chan *state.ContainedNotificationEvent),
blockChan: make(chan *block.Block), blockChan: make(chan *block.Block),
notaryChan: make(chan *result.NotaryRequestEvent), notaryChan: make(chan *result.NotaryRequestEvent),
}
// Worker listens all events from neo-go websocket and puts them current: newSubChannels(),
// into corresponding channel. It may be notifications, transactions,
// new blocks. For now only notifications. subscribedEvents: make(map[util.Uint160]bool),
subscribedNotaryEvents: make(map[util.Uint160]bool),
}
// Worker listens all events from temporary NeoGo channel and puts them
// into corresponding permanent channels.
go sub.routeNotifications(ctx) go sub.routeNotifications(ctx)
return sub, nil return sub, nil
} }
func (s *subscriber) routeNotifications(ctx context.Context) {
var (
// TODO: not needed after nspcc-dev/neo-go#2980.
cliCh = s.client.NotificationChannel()
restoreCh = make(chan bool)
restoreInProgress bool
)
routeloop:
for {
var connLost bool
s.RLock()
curr := s.current
s.RUnlock()
select {
case <-ctx.Done():
break routeloop
case ev, ok := <-curr.NotifyChan:
if ok {
s.notifyChan <- ev
} else {
connLost = true
}
case ev, ok := <-curr.BlockChan:
if ok {
s.blockChan <- ev
} else {
connLost = true
}
case ev, ok := <-curr.NotaryChan:
if ok {
s.notaryChan <- ev
} else {
connLost = true
}
case _, ok := <-cliCh:
connLost = !ok
case ok := <-restoreCh:
restoreInProgress = false
if !ok {
connLost = true
}
}
if connLost {
if !restoreInProgress {
restoreInProgress, cliCh = s.switchEndpoint(ctx, restoreCh)
if !restoreInProgress {
break routeloop
}
curr.drain()
} else { // Avoid getting additional !ok events.
s.Lock()
s.current.NotifyChan = nil
s.current.BlockChan = nil
s.current.NotaryChan = nil
s.Unlock()
}
}
}
close(s.notifyChan)
close(s.blockChan)
close(s.notaryChan)
}
func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (bool, <-chan rpcclient.Notification) {
s.log.Info("RPC connection lost, attempting reconnect")
if !s.client.SwitchRPC(ctx) {
s.log.Error("can't switch RPC node")
return false, nil
}
cliCh := s.client.NotificationChannel()
s.Lock()
chs := newSubChannels()
go func() {
finishCh <- s.restoreSubscriptions(chs.NotifyChan, chs.BlockChan, chs.NotaryChan)
}()
s.current = chs
s.Unlock()
return true, cliCh
}
func newSubChannels() subChannels {
return subChannels{
NotifyChan: make(chan *state.ContainedNotificationEvent),
BlockChan: make(chan *block.Block),
NotaryChan: make(chan *result.NotaryRequestEvent),
}
}
func (s *subChannels) drain() {
drainloop:
for {
select {
case _, ok := <-s.NotifyChan:
if !ok {
s.NotifyChan = nil
}
case _, ok := <-s.BlockChan:
if !ok {
s.BlockChan = nil
}
case _, ok := <-s.NotaryChan:
if !ok {
s.NotaryChan = nil
}
default:
break drainloop
}
}
}
// restoreSubscriptions restores subscriptions according to
// cached information about them.
func (s *subscriber) restoreSubscriptions(notifCh chan<- *state.ContainedNotificationEvent,
blCh chan<- *block.Block, notaryCh chan<- *result.NotaryRequestEvent) bool {
var err error
// new block events restoration
if s.subscribedToNewBlocks {
_, err = s.client.ReceiveBlocks(blCh)
if err != nil {
s.log.Error(logs.ClientCouldNotRestoreBlockSubscriptionAfterRPCSwitch, zap.Error(err))
return false
}
}
// notification events restoration
for contract := range s.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
_, err = s.client.ReceiveExecutionNotifications(contract, notifCh)
if err != nil {
s.log.Error(logs.ClientCouldNotRestoreNotificationSubscriptionAfterRPCSwitch, zap.Error(err))
return false
}
}
// notary notification events restoration
for signer := range s.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
_, err = s.client.ReceiveNotaryRequests(signer, notaryCh)
if err != nil {
s.log.Error(logs.ClientCouldNotRestoreNotaryNotificationSubscriptionAfterRPCSwitch, zap.Error(err))
return false
}
}
return true
}
// awaitHeight checks if remote client has least expected block height and // awaitHeight checks if remote client has least expected block height and
// returns error if it is not reached that height after timeout duration. // returns error if it is not reached that height after timeout duration.
// This function is required to avoid connections to unsynced RPC nodes, because // This function is required to avoid connections to unsynced RPC nodes, because