frostfs-node/pkg/morph/client/notifications.go
Evgenii Stratonikov 0ccea802e9 [] morph/client: Perform RPC switch and restore in one step
Otherwise we could switch infinitely if subscription restore has failed.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
2022-07-04 09:29:01 +03:00

259 lines
6 KiB
Go

package client
import (
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
// Close closes connection to the remote side making
// this client instance unusable. Closes notification
// channel returned from Client.NotificationChannel(),
// Removes all subscription.
func (c *Client) Close() {
// closing should be done via the channel
// to prevent switching to another RPC node
// in the notification loop
c.closeChan <- struct{}{}
}
// SubscribeForExecutionNotifications adds subscription for notifications
// generated during contract transaction execution to this instance of client.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error {
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
}
_, subscribed := c.subscribedEvents[contract]
if subscribed {
// no need to subscribe one more time
return nil
}
id, err := c.client.SubscribeForExecutionNotifications(&contract, nil)
if err != nil {
return err
}
c.subscribedEvents[contract] = id
return nil
}
// SubscribeForNewBlocks adds subscription for new block events to this
// instance of client.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) SubscribeForNewBlocks() error {
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
}
if c.subscribedToNewBlocks {
// no need to subscribe one more time
return nil
}
_, err := c.client.SubscribeForNewBlocks(nil)
if err != nil {
return err
}
c.subscribedToNewBlocks = true
return nil
}
// SubscribeForNotaryRequests adds subscription for notary request payloads
// addition or removal events to this instance of client. Passed txSigner is
// used as filter: subscription is only for the notary requests that must be
// signed by txSigner.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
}
_, subscribed := c.subscribedNotaryEvents[txSigner]
if subscribed {
// no need to subscribe one more time
return nil
}
id, err := c.client.SubscribeForNotaryRequests(nil, &txSigner)
if err != nil {
return err
}
c.subscribedNotaryEvents[txSigner] = id
return nil
}
// UnsubscribeContract removes subscription for given contract event stream.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) UnsubscribeContract(contract util.Uint160) error {
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
}
_, subscribed := c.subscribedEvents[contract]
if !subscribed {
// no need to unsubscribe contract
// without subscription
return nil
}
err := c.client.Unsubscribe(c.subscribedEvents[contract])
if err != nil {
return err
}
delete(c.subscribedEvents, contract)
return nil
}
// UnsubscribeNotaryRequest removes subscription for given notary requests
// signer.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) UnsubscribeNotaryRequest(signer util.Uint160) error {
if c.notary == nil {
panic(notaryNotEnabledPanicMsg)
}
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
}
_, subscribed := c.subscribedNotaryEvents[signer]
if !subscribed {
// no need to unsubscribe signer's
// requests without subscription
return nil
}
err := c.client.Unsubscribe(c.subscribedNotaryEvents[signer])
if err != nil {
return err
}
delete(c.subscribedNotaryEvents, signer)
return nil
}
// UnsubscribeAll removes all active subscriptions of current client.
//
// Returns ErrConnectionLost if client has not been able to establish
// connection to any of passed RPC endpoints.
func (c *Client) UnsubscribeAll() error {
c.switchLock.Lock()
defer c.switchLock.Unlock()
if c.inactive {
return ErrConnectionLost
}
// no need to unsubscribe if there are
// no active subscriptions
if len(c.subscribedEvents) == 0 && len(c.subscribedNotaryEvents) == 0 &&
!c.subscribedToNewBlocks {
return nil
}
err := c.client.UnsubscribeAll()
if err != nil {
return err
}
c.subscribedEvents = make(map[util.Uint160]string)
c.subscribedNotaryEvents = make(map[util.Uint160]string)
c.subscribedToNewBlocks = false
return nil
}
// restoreSubscriptions restores subscriptions according to
// cached information about them.
func (c *Client) restoreSubscriptions(endpoint string) bool {
var (
err error
id string
)
// new block events restoration
if c.subscribedToNewBlocks {
_, err = c.client.SubscribeForNewBlocks(nil)
if err != nil {
c.logger.Error("could not restore block subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return false
}
}
// notification events restoration
for contract := range c.subscribedEvents {
id, err = c.client.SubscribeForExecutionNotifications(&contract, nil)
if err != nil {
c.logger.Error("could not restore notification subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return false
}
c.subscribedEvents[contract] = id
}
// notary notification events restoration
if c.notary != nil {
for signer := range c.subscribedNotaryEvents {
id, err = c.client.SubscribeForNotaryRequests(nil, &signer)
if err != nil {
c.logger.Error("could not restore notary notification subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return false
}
c.subscribedNotaryEvents[signer] = id
}
}
return true
}