frostfs-node/pkg/morph/client/notifications.go
aarifullin 04ab939e4c
All checks were successful
DCO action / DCO (pull_request) Successful in 2m42s
Build / Build Components (1.20) (pull_request) Successful in 4m26s
Tests and linters / Staticcheck (pull_request) Successful in 4m12s
Tests and linters / Lint (pull_request) Successful in 4m43s
Vulncheck / Vulncheck (pull_request) Successful in 4m58s
Tests and linters / Tests (1.20) (pull_request) Successful in 5m8s
Tests and linters / Tests (1.21) (pull_request) Successful in 5m23s
Tests and linters / Tests with -race (pull_request) Successful in 7m14s
Build / Build Components (1.21) (pull_request) Successful in 9m15s
[#706] morph: Use backoff strategy for UnsubscribeAll()
* Sometimes the morph client cannot unsubscribe from events
  because the websocket client may be down, so neo-go will
  never receive the request for unsubscription.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-26 17:47:16 +03:00

138 lines
4.4 KiB
Go

package client
import (
"context"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"github.com/cenkalti/backoff"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
// Close shuts the client down and makes this instance unusable.
//
// The shutdown is requested by closing c.closeChan and Close then blocks
// until c.closeDone can be received from. As part of the shutdown the
// connection to the remote side is closed, the notification channel returned
// from Client.NotificationChannel() is closed and all subscriptions are
// removed.
//
// Close must be called at most once: a second call would close the
// already-closed closeChan and panic.
func (c *Client) Close() {
	// Request the shutdown through the channel instead of closing the
	// connection directly, so that the notification loop does not try to
	// switch to another RPC node in the meantime.
	close(c.closeChan)

	// The close routine runs asynchronously; wait for it to report
	// completion, otherwise the caller could proceed (or exit) while the
	// closing process is still in progress.
	<-c.closeDone
}
// ReceiveExecutionNotifications subscribes to notifications generated during
// execution of the given contract. Matching events are delivered to ch.
//
// On success the string reported by the underlying client is returned
// (presumably the subscription ID accepted by Unsubscribe — confirm).
//
// Returns ErrConnectionLost if the client is inactive, i.e. it has not been
// able to establish connection to any of the passed RPC endpoints.
func (c *Client) ReceiveExecutionNotifications(contract util.Uint160, ch chan<- *state.ContainedNotificationEvent) (string, error) {
	c.switchLock.Lock()
	defer c.switchLock.Unlock()

	if c.inactive {
		return "", ErrConnectionLost
	}

	// Restrict the subscription to events emitted by this contract only.
	contractFilter := &neorpc.NotificationFilter{Contract: &contract}

	return c.client.ReceiveExecutionNotifications(contractFilter, ch)
}
// ReceiveBlocks subscribes to new block events. Every received block is
// delivered to ch. No block filter is applied (nil is passed as the filter).
//
// On success the string reported by the underlying client is returned
// (presumably the subscription ID accepted by Unsubscribe — confirm).
//
// Returns ErrConnectionLost if the client is inactive, i.e. it has not been
// able to establish connection to any of the passed RPC endpoints.
func (c *Client) ReceiveBlocks(ch chan<- *block.Block) (string, error) {
	c.switchLock.Lock()
	defer c.switchLock.Unlock()

	if c.inactive {
		return "", ErrConnectionLost
	}

	subID, err := c.client.ReceiveBlocks(nil, ch)
	return subID, err
}
// ReceiveNotaryRequests subscribes this client instance to notary request
// payload addition and removal events. txSigner acts as a filter: only
// notary requests that must be signed by txSigner are reported. Events are
// delivered to ch.
//
// Panics if notary support is not enabled on the client (c.notary is nil).
//
// On success the string reported by the underlying client is returned
// (presumably the subscription ID accepted by Unsubscribe — confirm).
//
// Returns ErrConnectionLost if the client is inactive, i.e. it has not been
// able to establish connection to any of the passed RPC endpoints.
func (c *Client) ReceiveNotaryRequests(txSigner util.Uint160, ch chan<- *result.NotaryRequestEvent) (string, error) {
	// The notary check deliberately happens before the lock is taken.
	if c.notary == nil {
		panic(notaryNotEnabledPanicMsg)
	}

	c.switchLock.Lock()
	defer c.switchLock.Unlock()

	if c.inactive {
		return "", ErrConnectionLost
	}

	signerFilter := neorpc.TxFilter{Signer: &txSigner}

	return c.client.ReceiveNotaryRequests(&signerFilter, ch)
}
// Unsubscribe cancels the subscription identified by subID.
//
// Returns ErrConnectionLost if the client is inactive, i.e. it has not been
// able to establish connection to any of the passed RPC endpoints. Otherwise
// the result of the underlying client's Unsubscribe call is returned.
func (c *Client) Unsubscribe(subID string) error {
	c.switchLock.Lock()
	defer c.switchLock.Unlock()

	if !c.inactive {
		return c.client.Unsubscribe(subID)
	}

	return ErrConnectionLost
}
// UnsubscribeAll removes all active subscriptions of current client.
//
// If the first attempt fails, the call is retried under an exponential
// backoff policy whose MaxElapsedTime is 30 seconds. Every retry first
// switches the RPC connection via SwitchRPC and then repeats the
// unsubscription; failures of either step are logged as warnings.
//
// Returns ErrConnectionLost if the client is inactive, i.e. it has not been
// able to establish connection to any of the passed RPC endpoints. Otherwise
// returns nil once an attempt succeeds, or the error reported by
// backoff.Retry when the retries are given up.
func (c *Client) UnsubscribeAll() error {
	c.switchLock.Lock()
	defer c.switchLock.Unlock()

	if c.inactive {
		return ErrConnectionLost
	}

	// Happy path: the first attempt went through, nothing else to do.
	if err := c.client.UnsubscribeAll(); err == nil {
		return nil
	}

	// Why retry with a reconnect: the morph client may fail to subscribe for
	// events because of websocket client problems "under the hood". After a
	// failed subscription the client invokes Close(), which invokes
	// UnsubscribeAll(). That has to push a new request to neo-go through the
	// websocket, but the websocket client may be down, so the unsubscription
	// request would never reach neo-go. Reconnecting before each retry gives
	// the request a chance to be delivered.
	//
	// NOTE(review): SwitchRPC is called while switchLock is still held by
	// this method, and the lock stays held for the whole retry sequence. If
	// SwitchRPC acquires switchLock itself this would self-deadlock —
	// confirm against its implementation.
	policy := backoff.NewExponentialBackOff()
	policy.MaxElapsedTime = 30 * time.Second

	reconnectAndUnsubscribe := func() error {
		if !c.SwitchRPC(context.TODO()) {
			c.logger.Warn(logs.SubscriberCouldNotSwitchRPCDuringUnsubscriptionFromEvents)
			return fmt.Errorf("could not switch rpc")
		}

		unsubErr := c.client.UnsubscribeAll()
		if unsubErr != nil {
			c.logger.Warn(logs.SubscriberCouldNotUnsubscribeFromEventsOnBackoffPolicy, zap.Error(unsubErr))
		}
		return unsubErr
	}

	return backoff.Retry(reconnectAndUnsubscribe, policy)
}