From 04ab939e4c616e85814edb991e800720d79b084a Mon Sep 17 00:00:00 2001 From: aarifullin Date: Tue, 26 Sep 2023 15:46:23 +0300 Subject: [PATCH] [#706] morph: Use backoff strategy for UnsubscribeAll() * Sometimes the morph client cannot unsubscribe from events because the websocket client may be down, in which case neo-go will never receive the unsubscription request. Signed-off-by: Airat Arifullin --- go.mod | 1 + go.sum | Bin 164563 -> 164762 bytes internal/logs/logs.go | 2 ++ pkg/morph/client/client.go | 3 +++ pkg/morph/client/constructor.go | 2 ++ pkg/morph/client/multi.go | 2 ++ pkg/morph/client/notifications.go | 37 ++++++++++++++++++++++++++++-- 7 files changed, 45 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index c911b151e..97deb7256 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230915114754-555ccc63b255 git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/tzhash v1.8.0 + github.com/cenkalti/backoff v2.2.1+incompatible github.com/cheggaaa/pb v1.0.29 github.com/chzyer/readline v1.5.1 github.com/dgraph-io/ristretto v0.1.1 diff --git a/go.sum b/go.sum index 0df98fe6de825b8e3e4b7b2007a12acd56ef7927..2a1d808aa1c7bf4fb7d94f17835b356202273047 100644 GIT binary patch delta 149 zcmccI$~CK@XlFX!>RD}#fs}jHb@(@eQ67x{Mv>cy` zw8+Td9Ph|tr-FzS*VGaN$!nL&vO~?8oH$KF%hDhyHK^RoJ*A*B xt;i!WJ1ombKhPsFzcS1u(Iqr2EIT65%gf8yFCt>{nxz`ebxXI`EoBV73jj%aH7WoA delta 18 acmbQ$&ULw!YeUJ>X6t3!t(P&z-vt0r^azju diff --git a/internal/logs/logs.go b/internal/logs/logs.go index c18d191f8..aeed881fb 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -195,6 +195,8 @@ const ( SubscriberCantCastBlockEventValueToBlock = "can't cast block event value to block" SubscriberCantCastNotifyEventValueToTheNotaryRequestStruct = "can't cast notify event value to the notary request struct" SubscriberUnsupportedNotificationFromTheChain = "unsupported notification from the chain" + 
SubscriberCouldNotSwitchRPCDuringUnsubscriptionFromEvents = "could not switch rpc during the unsubscription from events" + SubscriberCouldNotUnsubscribeFromEventsOnBackoffPolicy = "could not unsubscribe from events on backoff policy" BlobovniczaCreatingDirectoryForBoltDB = "creating directory for BoltDB" BlobovniczaOpeningBoltDB = "opening BoltDB" BlobovniczaInitializing = "initializing..." diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 606f3bd66..e8253a54f 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -74,6 +74,9 @@ type Client struct { // channel for internal stop closeChan chan struct{} + // channel to indicate that close is done + closeDone chan struct{} + // indicates that Client is not able to // establish connection to any of the // provided RPC endpoints diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index e7e1bbca9..0a1b3dff7 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -120,6 +120,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er accAddr: accAddr, cfg: *cfg, closeChan: make(chan struct{}), + closeDone: make(chan struct{}), } cli.endpoints.init(cfg.endpoints) @@ -169,6 +170,7 @@ func (c *Client) newCli(ctx context.Context, endpoint string) (*rpcclient.WSClie Options: rpcclient.Options{ DialTimeout: c.cfg.dialTimeout, }, + CloseNotificationChannelIfFull: true, }) if err != nil { return nil, nil, fmt.Errorf("WS client creation: %w", err) diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index e006ca69a..adbc3f42c 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -79,8 +79,10 @@ func (c *Client) closeWaiter(ctx context.Context) { case <-ctx.Done(): case <-c.closeChan: } + //nolint:contextcheck _ = c.UnsubscribeAll() c.close() + close(c.closeDone) } func (c *Client) switchToMostPrioritized(ctx context.Context) { diff --git 
a/pkg/morph/client/notifications.go b/pkg/morph/client/notifications.go index 121dccfb7..d97d73aeb 100644 --- a/pkg/morph/client/notifications.go +++ b/pkg/morph/client/notifications.go @@ -1,11 +1,18 @@ package client import ( + "context" + "fmt" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "github.com/cenkalti/backoff" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/util" + "go.uber.org/zap" ) // Close closes connection to the remote side making @@ -17,6 +24,11 @@ func (c *Client) Close() { // to prevent switching to another RPC node // in the notification loop close(c.closeChan) + + // closeWaiter runs asynchronously, so wait for it to + // signal completion; otherwise the caller may proceed + // (or exit) before unsubscription and close have finished. + <-c.closeDone } // ReceiveExecutionNotifications performs subscription for notifications @@ -100,6 +112,27 @@ func (c *Client) UnsubscribeAll() error { return ErrConnectionLost } - err := c.client.UnsubscribeAll() - return err + if err := c.client.UnsubscribeAll(); err != nil { + // TODO (aarifullin): consider the situation when the morph client + // failed to subscribe to events because of websocket client problems + // "under the hood". After a failed subscription the client invokes Close(), + // which invokes UnsubscribeAll(). This requires pushing a new request + // via websocket to neo-go, but the websocket client may be down. + // Therefore, morph will never request neo-go to unsubscribe from events, but + // we can try to fix this by reconnecting to neo-go. 
+ backoffSettings := backoff.NewExponentialBackOff() + backoffSettings.MaxElapsedTime = 30 * time.Second + return backoff.Retry(func() error { + if !c.SwitchRPC(context.TODO()) { + c.logger.Warn(logs.SubscriberCouldNotSwitchRPCDuringUnsubscriptionFromEvents) + return fmt.Errorf("could not switch rpc") + } + err := c.client.UnsubscribeAll() + if err != nil { + c.logger.Warn(logs.SubscriberCouldNotUnsubscribeFromEventsOnBackoffPolicy, zap.Error(err)) + } + return err + }, backoffSettings) + } + return nil }