diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 152b4b92..64b14282 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" rawclient "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/client" clientcore "github.com/TrueCloudLab/frostfs-node/pkg/core/client" @@ -12,23 +13,34 @@ import ( "github.com/TrueCloudLab/frostfs-sdk-go/client" ) +type singleClient struct { + sync.RWMutex + client clientcore.Client + lastAttempt time.Time +} + type multiClient struct { mtx sync.RWMutex - clients map[string]clientcore.Client + clients map[string]*singleClient // addrMtx protects addr field. Should not be taken before the mtx. addrMtx sync.RWMutex addr network.AddressGroup opts ClientCacheOpts + + reconnectInterval time.Duration } +const defaultReconnectInterval = time.Second * 30 + func newMultiClient(addr network.AddressGroup, opts ClientCacheOpts) *multiClient { return &multiClient{ - clients: make(map[string]clientcore.Client), - addr: addr, - opts: opts, + clients: make(map[string]*singleClient), + addr: addr, + opts: opts, + reconnectInterval: defaultReconnectInterval, } } @@ -110,6 +122,8 @@ loop: x.addrMtx.Unlock() } +var errRecentlyFailed = errors.New("client has recently failed, skipping") + func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Client) error) error { var firstErr error @@ -134,16 +148,45 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie success := err == nil || errors.Is(err, context.Canceled) - if success || firstErr == nil { + if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) { firstErr = err } + if err != nil { + x.ReportError(err) + } + return success }) return firstErr } +func (x *multiClient) ReportError(err error) { + if errors.Is(err, errRecentlyFailed) { + return + } + + // Dropping all clients here is not necessary, we do this + // because `multiClient` doesn't yet provide convenient interface + // for reporting individual errors for streaming operations. + x.mtx.RLock() + for _, sc := range x.clients { + sc.invalidate() + } + x.mtx.RUnlock() +} + +func (s *singleClient) invalidate() { + s.Lock() + if s.client != nil { + _ = s.client.Close() + } + s.client = nil + s.lastAttempt = time.Now() + s.Unlock() +} + func (x *multiClient) ObjectPutInit(ctx context.Context, p client.PrmObjectPutInit) (res *client.ObjectWriter, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { res, err = c.ObjectPutInit(ctx, p) @@ -243,7 +286,9 @@ func (x *multiClient) Close() error { { for _, c := range x.clients { - _ = c.Close() + if c.client != nil { + _ = c.client.Close() + } } } @@ -257,7 +302,12 @@ func (x *multiClient) RawForAddress(addr network.Address, f func(client *rawclie if err != nil { return err } - return c.ExecRaw(f) + + err = c.ExecRaw(f) + if err != nil { + x.ReportError(err) + } + return err } func (x *multiClient) client(addr network.Address) (clientcore.Client, error) { @@ -268,20 +318,45 @@ func (x *multiClient) client(addr network.Address) (clientcore.Client, error) { x.mtx.RUnlock() if cached { - return c, nil - } - - x.mtx.Lock() - defer x.mtx.Unlock() - - c, cached = x.clients[strAddr] - if !cached { - var err error - c, err = x.createForAddress(addr) - if err != nil { - return nil, err + c.RLock() + if c.client != nil { + cl := c.client + c.RUnlock() + return cl, nil } - x.clients[strAddr] = c + if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval { + c.RUnlock() + return nil, errRecentlyFailed + } + c.RUnlock() + } else { + var ok bool + x.mtx.Lock() + c, ok = x.clients[strAddr] + if !ok { + c = new(singleClient) + x.clients[strAddr] = c + } + x.mtx.Unlock() } - return c, nil + + c.Lock() + defer c.Unlock() + + if c.client != nil { + return c.client, nil + } + + if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval { + return nil, errRecentlyFailed + } + + cl, err := x.createForAddress(addr) + if err != nil { + c.lastAttempt = time.Now() + return nil, err + } + + c.client = cl + return cl, nil } diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index f887b3da..1907ae80 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -21,6 +21,7 @@ import ( objectSvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object" getsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/get" "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal" + internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" "github.com/TrueCloudLab/frostfs-node/pkg/services/object/util" apistatus "github.com/TrueCloudLab/frostfs-sdk-go/client/status" frostfscrypto "github.com/TrueCloudLab/frostfs-sdk-go/crypto" @@ -126,6 +127,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre break } + internalclient.ReportError(c, err) return nil, fmt.Errorf("reading the response failed: %w", err) } @@ -288,6 +290,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get break } + internalclient.ReportError(c, err) return nil, fmt.Errorf("reading the response failed: %w", err) } diff --git a/pkg/services/object/internal/client/client.go b/pkg/services/object/internal/client/client.go index 2f9e9508..ff4053ff 100644 --- a/pkg/services/object/internal/client/client.go +++ b/pkg/services/object/internal/client/client.go @@ -170,6 +170,8 @@ func GetObject(prm GetObjectPrm) (*GetObjectRes, error) { if err == nil { // pull out an error from status err = apistatus.ErrFromStatus(res.Status()) + } else { + ReportError(prm.cli, err) } return nil, fmt.Errorf("read object header: %w", err) @@ -439,6 +441,8 @@ func PutObject(prm PutObjectPrm) (*PutObjectRes, error) { cliRes, err := w.Close() if err == nil { err = apistatus.ErrFromStatus(cliRes.Status()) + } else { + ReportError(prm.cli, err) } if err != nil { diff --git a/pkg/services/object/internal/client/error.go b/pkg/services/object/internal/client/error.go new file mode 100644 index 00000000..c0efe591 --- /dev/null +++ b/pkg/services/object/internal/client/error.go @@ -0,0 +1,14 @@ +package internal + +import clientcore "github.com/TrueCloudLab/frostfs-node/pkg/core/client" + +type errorReporter interface { + ReportError(error) +} + +// ReportError drops client connection if possible. +func ReportError(c clientcore.Client, err error) { + if ce, ok := c.(errorReporter); ok { + ce.ReportError(err) + } +} diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index 4ee9439a..348bdf84 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -11,6 +11,7 @@ import ( "github.com/TrueCloudLab/frostfs-node/pkg/core/client" "github.com/TrueCloudLab/frostfs-node/pkg/network" "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal" + internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" putsvc "github.com/TrueCloudLab/frostfs-node/pkg/services/object/put" "github.com/TrueCloudLab/frostfs-node/pkg/services/object/util" ) @@ -153,12 +154,14 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien // send init part err = stream.Write(s.init) if err != nil { + internalclient.ReportError(c, err) err = fmt.Errorf("sending the initial message to stream failed: %w", err) return } for i := range s.chunks { if err = stream.Write(s.chunks[i]); err != nil { + internalclient.ReportError(c, err) err = fmt.Errorf("sending the chunk %d failed: %w", i, err) return }