forked from TrueCloudLab/frostfs-node
[#2164] network/cache: Do not reconnect to failed clients immediately
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
f3caf6acfe
commit
6f5edac730
5 changed files with 120 additions and 21 deletions
117
pkg/network/cache/multi.go
vendored
117
pkg/network/cache/multi.go
vendored
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
rawclient "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
clientcore "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
|
@ -12,23 +13,34 @@ import (
|
|||
"github.com/TrueCloudLab/frostfs-sdk-go/client"
|
||||
)
|
||||
|
||||
type singleClient struct {
|
||||
sync.RWMutex
|
||||
client clientcore.Client
|
||||
lastAttempt time.Time
|
||||
}
|
||||
|
||||
type multiClient struct {
|
||||
mtx sync.RWMutex
|
||||
|
||||
clients map[string]clientcore.Client
|
||||
clients map[string]*singleClient
|
||||
|
||||
// addrMtx protects addr field. Should not be taken before the mtx.
|
||||
addrMtx sync.RWMutex
|
||||
addr network.AddressGroup
|
||||
|
||||
opts ClientCacheOpts
|
||||
|
||||
reconnectInterval time.Duration
|
||||
}
|
||||
|
||||
const defaultReconnectInterval = time.Second * 30
|
||||
|
||||
func newMultiClient(addr network.AddressGroup, opts ClientCacheOpts) *multiClient {
|
||||
return &multiClient{
|
||||
clients: make(map[string]clientcore.Client),
|
||||
addr: addr,
|
||||
opts: opts,
|
||||
clients: make(map[string]*singleClient),
|
||||
addr: addr,
|
||||
opts: opts,
|
||||
reconnectInterval: defaultReconnectInterval,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -110,6 +122,8 @@ loop:
|
|||
x.addrMtx.Unlock()
|
||||
}
|
||||
|
||||
var errRecentlyFailed = errors.New("client has recently failed, skipping")
|
||||
|
||||
func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Client) error) error {
|
||||
var firstErr error
|
||||
|
||||
|
@ -134,16 +148,45 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
|
|||
|
||||
success := err == nil || errors.Is(err, context.Canceled)
|
||||
|
||||
if success || firstErr == nil {
|
||||
if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
|
||||
firstErr = err
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
x.ReportError(err)
|
||||
}
|
||||
|
||||
return success
|
||||
})
|
||||
|
||||
return firstErr
|
||||
}
|
||||
|
||||
func (x *multiClient) ReportError(err error) {
|
||||
if errors.Is(err, errRecentlyFailed) {
|
||||
return
|
||||
}
|
||||
|
||||
// Dropping all clients here is not necessary, we do this
|
||||
// because `multiClient` doesn't yet provide convenient interface
|
||||
// for reporting individual errors for streaming operations.
|
||||
x.mtx.RLock()
|
||||
for _, sc := range x.clients {
|
||||
sc.invalidate()
|
||||
}
|
||||
x.mtx.RUnlock()
|
||||
}
|
||||
|
||||
func (s *singleClient) invalidate() {
|
||||
s.Lock()
|
||||
if s.client != nil {
|
||||
_ = s.client.Close()
|
||||
}
|
||||
s.client = nil
|
||||
s.lastAttempt = time.Now()
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (x *multiClient) ObjectPutInit(ctx context.Context, p client.PrmObjectPutInit) (res *client.ObjectWriter, err error) {
|
||||
err = x.iterateClients(ctx, func(c clientcore.Client) error {
|
||||
res, err = c.ObjectPutInit(ctx, p)
|
||||
|
@ -243,7 +286,9 @@ func (x *multiClient) Close() error {
|
|||
|
||||
{
|
||||
for _, c := range x.clients {
|
||||
_ = c.Close()
|
||||
if c.client != nil {
|
||||
_ = c.client.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -257,7 +302,12 @@ func (x *multiClient) RawForAddress(addr network.Address, f func(client *rawclie
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.ExecRaw(f)
|
||||
|
||||
err = c.ExecRaw(f)
|
||||
if err != nil {
|
||||
x.ReportError(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (x *multiClient) client(addr network.Address) (clientcore.Client, error) {
|
||||
|
@ -268,20 +318,45 @@ func (x *multiClient) client(addr network.Address) (clientcore.Client, error) {
|
|||
x.mtx.RUnlock()
|
||||
|
||||
if cached {
|
||||
return c, nil
|
||||
}
|
||||
|
||||
x.mtx.Lock()
|
||||
defer x.mtx.Unlock()
|
||||
|
||||
c, cached = x.clients[strAddr]
|
||||
if !cached {
|
||||
var err error
|
||||
c, err = x.createForAddress(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
c.RLock()
|
||||
if c.client != nil {
|
||||
cl := c.client
|
||||
c.RUnlock()
|
||||
return cl, nil
|
||||
}
|
||||
x.clients[strAddr] = c
|
||||
if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval {
|
||||
c.RUnlock()
|
||||
return nil, errRecentlyFailed
|
||||
}
|
||||
c.RUnlock()
|
||||
} else {
|
||||
var ok bool
|
||||
x.mtx.Lock()
|
||||
c, ok = x.clients[strAddr]
|
||||
if !ok {
|
||||
c = new(singleClient)
|
||||
x.clients[strAddr] = c
|
||||
}
|
||||
x.mtx.Unlock()
|
||||
}
|
||||
return c, nil
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
if c.client != nil {
|
||||
return c.client, nil
|
||||
}
|
||||
|
||||
if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval {
|
||||
return nil, errRecentlyFailed
|
||||
}
|
||||
|
||||
cl, err := x.createForAddress(addr)
|
||||
if err != nil {
|
||||
c.lastAttempt = time.Now()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.client = cl
|
||||
return cl, nil
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
objectSvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||
getsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
apistatus "github.com/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
frostfscrypto "github.com/TrueCloudLab/frostfs-sdk-go/crypto"
|
||||
|
@ -126,6 +127,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
|
|||
break
|
||||
}
|
||||
|
||||
internalclient.ReportError(c, err)
|
||||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
|
@ -288,6 +290,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
|
|||
break
|
||||
}
|
||||
|
||||
internalclient.ReportError(c, err)
|
||||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -170,6 +170,8 @@ func GetObject(prm GetObjectPrm) (*GetObjectRes, error) {
|
|||
if err == nil {
|
||||
// pull out an error from status
|
||||
err = apistatus.ErrFromStatus(res.Status())
|
||||
} else {
|
||||
ReportError(prm.cli, err)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("read object header: %w", err)
|
||||
|
@ -439,6 +441,8 @@ func PutObject(prm PutObjectPrm) (*PutObjectRes, error) {
|
|||
cliRes, err := w.Close()
|
||||
if err == nil {
|
||||
err = apistatus.ErrFromStatus(cliRes.Status())
|
||||
} else {
|
||||
ReportError(prm.cli, err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
|
14
pkg/services/object/internal/client/error.go
Normal file
14
pkg/services/object/internal/client/error.go
Normal file
|
@ -0,0 +1,14 @@
|
|||
package internal
|
||||
|
||||
import clientcore "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
|
||||
type errorReporter interface {
|
||||
ReportError(error)
|
||||
}
|
||||
|
||||
// ReportError drops client connection if possible.
|
||||
func ReportError(c clientcore.Client, err error) {
|
||||
if ce, ok := c.(errorReporter); ok {
|
||||
ce.ReportError(err)
|
||||
}
|
||||
}
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
putsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
)
|
||||
|
@ -153,12 +154,14 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien
|
|||
// send init part
|
||||
err = stream.Write(s.init)
|
||||
if err != nil {
|
||||
internalclient.ReportError(c, err)
|
||||
err = fmt.Errorf("sending the initial message to stream failed: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
for i := range s.chunks {
|
||||
if err = stream.Write(s.chunks[i]); err != nil {
|
||||
internalclient.ReportError(c, err)
|
||||
err = fmt.Errorf("sending the chunk %d failed: %w", i, err)
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue