[#117] rpc: Allow to specify custom gRPC dialer
All checks were successful
Tests and linters / Lint (pull_request) Successful in 41s
DCO action / DCO (pull_request) Successful in 45s
Tests and linters / Tests (pull_request) Successful in 46s
Tests and linters / Tests with -race (pull_request) Successful in 56s

After grpc upgrade there is no DialContext call.
So connection is not actually established after created.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-09-13 17:18:53 +03:00
parent c9782cf3ef
commit 13fa0da374
4 changed files with 26 additions and 10 deletions

View file

@ -2,6 +2,8 @@ package client
import (
"context"
"google.golang.org/grpc"
)
// CallOption is a messaging session option within Protobuf RPC.
@ -9,6 +11,7 @@ type CallOption func(*callParameters)
type callParameters struct {
ctx context.Context // nolint:containedctx
dialer func(context.Context, grpc.ClientConnInterface) error
}
func defaultCallParameters() *callParameters {
@ -27,3 +30,11 @@ func WithContext(ctx context.Context) CallOption {
prm.ctx = ctx
}
}
// WithDialer returns option to specify grpc dialer. If passed, it will be
// called after the connection is successfully created.
func WithDialer(dialer func(context.Context, grpc.ClientConnInterface) error) CallOption {
return func(prm *callParameters) {
prm.dialer = dialer
}
}

View file

@ -12,7 +12,7 @@ import (
var errInvalidEndpoint = errors.New("invalid endpoint options")
func (c *Client) openGRPCConn(ctx context.Context) error {
func (c *Client) openGRPCConn(ctx context.Context, dialer func(ctx context.Context, cc grpcstd.ClientConnInterface) error) error {
if c.conn != nil {
return nil
}
@ -21,16 +21,22 @@ func (c *Client) openGRPCConn(ctx context.Context) error {
return errInvalidEndpoint
}
dialCtx, cancel := context.WithTimeout(ctx, c.dialTimeout)
var err error
c.conn, err = grpcstd.DialContext(dialCtx, c.addr, c.grpcDialOpts...)
cancel()
c.conn, err = grpcstd.NewClient(c.addr, c.grpcDialOpts...)
if err != nil {
return fmt.Errorf("gRPC new client: %w", err)
}
if dialer != nil {
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
defer cancel()
if err := dialer(ctx, c.conn); err != nil {
_ = c.conn.Close()
return fmt.Errorf("gRPC dial: %w", err)
}
}
return nil
}

View file

@ -46,7 +46,7 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe
opt(prm)
}
if err := c.openGRPCConn(prm.ctx); err != nil {
if err := c.openGRPCConn(prm.ctx, prm.dialer); err != nil {
return nil, err
}

View file

@ -37,7 +37,6 @@ func (c *cfg) initDefault() {
c.dialTimeout = defaultDialTimeout
c.rwTimeout = defaultRWTimeout
c.grpcDialOpts = []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
}