From 6f5edac7303ba047b48e833ac31d8c9140dbbc61 Mon Sep 17 00:00:00 2001
From: Evgenii Stratonikov <e.stratonikov@yadro.com>
Date: Mon, 19 Dec 2022 17:47:28 +0300
Subject: [PATCH] [#2164] network/cache: Do not reconnect to failed clients
 immediately

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
---
 pkg/network/cache/multi.go                    | 117 ++++++++++++++----
 pkg/services/object/get/v2/util.go            |   3 +
 pkg/services/object/internal/client/client.go |   4 +
 pkg/services/object/internal/client/error.go  |  14 +++
 pkg/services/object/put/v2/streamer.go        |   3 +
 5 files changed, 120 insertions(+), 21 deletions(-)
 create mode 100644 pkg/services/object/internal/client/error.go

diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go
index 152b4b92f..64b142824 100644
--- a/pkg/network/cache/multi.go
+++ b/pkg/network/cache/multi.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"sync"
+	"time"
 
 	rawclient "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/client"
 	clientcore "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
@@ -12,23 +13,34 @@ import (
 	"github.com/TrueCloudLab/frostfs-sdk-go/client"
 )
 
+type singleClient struct {
+	sync.RWMutex
+	client      clientcore.Client // nil while there is no usable connection: not created yet or dropped by invalidate
+	lastAttempt time.Time         // when client creation last failed or invalidate last ran; zero value if neither happened
+}
+
 type multiClient struct {
 	mtx sync.RWMutex
 
-	clients map[string]clientcore.Client
+	clients map[string]*singleClient
 
 	// addrMtx protects addr field. Should not be taken before the mtx.
 	addrMtx sync.RWMutex
 	addr    network.AddressGroup
 
 	opts ClientCacheOpts
+
+	reconnectInterval time.Duration
 }
 
+const defaultReconnectInterval = time.Second * 30
+
 func newMultiClient(addr network.AddressGroup, opts ClientCacheOpts) *multiClient {
 	return &multiClient{
-		clients: make(map[string]clientcore.Client),
-		addr:    addr,
-		opts:    opts,
+		clients:           make(map[string]*singleClient),
+		addr:              addr,
+		opts:              opts,
+		reconnectInterval: defaultReconnectInterval,
 	}
 }
 
@@ -110,6 +122,8 @@ loop:
 	x.addrMtx.Unlock()
 }
 
+var errRecentlyFailed = errors.New("client has recently failed, skipping")
+
 func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Client) error) error {
 	var firstErr error
 
@@ -134,16 +148,45 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
 
 		success := err == nil || errors.Is(err, context.Canceled)
 
-		if success || firstErr == nil {
+		if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
 			firstErr = err
 		}
 
+		if err != nil {
+			x.ReportError(err)
+		}
+
 		return success
 	})
 
 	return firstErr
 }
 
