diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 51a030e6..c840a34c 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -69,17 +69,12 @@ type Client struct { // on every normal call. switchLock *sync.RWMutex - // channel for ws notifications notifications chan rpcclient.Notification + subsInfo // protected with switchLock // channel for internal stop closeChan chan struct{} - // cached subscription information - subscribedEvents map[util.Uint160]string - subscribedNotaryEvents map[util.Uint160]string - subscribedToNewBlocks bool - // indicates that Client is not able to // establish connection to any of the // provided RPC endpoints diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index efb3d0bf..e4569ad0 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -9,8 +9,11 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" lru "github.com/hashicorp/golang-lru/v2" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/util" @@ -102,17 +105,22 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) { } cli := &Client{ - cache: newClientCache(), - logger: cfg.logger, - acc: acc, - accAddr: accAddr, - signer: cfg.signer, - cfg: *cfg, - switchLock: &sync.RWMutex{}, - notifications: make(chan rpcclient.Notification), - subscribedEvents: make(map[util.Uint160]string), - subscribedNotaryEvents: make(map[util.Uint160]string), - closeChan: make(chan struct{}), + cache: newClientCache(), + logger: cfg.logger, + acc: acc, + accAddr: accAddr, + signer: cfg.signer, + cfg: *cfg, + switchLock: &sync.RWMutex{}, + notifications: make(chan rpcclient.Notification), + subsInfo: subsInfo{ + blockRcv: make(chan *block.Block), + notificationRcv: make(chan *state.ContainedNotificationEvent), + notaryReqRcv: make(chan *result.NotaryRequestEvent), + subscribedEvents: make(map[util.Uint160]string), + subscribedNotaryEvents: make(map[util.Uint160]string), + }, + closeChan: make(chan struct{}), } cli.endpoints.init(cfg.endpoints) diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index e0eecd92..54af56b2 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -4,6 +4,11 @@ import ( "sort" "time" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" "go.uber.org/zap" ) @@ -51,7 +56,8 @@ func (c *Client) switchRPC() bool { c.logger.Info("connection to the new RPC node has been established", zap.String("endpoint", newEndpoint)) - if !c.restoreSubscriptions(cli, newEndpoint) { + subs, ok := c.restoreSubscriptions(cli, newEndpoint, false) + if !ok { // new WS client does not allow // restoring subscription, client // could not work correctly => @@ -63,6 +69,7 @@ func (c *Client) switchRPC() bool { c.client = cli c.setActor(act) + c.subsInfo = subs if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() && c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority { @@ -77,9 +84,14 @@ func (c *Client) switchRPC() bool { } func (c *Client) notificationLoop() { + var e any + var ok bool + for { c.switchLock.RLock() - nChan := c.client.Notifications + bChan := c.blockRcv + nChan := c.notificationRcv + nrChan := c.notaryReqRcv c.switchLock.RUnlock() select { @@ -93,57 +105,74 @@ func (c *Client) notificationLoop() { c.close() return - case n, ok := <-nChan: - // notification channel is used as a connection - // state: if it is closed, the connection is - // considered to be lost - if !ok { - if closeErr := c.client.GetError(); closeErr != nil { - c.logger.Warn("switching to the next RPC node", - zap.String("reason", closeErr.Error()), - ) - } else { - // neo-go client was closed by calling `Close` - // method that happens only when the client has - // switched to the more prioritized RPC - continue - } + case e, ok = <-bChan: + case e, ok = <-nChan: + case e, ok = <-nrChan: + } - if !c.switchRPC() { - c.logger.Error("could not establish connection to any RPC node") + if ok { + c.routeEvent(e) + continue + } - // could not connect to all endpoints => - // switch client to inactive mode - c.inactiveMode() - - return - } - - // TODO(@carpawell): call here some callback retrieved in constructor - // of the client to allow checking chain state since during switch - // process some notification could be lost - - continue - } - - select { - case c.notifications <- n: - continue - case <-c.cfg.ctx.Done(): - _ = c.UnsubscribeAll() - c.close() - - return - case <-c.closeChan: - _ = c.UnsubscribeAll() - c.close() - - return - } + if !c.reconnect() { + return } } } +func (c *Client) routeEvent(e any) { + typedNotification := rpcclient.Notification{Value: e} + + switch e.(type) { + case *block.Block: + typedNotification.Type = neorpc.BlockEventID + case *state.ContainedNotificationEvent: + typedNotification.Type = neorpc.NotificationEventID + case *result.NotaryRequestEvent: + typedNotification.Type = neorpc.NotaryRequestEventID + } + + select { + case c.notifications <- typedNotification: + case <-c.cfg.ctx.Done(): + _ = c.UnsubscribeAll() + c.close() + case <-c.closeChan: + _ = c.UnsubscribeAll() + c.close() + } +} + +func (c *Client) reconnect() bool { + if closeErr := c.client.GetError(); closeErr != nil { + c.logger.Warn("switching to the next RPC node", + zap.String("reason", closeErr.Error()), + ) + } else { + // neo-go client was closed by calling `Close` + // method, that happens only when a client has + // switched to the more prioritized RPC + return true + } + + if !c.switchRPC() { + c.logger.Error("could not establish connection to any RPC node") + + // could not connect to all endpoints => + // switch client to inactive mode + c.inactiveMode() + + return false + } + + // TODO(@carpawell): call here some callback retrieved in constructor + // of the client to allow checking chain state since during switch + // process some notification could be lost + + return true +} + func (c *Client) switchToMostPrioritized() { t := time.NewTicker(c.cfg.switchInterval) defer t.Stop() @@ -156,11 +185,12 @@ mainLoop: return case <-t.C: c.switchLock.RLock() + endpointsCopy := make([]Endpoint, len(c.endpoints.list)) copy(endpointsCopy, c.endpoints.list) - currPriority := c.endpoints.list[c.endpoints.curr].Priority highestPriority := c.endpoints.list[0].Priority + c.switchLock.RUnlock() if currPriority == highestPriority { @@ -186,7 +216,7 @@ mainLoop: continue } - if c.restoreSubscriptions(cli, tryE) { + if subs, ok := c.restoreSubscriptions(cli, tryE, true); ok { c.switchLock.Lock() // higher priority node could have been @@ -201,6 +231,7 @@ mainLoop: c.cache.invalidate() c.client = cli c.setActor(act) + c.subsInfo = subs c.endpoints.curr = i c.switchLock.Unlock() diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index 2afeebb8..1d287527 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -1,6 +1,10 @@ package client import ( + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" @@ -36,7 +40,7 @@ func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error return nil } - id, err := c.client.SubscribeForExecutionNotifications(&contract, nil) + id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv) if err != nil { return err } @@ -59,17 +63,17 @@ func (c *Client) SubscribeForNewBlocks() error { return ErrConnectionLost } - if c.subscribedToNewBlocks { + if c.subscribedToBlocks { // no need to subscribe one more time return nil } - _, err := c.client.SubscribeForNewBlocks(nil) + _, err := c.client.ReceiveBlocks(nil, c.blockRcv) if err != nil { return err } - c.subscribedToNewBlocks = true + c.subscribedToBlocks = true return nil } @@ -99,7 +103,7 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error { return nil } - id, err := c.client.SubscribeForNotaryRequests(nil, &txSigner) + id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv) if err != nil { return err } @@ -187,7 +191,7 @@ func (c *Client) UnsubscribeAll() error { // no need to unsubscribe if there are // no active subscriptions if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 && - !c.subscribedToNewBlocks { + !c.subscribedToBlocks { return nil } @@ -198,14 +202,32 @@ func (c *Client) UnsubscribeAll() error { c.subscribedEvents = make(map[util.Uint160]string) c.subscribedNotaryEvents = make(map[util.Uint160]string) - c.subscribedToNewBlocks = false + c.subscribedToBlocks = false return nil } -// restoreSubscriptions restores subscriptions according to -// cached information about them. -func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) bool { +// subsInfo includes channels for ws notifications; +// cached subscription information. +type subsInfo struct { + blockRcv chan *block.Block + notificationRcv chan *state.ContainedNotificationEvent + notaryReqRcv chan *result.NotaryRequestEvent + + subscribedToBlocks bool + subscribedEvents map[util.Uint160]string + subscribedNotaryEvents map[util.Uint160]string +} + +// restoreSubscriptions restores subscriptions according to cached +// information about them. +// +// If it is NOT a background operation switchLock MUST be held. +// Returns a pair: the second is a restoration status and the first +// one contains subscription information applied to the passed cli +// and receivers for the updated subscriptions. +// Does not change Client instance. +func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) { var ( err error id string @@ -214,72 +236,109 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) stopCh := make(chan struct{}) defer close(stopCh) + blockRcv := make(chan *block.Block) + notificationRcv := make(chan *state.ContainedNotificationEvent) + notaryReqRcv := make(chan *result.NotaryRequestEvent) + // neo-go WS client says to _always_ read notifications // from its channel. Subscribing to any notification // while not reading them in another goroutine may // lead to a dead-lock, thus that async side notification // listening while restoring subscriptions go func() { + var e any + var ok bool + for { select { case <-stopCh: return - case n, ok := <-cli.Notifications: - if !ok { - return - } - - c.notifications <- n + case e, ok = <-blockRcv: + case e, ok = <-notificationRcv: + case e, ok = <-notaryReqRcv: } + + if !ok { + return + } + + if background { + // background client (test) switch, no need to send + // any notification, just preventing dead-lock + continue + } + + c.routeEvent(e) } }() + if background { + c.switchLock.RLock() + defer c.switchLock.RUnlock() + } + + si.subscribedToBlocks = c.subscribedToBlocks + si.subscribedEvents = copySubsMap(c.subscribedEvents) + si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents) + si.blockRcv = blockRcv + si.notificationRcv = notificationRcv + si.notaryReqRcv = notaryReqRcv + // new block events restoration - if c.subscribedToNewBlocks { - _, err = cli.SubscribeForNewBlocks(nil) + if si.subscribedToBlocks { + _, err = cli.ReceiveBlocks(nil, blockRcv) if err != nil { c.logger.Error("could not restore block subscription after RPC switch", zap.String("endpoint", endpoint), zap.Error(err), ) - return false + return } } // notification events restoration - for contract := range c.subscribedEvents { + for contract := range si.subscribedEvents { contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890 - id, err = cli.SubscribeForExecutionNotifications(&contract, nil) + id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv) if err != nil { c.logger.Error("could not restore notification subscription after RPC switch", zap.String("endpoint", endpoint), zap.Error(err), ) - return false + return } - c.subscribedEvents[contract] = id + si.subscribedEvents[contract] = id } // notary notification events restoration if c.notary != nil { - for signer := range c.subscribedNotaryEvents { + for signer := range si.subscribedNotaryEvents { signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890 - id, err = cli.SubscribeForNotaryRequests(nil, &signer) + id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv) if err != nil { c.logger.Error("could not restore notary notification subscription after RPC switch", zap.String("endpoint", endpoint), zap.Error(err), ) - return false + return } - c.subscribedNotaryEvents[signer] = id + si.subscribedNotaryEvents[signer] = id } } - return true + return si, true +} + +func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string { + newM := make(map[util.Uint160]string, len(m)) + for k, v := range m { + newM[k] = v + } + + return newM }