[#114] tree: Don't ignore unhealthy endpoints

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
pull/116/head
Denis Kirillov 2023-05-19 16:32:17 +03:00
parent 24390fdec8
commit 136a186c14
3 changed files with 151 additions and 66 deletions

View File

@ -0,0 +1,70 @@
package client
import (
"context"
"fmt"
"sync"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
"google.golang.org/grpc"
)
type TreeClient struct {
mu sync.RWMutex
address string
opts []grpc.DialOption
conn *grpc.ClientConn
service grpcService.TreeServiceClient
dialed bool
}
// NewTreeClient creates new tree client with auto dial.
func NewTreeClient(addr string, opts ...grpc.DialOption) *TreeClient {
return &TreeClient{
address: addr,
opts: opts,
}
}
func (c *TreeClient) dial(ctx context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.dialed {
return fmt.Errorf("couldn't dial '%s': connection already established", c.address)
}
conn, err := grpc.Dial(c.address, c.opts...)
if err != nil {
return fmt.Errorf("grpc dial node tree service: %w", err)
}
serviceClient := grpcService.NewTreeServiceClient(conn)
if _, err = serviceClient.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
return fmt.Errorf("healthcheck tree service: %w", err)
}
c.conn = conn
c.service = serviceClient
c.dialed = true
return nil
}
func (c *TreeClient) TreeClient(ctx context.Context) (grpcService.TreeServiceClient, error) {
c.mu.RLock()
dialed := c.dialed
c.mu.RUnlock()
if !dialed {
if err := c.dial(ctx); err != nil {
return nil, err
}
}
return c.service, nil
}
func (c *TreeClient) Address() string {
return c.address
}

View File

