[#117] rpc: Allow to specify custom gRPC dialer
After grpc upgrade there is no DialContext call. So connection is not actually established after created. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
c9782cf3ef
commit
c866fe308e
4 changed files with 26 additions and 10 deletions
|
@ -2,6 +2,8 @@ package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CallOption is a messaging session option within Protobuf RPC.
|
// CallOption is a messaging session option within Protobuf RPC.
|
||||||
|
@ -9,6 +11,7 @@ type CallOption func(*callParameters)
|
||||||
|
|
||||||
type callParameters struct {
|
type callParameters struct {
|
||||||
ctx context.Context // nolint:containedctx
|
ctx context.Context // nolint:containedctx
|
||||||
|
dialer func(context.Context, grpc.ClientConnInterface) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCallParameters() *callParameters {
|
func defaultCallParameters() *callParameters {
|
||||||
|
@ -27,3 +30,11 @@ func WithContext(ctx context.Context) CallOption {
|
||||||
prm.ctx = ctx
|
prm.ctx = ctx
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithContext returns option to specify grpc dialer. If passed, it will be
|
||||||
|
// called after the connection is successfully created.
|
||||||
|
func WithDialer(dialer func(context.Context, grpc.ClientConnInterface) error) CallOption {
|
||||||
|
return func(prm *callParameters) {
|
||||||
|
prm.dialer = dialer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ import (
|
||||||
|
|
||||||
var errInvalidEndpoint = errors.New("invalid endpoint options")
|
var errInvalidEndpoint = errors.New("invalid endpoint options")
|
||||||
|
|
||||||
func (c *Client) openGRPCConn(ctx context.Context) error {
|
func (c *Client) openGRPCConn(ctx context.Context, dialer func(ctx context.Context, cc grpcstd.ClientConnInterface) error) error {
|
||||||
if c.conn != nil {
|
if c.conn != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -21,16 +21,22 @@ func (c *Client) openGRPCConn(ctx context.Context) error {
|
||||||
return errInvalidEndpoint
|
return errInvalidEndpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
dialCtx, cancel := context.WithTimeout(ctx, c.dialTimeout)
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
c.conn, err = grpcstd.DialContext(dialCtx, c.addr, c.grpcDialOpts...)
|
c.conn, err = grpcstd.NewClient(c.addr, c.grpcDialOpts...)
|
||||||
|
|
||||||
cancel()
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
return fmt.Errorf("gRPC new client: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if dialer != nil {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := dialer(ctx, c.conn); err != nil {
|
||||||
|
_ = c.conn.Close()
|
||||||
return fmt.Errorf("gRPC dial: %w", err)
|
return fmt.Errorf("gRPC dial: %w", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,7 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe
|
||||||
opt(prm)
|
opt(prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.openGRPCConn(prm.ctx); err != nil {
|
if err := c.openGRPCConn(prm.ctx, prm.dialer); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,6 @@ func (c *cfg) initDefault() {
|
||||||
c.dialTimeout = defaultDialTimeout
|
c.dialTimeout = defaultDialTimeout
|
||||||
c.rwTimeout = defaultRWTimeout
|
c.rwTimeout = defaultRWTimeout
|
||||||
c.grpcDialOpts = []grpc.DialOption{
|
c.grpcDialOpts = []grpc.DialOption{
|
||||||
grpc.WithBlock(),
|
|
||||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue