diff --git a/rpc/client/client.go b/rpc/client/client.go index 40b5d1a..7e914db 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -1,29 +1,28 @@ package client import ( - "sync" - - "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/grpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) // Client represents client for exchanging messages // with a remote server using Protobuf RPC. type Client struct { - *cfg - - gRPCClientOnce sync.Once - gRPCClient *grpc.Client + cfg } // New creates, configures via options and returns new Client instance. func New(opts ...Option) *Client { - c := defaultCfg() + var c Client + c.initDefault() for _, opt := range opts { - opt(c) + opt(&c.cfg) } - return &Client{ - cfg: c, + if c.tlsCfg != nil { + c.grpcDialOpts = append(c.grpcDialOpts, grpc.WithTransportCredentials(credentials.NewTLS(c.tlsCfg))) } + + return &c } diff --git a/rpc/client/conn.go b/rpc/client/conn.go index ba53ab3..9fc7a51 100644 --- a/rpc/client/conn.go +++ b/rpc/client/conn.go @@ -9,14 +9,8 @@ import ( // Returns non-nil result after the first Init() call // completed without a connection error. // -// Conn is NPE-safe: returns nil if Client is nil. -// // Client should not be used after Close() call // on the connection: behavior is undefined. func (c *Client) Conn() io.Closer { - if c != nil { - return c.gRPCClient.Conn() - } - - return nil + return c.conn } diff --git a/rpc/client/connect.go b/rpc/client/connect.go index 2fe77d5..29f4189 100644 --- a/rpc/client/connect.go +++ b/rpc/client/connect.go @@ -7,28 +7,9 @@ import ( "net" "net/url" - "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/grpc" grpcstd "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" ) -func (c *Client) createGRPCClient(ctx context.Context) (err error) { - c.gRPCClientOnce.Do(func() { - if err = c.openGRPCConn(ctx); err != nil { - err = fmt.Errorf("open gRPC connection: %w", err) - return - } - - c.gRPCClient = grpc.New( - grpc.WithClientConnection(c.conn), - grpc.WithRWTimeout(c.rwTimeout), - ) - }) - - return -} - var errInvalidEndpoint = errors.New("invalid endpoint options") func (c *Client) openGRPCConn(ctx context.Context) error { @@ -40,21 +21,10 @@ func (c *Client) openGRPCConn(ctx context.Context) error { return errInvalidEndpoint } - var creds credentials.TransportCredentials - - if c.tlsCfg != nil { - creds = credentials.NewTLS(c.tlsCfg) - } else { - creds = insecure.NewCredentials() - } - dialCtx, cancel := context.WithTimeout(ctx, c.dialTimeout) var err error - c.conn, err = grpcstd.DialContext(dialCtx, c.addr, - grpcstd.WithTransportCredentials(creds), - grpcstd.WithBlock(), - ) + c.conn, err = grpcstd.DialContext(dialCtx, c.addr, c.grpcDialOpts...) cancel() diff --git a/rpc/client/init.go b/rpc/client/init.go index b45871b..7706c7a 100644 --- a/rpc/client/init.go +++ b/rpc/client/init.go @@ -1,11 +1,12 @@ package client import ( + "context" "io" "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/common" - "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/grpc" "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/message" + "google.golang.org/grpc" ) // MessageReader is an interface of the Message reader. @@ -45,39 +46,24 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe opt(prm) } - return c.initGRPC(info, prm) -} - -type rwGRPC struct { - grpc.MessageReadWriter -} - -func (g rwGRPC) ReadMessage(m message.Message) error { - // Can be optimized: we can create blank message here. - gm := m.ToGRPCMessage() - - if err := g.MessageReadWriter.ReadMessage(gm); err != nil { - return err - } - - return m.FromGRPCMessage(gm) -} - -func (g rwGRPC) WriteMessage(m message.Message) error { - return g.MessageReadWriter.WriteMessage(m.ToGRPCMessage()) -} - -func (c *Client) initGRPC(info common.CallMethodInfo, prm *callParameters) (MessageReadWriter, error) { - if err := c.createGRPCClient(prm.ctx); err != nil { + if err := c.openGRPCConn(prm.ctx); err != nil { return nil, err } - rw, err := c.gRPCClient.Init(info, grpc.WithContext(prm.ctx)) + ctx, cancel := context.WithCancel(prm.ctx) + stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{ + StreamName: info.Name, + ServerStreams: info.ServerStream(), + ClientStreams: info.ClientStream(), + }, toMethodName(info)) if err != nil { + cancel() return nil, err } - return &rwGRPC{ - MessageReadWriter: rw, + return &streamWrapper{ + ClientStream: stream, + cancel: cancel, + timeout: c.rwTimeout, }, nil } diff --git a/rpc/client/options.go b/rpc/client/options.go index f04fc9e..005594c 100644 --- a/rpc/client/options.go +++ b/rpc/client/options.go @@ -5,6 +5,8 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" ) const ( @@ -21,20 +23,27 @@ type cfg struct { dialTimeout time.Duration rwTimeout time.Duration - tlsCfg *tls.Config + tlsCfg *tls.Config + grpcDialOpts []grpc.DialOption conn *grpc.ClientConn } const ( - defaultDialTimeout = 5 * time.Second - defaultRWTimeout = 1 * time.Minute + defaultDialTimeout = 5 * time.Second + defaultKeepAliveTimeout = 5 * time.Second + defaultRWTimeout = 1 * time.Minute ) -func defaultCfg() *cfg { - return &cfg{ - dialTimeout: defaultDialTimeout, - rwTimeout: defaultRWTimeout, +func (c *cfg) initDefault() { + c.dialTimeout = defaultDialTimeout + c.rwTimeout = defaultRWTimeout + c.grpcDialOpts = []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Timeout: defaultKeepAliveTimeout, + }), } } @@ -117,3 +126,10 @@ func WithGRPCConn(v *grpc.ClientConn) Option { } } } + +// WithGRPCDialOptions returns an option to specify grpc.DialOption. +func WithGRPCDialOptions(opts []grpc.DialOption) Option { + return func(c *cfg) { + c.grpcDialOpts = append(c.grpcDialOpts, opts...) + } +} diff --git a/rpc/client/stream_wrapper.go b/rpc/client/stream_wrapper.go new file mode 100644 index 0000000..7f60db2 --- /dev/null +++ b/rpc/client/stream_wrapper.go @@ -0,0 +1,58 @@ +package client + +import ( + "context" + "time" + + "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/message" + "google.golang.org/grpc" +) + +type streamWrapper struct { + grpc.ClientStream + timeout time.Duration + cancel context.CancelFunc +} + +func (w streamWrapper) ReadMessage(m message.Message) error { + // Can be optimized: we can create blank message here. + gm := m.ToGRPCMessage() + + err := w.withTimeout(func() error { + return w.ClientStream.RecvMsg(gm) + }) + if err != nil { + return err + } + + return m.FromGRPCMessage(gm) +} + +func (w streamWrapper) WriteMessage(m message.Message) error { + return w.withTimeout(func() error { + return w.ClientStream.SendMsg(m.ToGRPCMessage()) + }) +} + +func (w *streamWrapper) Close() error { + return w.withTimeout(w.ClientStream.CloseSend) +} + +func (w *streamWrapper) withTimeout(closure func() error) error { + ch := make(chan error, 1) + go func() { + ch <- closure() + close(ch) + }() + + tt := time.NewTimer(w.timeout) + + select { + case err := <-ch: + tt.Stop() + return err + case <-tt.C: + w.cancel() + return context.DeadlineExceeded + } +} diff --git a/rpc/grpc/util.go b/rpc/client/util.go similarity index 93% rename from rpc/grpc/util.go rename to rpc/client/util.go index 220c656..291ebac 100644 --- a/rpc/grpc/util.go +++ b/rpc/client/util.go @@ -1,4 +1,4 @@ -package grpc +package client import ( "fmt" diff --git a/rpc/grpc/call_options.go b/rpc/grpc/call_options.go deleted file mode 100644 index 6faf019..0000000 --- a/rpc/grpc/call_options.go +++ /dev/null @@ -1,25 +0,0 @@ -package grpc - -import ( - "context" -) - -// CallOption is a messaging session option within RPC. -type CallOption func(*callParameters) - -type callParameters struct { - ctx context.Context -} - -func defaultCallParameters() *callParameters { - return &callParameters{ - ctx: context.Background(), - } -} - -// WithContext returns option to set RPC context. -func WithContext(ctx context.Context) CallOption { - return func(prm *callParameters) { - prm.ctx = ctx - } -} diff --git a/rpc/grpc/client.go b/rpc/grpc/client.go deleted file mode 100644 index 82df2ce..0000000 --- a/rpc/grpc/client.go +++ /dev/null @@ -1,23 +0,0 @@ -package grpc - -// Client represents client for exchanging messages -// with a remote server using gRPC protocol. -type Client struct { - *cfg -} - -// Option is a Client's constructor option. -type Option func(*cfg) - -// New creates, configures via options and returns new Client instance. -func New(opts ...Option) *Client { - c := defaultCfg() - - for _, opt := range opts { - opt(c) - } - - return &Client{ - cfg: c, - } -} diff --git a/rpc/grpc/conn.go b/rpc/grpc/conn.go deleted file mode 100644 index 40968b9..0000000 --- a/rpc/grpc/conn.go +++ /dev/null @@ -1,19 +0,0 @@ -package grpc - -import ( - "io" -) - -// Conn returns underlying connection. -// -// Conn is NPE-safe: returns nil if Client is nil. -// -// Client should not be used after Close() call -// on the connection: behavior is undefined. -func (c *Client) Conn() io.Closer { - if c != nil { - return c.con - } - - return nil -} diff --git a/rpc/grpc/init.go b/rpc/grpc/init.go index bc9aa9c..79f08c9 100644 --- a/rpc/grpc/init.go +++ b/rpc/grpc/init.go @@ -1,96 +1,4 @@ package grpc -import ( - "context" - "io" - "time" - - "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/common" - "google.golang.org/grpc" -) - // Message represents raw gRPC message. type Message interface{} - -// MessageReadWriter is a component interface -// for transmitting raw messages over gRPC protocol. -type MessageReadWriter interface { - // ReadMessage reads the next message from the remote server, - // and writes it to the argument. - ReadMessage(Message) error - - // WriteMessage sends message from argument to remote server. - WriteMessage(Message) error - - // Closes the communication session with the remote server. - // - // All calls to send/receive messages must be done before closing. - io.Closer -} - -type streamWrapper struct { - grpc.ClientStream - timeout time.Duration - cancel context.CancelFunc -} - -func (w streamWrapper) ReadMessage(m Message) error { - return w.withTimeout(func() error { - return w.ClientStream.RecvMsg(m) - }) -} - -func (w streamWrapper) WriteMessage(m Message) error { - return w.withTimeout(func() error { - return w.ClientStream.SendMsg(m) - }) -} - -func (w *streamWrapper) Close() error { - return w.withTimeout(w.ClientStream.CloseSend) -} - -func (w *streamWrapper) withTimeout(closure func() error) error { - ch := make(chan error, 1) - go func() { - ch <- closure() - close(ch) - }() - - tt := time.NewTimer(w.timeout) - - select { - case err := <-ch: - tt.Stop() - return err - case <-tt.C: - w.cancel() - return context.DeadlineExceeded - } -} - -// Init initiates a messaging session within the RPC configured by options. -func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) { - prm := defaultCallParameters() - - for _, opt := range opts { - opt(prm) - } - - ctx, cancel := context.WithCancel(prm.ctx) - stream, err := c.con.NewStream(ctx, &grpc.StreamDesc{ - StreamName: info.Name, - ServerStreams: info.ServerStream(), - ClientStreams: info.ClientStream(), - }, toMethodName(info)) - if err != nil { - cancel() - return nil, err - } - - return &streamWrapper{ - ClientStream: stream, - cancel: cancel, - timeout: c.rwTimeout, - }, nil -} diff --git a/rpc/grpc/options.go b/rpc/grpc/options.go deleted file mode 100644 index 29b07ba..0000000 --- a/rpc/grpc/options.go +++ /dev/null @@ -1,36 +0,0 @@ -package grpc - -import ( - "time" - - "google.golang.org/grpc" -) - -const defaultRWTimeout = 1 * time.Minute - -type cfg struct { - con *grpc.ClientConn - rwTimeout time.Duration -} - -func defaultCfg() *cfg { - return &cfg{ - rwTimeout: defaultRWTimeout, - } -} - -// WithClientConnection returns option to set gRPC connection -// to the remote server. -func WithClientConnection(con *grpc.ClientConn) Option { - return func(c *cfg) { - c.con = con - } -} - -// WithRWTimeout returns option to specify rwTimeout -// for reading and writing single gRPC message. -func WithRWTimeout(t time.Duration) Option { - return func(c *cfg) { - c.rwTimeout = t - } -}