diff --git a/go.mod b/go.mod index c911b151e..97deb7256 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230915114754-555ccc63b255 git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/tzhash v1.8.0 + github.com/cenkalti/backoff v2.2.1+incompatible github.com/cheggaaa/pb v1.0.29 github.com/chzyer/readline v1.5.1 github.com/dgraph-io/ristretto v0.1.1 diff --git a/go.sum b/go.sum index 0df98fe6d..2a1d808aa 100644 Binary files a/go.sum and b/go.sum differ diff --git a/internal/logs/logs.go b/internal/logs/logs.go index c18d191f8..aeed881fb 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -195,6 +195,8 @@ const ( SubscriberCantCastBlockEventValueToBlock = "can't cast block event value to block" SubscriberCantCastNotifyEventValueToTheNotaryRequestStruct = "can't cast notify event value to the notary request struct" SubscriberUnsupportedNotificationFromTheChain = "unsupported notification from the chain" + SubscriberCouldNotSwitchRPCDuringUnsubscriptionFromEvents = "could not switch rpc during the unsubscription from events" + SubscriberCouldNotUnsubscribeFromEventsOnBackoffPolicy = "could not unsubscribe from events on backoff policy" BlobovniczaCreatingDirectoryForBoltDB = "creating directory for BoltDB" BlobovniczaOpeningBoltDB = "opening BoltDB" BlobovniczaInitializing = "initializing..." diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 606f3bd66..e8253a54f 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -74,6 +74,9 @@ type Client struct { // channel for internal stop closeChan chan struct{} + // channel to indicate that close is done + closeDone chan struct{} + // indicates that Client is not able to // establish connection to any of the // provided RPC endpoints diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index e7e1bbca9..0a1b3dff7 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -120,6 +120,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er accAddr: accAddr, cfg: *cfg, closeChan: make(chan struct{}), + closeDone: make(chan struct{}), } cli.endpoints.init(cfg.endpoints) @@ -169,6 +170,7 @@ func (c *Client) newCli(ctx context.Context, endpoint string) (*rpcclient.WSClie Options: rpcclient.Options{ DialTimeout: c.cfg.dialTimeout, }, + CloseNotificationChannelIfFull: true, }) if err != nil { return nil, nil, fmt.Errorf("WS client creation: %w", err) diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index e006ca69a..adbc3f42c 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -79,8 +79,10 @@ func (c *Client) closeWaiter(ctx context.Context) { case <-ctx.Done(): case <-c.closeChan: } + //nolint:contextcheck _ = c.UnsubscribeAll() c.close() + close(c.closeDone) } func (c *Client) switchToMostPrioritized(ctx context.Context) { diff --git a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index 121dccfb7..d97d73aeb 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -1,11 +1,18 @@ package client import ( + "context" + "fmt" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "github.com/cenkalti/backoff" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/util" + "go.uber.org/zap" ) // Close closes connection to the remote side making @@ -17,6 +24,11 @@ func (c *Client) Close() { // to prevent switching to another RPC node // in the notification loop close(c.closeChan) + + // closeWaiter performs asynchronously and thus + // we may abrupt all process related to close process + // if we do not wait for closing process finish. + <-c.closeDone } // ReceiveExecutionNotifications performs subscription for notifications @@ -100,6 +112,27 @@ func (c *Client) UnsubscribeAll() error { return ErrConnectionLost } - err := c.client.UnsubscribeAll() - return err + if err := c.client.UnsubscribeAll(); err != nil { + // TODO (aarifullin): consider the situation when the morph client + // failed to subscribe for events because of websocket client problems + // "under hood". After failed subscription the client invokes Close() + // that invokes UnsubscribeAll(). This requires to push new request + // via websocket to neo-go but the websocket client may be down. + // Therefore, morph will never request neo-go to unsubscribe from events, but + // we can try to fix this by reconnecting to neo-go. + backoffSettings := backoff.NewExponentialBackOff() + backoffSettings.MaxElapsedTime = 30 * time.Second + return backoff.Retry(func() error { + if !c.SwitchRPC(context.TODO()) { + c.logger.Warn(logs.SubscriberCouldNotSwitchRPCDuringUnsubscriptionFromEvents) + return fmt.Errorf("could not switch rpc") + } + err := c.client.UnsubscribeAll() + if err != nil { + c.logger.Warn(logs.SubscriberCouldNotUnsubscribeFromEventsOnBackoffPolicy, zap.Error(err)) + } + return err + }, backoffSettings) + } + return nil }