WIP: [#706] morph: Use backoff strategy for UnsubscribeAll() #707
7 changed files with 47 additions and 2 deletions
@ -9,6 +9,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230915114754-555ccc63b255
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cheggaaa/pb v1.0.29
github.com/chzyer/readline v1.5.1
github.com/dgraph-io/ristretto v0.1.1
@ -447,6 +447,8 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku
github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@ -195,6 +195,8 @@ const (
SubscriberCantCastBlockEventValueToBlock = "can't cast block event value to block"
SubscriberCantCastNotifyEventValueToTheNotaryRequestStruct = "can't cast notify event value to the notary request struct"
SubscriberUnsupportedNotificationFromTheChain = "unsupported notification from the chain"
SubscriberCouldNotSwitchRPCDuringUnsubscriptionFromEvents = "could not switch rpc during the unsubscription from events"
SubscriberCouldNotUnsubscribeFromEventsOnBackoffPolicy = "could not unsubscribe from events on backoff policy"
BlobovniczaCreatingDirectoryForBoltDB = "creating directory for BoltDB"
BlobovniczaOpeningBoltDB = "opening BoltDB"
BlobovniczaInitializing = "initializing..."
@ -74,6 +74,9 @@ type Client struct {
// channel for internal stop
closeChan chan struct{}
// channel to indicate that close is done
closeDone chan struct{}
// indicates that Client is not able to
// establish connection to any of the
// provided RPC endpoints
@ -120,6 +120,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
accAddr: accAddr,
cfg: *cfg,
closeChan: make(chan struct{}),
closeDone: make(chan struct{}),
@ -169,6 +170,7 @@ func (c *Client) newCli(ctx context.Context, endpoint string) (*rpcclient.WSClie
Options: rpcclient.Options{
DialTimeout: c.cfg.dialTimeout,
CloseNotificationChannelIfFull: true,
if err != nil {
return nil, nil, fmt.Errorf("WS client creation: %w", err)
@ -79,8 +79,10 @@ func (c *Client) closeWaiter(ctx context.Context) {
case <-ctx.Done():
case <-c.closeChan:
_ = c.UnsubscribeAll()
func (c *Client) switchToMostPrioritized(ctx context.Context) {
@ -1,11 +1,18 @@
package client
import (
// Close closes connection to the remote side making
@ -17,6 +24,11 @@ func (c *Client) Close() {
// to prevent switching to another RPC node
// in the notification loop
// closeWaiter performs asynchronously and thus
// we may abrupt all process related to close process
// if we do not wait for closing process finish.
// ReceiveExecutionNotifications performs subscription for notifications
@ -100,6 +112,27 @@ func (c *Client) UnsubscribeAll() error {
return ErrConnectionLost
err := c.client.UnsubscribeAll()
return err
if err := c.client.UnsubscribeAll(); err != nil {
// TODO (aarifullin): consider the situation when the morph client
// failed to subscribe for events because of websocket client problems
// "under hood". After failed subscription the client invokes Close()
// that invokes UnsubscribeAll(). This requires to push new request
// via websocket to neo-go but the websocket client may be down.
// Therefore, morph will never request neo-go to unsubscribe from events, but
// we can try to fix this by reconnecting to neo-go.
backoffSettings := backoff.NewExponentialBackOff()
backoffSettings.MaxElapsedTime = 30 * time.Second
return backoff.Retry(func() error {
if !c.SwitchRPC(context.TODO()) {
return fmt.Errorf("could not switch rpc")
err := c.client.UnsubscribeAll()
if err != nil {
c.logger.Warn(logs.SubscriberCouldNotUnsubscribeFromEventsOnBackoffPolicy, zap.Error(err))
return err
}, backoffSettings)
return nil
Add table
Reference in a new issue