+// ReportError invalidates all cached clients; they are re-created on demand after the reconnect interval.
+// errRecentlyFailed and context.Canceled say nothing about connection health, so they are ignored.
+func (x *multiClient) ReportError(err error) {
+	if errors.Is(err, errRecentlyFailed) || errors.Is(err, context.Canceled) {
+		return
+	}
+	// Dropping all clients is broader than necessary: `multiClient` doesn't yet provide
+	// a convenient interface for reporting individual errors for streaming operations.
+	x.mtx.RLock()
+	for _, sc := range x.clients {
+		sc.invalidate()
+	}
+	x.mtx.RUnlock()
+}
+
+// invalidate closes and forgets the cached client (if any) and stamps lastAttempt with the current time.
+func (s *singleClient) invalidate() {
+	s.Lock()
+	if cl := s.client; cl != nil {
+		_ = cl.Close()
+	}
+	s.client, s.lastAttempt = nil, time.Now()
+	s.Unlock()
+}
+
 func (x *multiClient) ObjectPutInit(ctx context.Context, p client.PrmObjectPutInit) (res *client.ObjectWriter, err error) {
 	err = x.iterateClients(ctx, func(c clientcore.Client) error {
 		res, err = c.ObjectPutInit(ctx, p)
@@ -243,7 +286,9 @@ func (x *multiClient) Close() error {
 
 	{
 		for _, c := range x.clients {
-			_ = c.Close()
+			if c.client != nil {
+				_ = c.client.Close()
+			}
 		}
 	}
 
@@ -257,7 +302,12 @@ func (x *multiClient) RawForAddress(addr network.Address, f func(client *rawclie
 	if err != nil {
 		return err
 	}
-	return c.ExecRaw(f)
+
+	err = c.ExecRaw(f)
+	if err != nil {
+		x.ReportError(err)
+	}
+	return err
 }
 
 func (x *multiClient) client(addr network.Address) (clientcore.Client, error) {
@@ -268,20 +318,45 @@ func (x *multiClient) client(addr network.Address) (clientcore.Client, error) {
 	x.mtx.RUnlock()
 
 	if cached {
-		return c, nil
-	}
-
-	x.mtx.Lock()
-	defer x.mtx.Unlock()
-
-	c, cached = x.clients[strAddr]
-	if !cached {
-		var err error
-		c, err = x.createForAddress(addr)
-		if err != nil {
-			return nil, err
+		c.RLock()
+		if c.client != nil {
+			cl := c.client
+			c.RUnlock()
+			return cl, nil
 		}
-		x.clients[strAddr] = c
+		if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval {
+			c.RUnlock()
+			return nil, errRecentlyFailed
+		}
+		c.RUnlock()
+	} else {
+		var ok bool
+		x.mtx.Lock()
+		c, ok = x.clients[strAddr]
+		if !ok {
+			c = new(singleClient)
+			x.clients[strAddr] = c
+		}
+		x.mtx.Unlock()
 	}
-	return c, nil
+
+	c.Lock()
+	defer c.Unlock()
+
+	if c.client != nil {
+		return c.client, nil
+	}
+
+	if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval {
+		return nil, errRecentlyFailed
+	}
+
+	cl, err := x.createForAddress(addr)
+	if err != nil {
+		c.lastAttempt = time.Now()
+		return nil, err
+	}
+
+	c.client = cl
+	return cl, nil
 }
diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go
index f887b3da9..1907ae808 100644
--- a/pkg/services/object/get/v2/util.go
+++ b/pkg/services/object/get/v2/util.go
@@ -21,6 +21,7 @@ import (
 	objectSvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object"
 	getsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/get"
 	"github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal"
+	internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
 	"github.com/TrueCloudLab/frostfs-node/pkg/services/object/util"
 	apistatus "github.com/TrueCloudLab/frostfs-sdk-go/client/status"
 	frostfscrypto "github.com/TrueCloudLab/frostfs-sdk-go/crypto"
@@ -126,6 +127,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
 						break
 					}
 
+					internalclient.ReportError(c, err)
 					return nil, fmt.Errorf("reading the response failed: %w", err)
 				}
 
@@ -288,6 +290,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
 						break
 					}
 
+					internalclient.ReportError(c, err)
 					return nil, fmt.Errorf("reading the response failed: %w", err)
 				}
 
diff --git a/pkg/services/object/internal/client/client.go b/pkg/services/object/internal/client/client.go
index 2f9e95082..ff4053ff0 100644
--- a/pkg/services/object/internal/client/client.go
+++ b/pkg/services/object/internal/client/client.go
@@ -170,6 +170,8 @@ func GetObject(prm GetObjectPrm) (*GetObjectRes, error) {
 		if err == nil {
 			// pull out an error from status
 			err = apistatus.ErrFromStatus(res.Status())
+		} else {
+			ReportError(prm.cli, err)
 		}
 
 		return nil, fmt.Errorf("read object header: %w", err)
@@ -439,6 +441,8 @@ func PutObject(prm PutObjectPrm) (*PutObjectRes, error) {
 	cliRes, err := w.Close()
 	if err == nil {
 		err = apistatus.ErrFromStatus(cliRes.Status())
+	} else {
+		ReportError(prm.cli, err)
 	}
 
 	if err != nil {
diff --git a/pkg/services/object/internal/client/error.go b/pkg/services/object/internal/client/error.go
new file mode 100644
index 000000000..c0efe5913
--- /dev/null
+++ b/pkg/services/object/internal/client/error.go
@@ -0,0 +1,14 @@
+package internal
+
+import clientcore "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
+
+type errorReporter interface {
+	ReportError(error) // receives the error handed to the package-level ReportError
+}
+
+// ReportError passes err to c if c implements errorReporter and does nothing otherwise.
+func ReportError(c clientcore.Client, err error) {
+	if reporter, supported := c.(errorReporter); supported {
+		reporter.ReportError(err)
+	}
+}
diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go
index 4ee9439ac..348bdf84a 100644
--- a/pkg/services/object/put/v2/streamer.go
+++ b/pkg/services/object/put/v2/streamer.go
@@ -11,6 +11,7 @@ import (
 	"github.com/TrueCloudLab/frostfs-node/pkg/core/client"
 	"github.com/TrueCloudLab/frostfs-node/pkg/network"
 	"github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal"
+	internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
 	putsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/put"
 	"github.com/TrueCloudLab/frostfs-node/pkg/services/object/util"
 )
@@ -153,12 +154,14 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien
 		// send init part
 		err = stream.Write(s.init)
 		if err != nil {
+			internalclient.ReportError(c, err)
 			err = fmt.Errorf("sending the initial message to stream failed: %w", err)
 			return
 		}
 
 		for i := range s.chunks {
 			if err = stream.Write(s.chunks[i]); err != nil {
+				internalclient.ReportError(c, err)
 				err = fmt.Errorf("sending the chunk %d failed: %w", i, err)
 				return
 			}