@ -13,6 +13,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
treeClient "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/client"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree" grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -71,20 +72,22 @@ func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
type ServiceClientGRPC struct { type ServiceClientGRPC struct {
key *keys.PrivateKey key *keys.PrivateKey
log *zap.Logger log *zap.Logger
clients []treeClient clients []*treeClient.TreeClient
startIndex int32 startIndex int32
} }
type treeClient struct { func (c *ServiceClientGRPC) getStartIndex() int {
address string return int(atomic.LoadInt32(&c.startIndex))
conn *grpc.ClientConn }
service grpcService.TreeServiceClient
func (c *ServiceClientGRPC) setStartIndex(index int) {
atomic.StoreInt32(&c.startIndex, int32(index))
} }
func (c *ServiceClientGRPC) Endpoints() []string { func (c *ServiceClientGRPC) Endpoints() []string {
res := make([]string, len(c.clients)) res := make([]string, len(c.clients))
for i, client := range c.clients { for i, client := range c.clients {
res[i] = client.address res[i] = client.Address()
} }
return res return res
} }
@ -95,30 +98,26 @@ func NewTreeServiceClientGRPC(ctx context.Context, endpoints []string, key *keys
log: log, log: log,
} }
for _, addr := range endpoints { firstHealthy := -1
conn, err := grpc.Dial(addr, grpcOpts...)
if err != nil { res.clients = make([]*treeClient.TreeClient, len(endpoints))
log.Warn("dial node tree service", zap.String("address", addr), zap.Error(err)) for i, addr := range endpoints {
res.clients[i] = treeClient.NewTreeClient(addr, grpcOpts...)
if _, err := res.clients[i].TreeClient(ctx); err != nil {
log.Warn("dial tree", zap.String("address", addr), zap.Error(err))
continue continue
} }
if firstHealthy == -1 {
c := grpcService.NewTreeServiceClient(conn) firstHealthy = i
if _, err = c.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
log.Warn("healthcheck tree service", zap.String("address", addr), zap.Error(err))
continue
} }
res.clients = append(res.clients, treeClient{
address: addr,
conn: conn,
service: c,
})
} }
if len(res.clients) == 0 { if firstHealthy == -1 {
return nil, errors.New("no healthy tree grpc client") return nil, errors.New("no healthy tree grpc client")
} }
res.setStartIndex(firstHealthy)
return res, nil return res, nil
} }
@ -149,8 +148,8 @@ func (c *ServiceClientGRPC) GetNodes(ctx context.Context, p *tree.GetNodesParams
zap.String("method", "GetNodeByPath")) zap.String("method", "GetNodeByPath"))
var resp *grpcService.GetNodeByPathResponse var resp *grpcService.GetNodeByPathResponse
if err := c.requestWithRetry(log, func(client treeClient) (inErr error) { if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.service.GetNodeByPath(ctx, request) resp, inErr = client.GetNodeByPath(ctx, request)
return handleError("failed to get node by path", inErr) return handleError("failed to get node by path", inErr)
}); err != nil { }); err != nil {
return nil, err return nil, err
@ -188,8 +187,8 @@ func (c *ServiceClientGRPC) GetSubTree(ctx context.Context, bktInfo *data.Bucket
zap.String("method", "GetSubTree")) zap.String("method", "GetSubTree"))
var cli grpcService.TreeService_GetSubTreeClient var cli grpcService.TreeService_GetSubTreeClient
if err := c.requestWithRetry(log, func(client treeClient) (inErr error) { if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.service.GetSubTree(ctx, request) cli, inErr = client.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr) return handleError("failed to get sub tree client", inErr)
}); err != nil { }); err != nil {
return nil, err return nil, err
@ -232,8 +231,8 @@ func (c *ServiceClientGRPC) AddNode(ctx context.Context, bktInfo *data.BucketInf
zap.String("method", "Add")) zap.String("method", "Add"))
var resp *grpcService.AddResponse var resp *grpcService.AddResponse
if err := c.requestWithRetry(log, func(client treeClient) (inErr error) { if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.service.Add(ctx, request) resp, inErr = client.Add(ctx, request)
return handleError("failed to add node", inErr) return handleError("failed to add node", inErr)
}); err != nil { }); err != nil {
return 0, err return 0, err
@ -267,8 +266,8 @@ func (c *ServiceClientGRPC) AddNodeByPath(ctx context.Context, bktInfo *data.Buc
zap.String("method", "AddByPath")) zap.String("method", "AddByPath"))
var resp *grpcService.AddByPathResponse var resp *grpcService.AddByPathResponse
if err := c.requestWithRetry(log, func(client treeClient) (inErr error) { if err := c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.service.AddByPath(ctx, request) resp, inErr = client.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr) return handleError("failed to add node by path", inErr)
}); err != nil { }); err != nil {
return 0, err return 0, err
@ -309,8 +308,8 @@ func (c *ServiceClientGRPC) MoveNode(ctx context.Context, bktInfo *data.BucketIn
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID), log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
zap.String("method", "Move")) zap.String("method", "Move"))
return c.requestWithRetry(log, func(client treeClient) error { return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error {
if _, err := client.service.Move(ctx, request); err != nil { if _, err := client.Move(ctx, request); err != nil {
return handleError("failed to move node", err) return handleError("failed to move node", err)
} }
return nil return nil
@ -338,24 +337,31 @@ func (c *ServiceClientGRPC) RemoveNode(ctx context.Context, bktInfo *data.Bucket
log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID), log := c.log.With(zap.String("request_id", api.GetRequestID(ctx)), zap.String("tree", treeID),
zap.String("method", "Remove")) zap.String("method", "Remove"))
return c.requestWithRetry(log, func(client treeClient) error { return c.requestWithRetry(ctx, log, func(client grpcService.TreeServiceClient) error {
if _, err := client.service.Remove(ctx, request); err != nil { if _, err := client.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err) return handleError("failed to remove node", err)
} }
return nil return nil
}) })
} }
func (c *ServiceClientGRPC) requestWithRetry(log *zap.Logger, fn func(client treeClient) error) (err error) { func (c *ServiceClientGRPC) requestWithRetry(ctx context.Context, log *zap.Logger, fn func(client grpcService.TreeServiceClient) error) error {
start := int(atomic.LoadInt32(&c.startIndex)) var (
err error
cl grpcService.TreeServiceClient
)
start := c.getStartIndex()
for i := start; i < start+len(c.clients); i++ { for i := start; i < start+len(c.clients); i++ {
index := i % len(c.clients) index := i % len(c.clients)
err = fn(c.clients[index]) if cl, err = c.clients[index].TreeClient(ctx); err == nil {
err = fn(cl)
}
if !shouldTryAgain(err) { if !shouldTryAgain(err) {
atomic.StoreInt32(&c.startIndex, int32(index)) c.setStartIndex(index)
return err return err
} }
log.Debug("tree request error", zap.String("address", c.clients[index].address), zap.Error(err)) log.Debug("tree request error", zap.String("address", c.clients[index].Address()), zap.Error(err))
} }
return err return err

View File

@ -1,11 +1,15 @@
package services package services
import ( import (
"context"
"errors" "errors"
"sync/atomic" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/client"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services/tree"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
) )
@ -37,56 +41,61 @@ func TestHandleError(t *testing.T) {
} }
func TestRetry(t *testing.T) { func TestRetry(t *testing.T) {
ctx := context.Background()
log := zaptest.NewLogger(t)
grpcDialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
cl := &ServiceClientGRPC{ cl := &ServiceClientGRPC{
log: zaptest.NewLogger(t), log: zaptest.NewLogger(t),
clients: []treeClient{ clients: []*client.TreeClient{
{address: "node0"}, client.NewTreeClient("node0", grpcDialOpt),
{address: "node1"}, client.NewTreeClient("node1", grpcDialOpt),
{address: "node2"}, client.NewTreeClient("node2", grpcDialOpt),
{address: "node3"}, client.NewTreeClient("node3", grpcDialOpt),
}, },
} }
makeFn := func(shouldFail []string) func(treeClient) error { makeFn := func(shouldFail []string) func(grpcService.TreeServiceClient) error {
return func(client treeClient) error { return func(client grpcService.TreeServiceClient) error {
for _, item := range shouldFail {
if item == client.address { //for _, item := range shouldFail {
return errors.New("not found") //if item == client.address {
} // return errors.New("not found")
} //}
//}
return nil return nil
} }
} }
t.Run("first ok", func(t *testing.T) { t.Run("first ok", func(t *testing.T) {
err := cl.requestWithRetry(makeFn([]string{})) err := cl.requestWithRetry(ctx, log, makeFn([]string{}))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex))) require.Equal(t, 0, cl.getStartIndex())
atomic.StoreInt32(&cl.startIndex, 0) cl.setStartIndex(0)
}) })
t.Run("first failed", func(t *testing.T) { t.Run("first failed", func(t *testing.T) {
err := cl.requestWithRetry(makeFn([]string{"node0"})) err := cl.requestWithRetry(ctx, log, makeFn([]string{"node0"}))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, int(atomic.LoadInt32(&cl.startIndex))) require.Equal(t, 1, cl.getStartIndex())
atomic.StoreInt32(&cl.startIndex, 0) cl.setStartIndex(0)
}) })
t.Run("all failed", func(t *testing.T) { t.Run("all failed", func(t *testing.T) {
err := cl.requestWithRetry(makeFn([]string{"node0", "node1", "node2", "node3"})) err := cl.requestWithRetry(ctx, log, makeFn([]string{"node0", "node1", "node2", "node3"}))
require.Error(t, err) require.Error(t, err)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex))) require.Equal(t, 0, cl.getStartIndex())
atomic.StoreInt32(&cl.startIndex, 0) cl.setStartIndex(0)
}) })
t.Run("round", func(t *testing.T) { t.Run("round", func(t *testing.T) {
err := cl.requestWithRetry(makeFn([]string{"node0", "node1"})) err := cl.requestWithRetry(ctx, log, makeFn([]string{"node0", "node1"}))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 2, int(atomic.LoadInt32(&cl.startIndex))) require.Equal(t, 2, cl.getStartIndex())
err = cl.requestWithRetry(makeFn([]string{"node2", "node3"})) err = cl.requestWithRetry(ctx, log, makeFn([]string{"node2", "node3"}))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 0, int(atomic.LoadInt32(&cl.startIndex))) require.Equal(t, 0, cl.getStartIndex())
atomic.StoreInt32(&cl.startIndex, 0) cl.setStartIndex(0)
}) })
} }