From cf765a61a6a502cca1aae5d318d93dff59586209 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 12 Mar 2021 14:17:16 +0300 Subject: [PATCH] [#263] Implement client for exchanging raw messages using gRPC protocol Implement gRPC client that can uniformly execute any RPC on the remote server. In the primary implementation, the client is a thin wrapper over gRPC client connection that is required to create the client. In the future, it is planned to expand the library with convenient functionality. Signed-off-by: Leonard Lyubich --- rpc/common/call.go | 75 ++++++++++++++++++++++++++++++++++++++++ rpc/common/call_test.go | 49 ++++++++++++++++++++++++++ rpc/grpc/call_options.go | 25 ++++++++++++++ rpc/grpc/client.go | 23 ++++++++++++ rpc/grpc/init.go | 65 ++++++++++++++++++++++++++++++++++ rpc/grpc/options.go | 21 +++++++++++ rpc/grpc/util.go | 13 +++++++ 7 files changed, 271 insertions(+) create mode 100644 rpc/common/call.go create mode 100644 rpc/common/call_test.go create mode 100644 rpc/grpc/call_options.go create mode 100644 rpc/grpc/client.go create mode 100644 rpc/grpc/init.go create mode 100644 rpc/grpc/options.go create mode 100644 rpc/grpc/util.go diff --git a/rpc/common/call.go b/rpc/common/call.go new file mode 100644 index 0000000..bc3410a --- /dev/null +++ b/rpc/common/call.go @@ -0,0 +1,75 @@ +package common + +type callType uint8 + +const ( + _ callType = iota + callUnary + callClientStream + callServerStream + callBidirStream +) + +// CallMethodInfo is an information about the RPC. +type CallMethodInfo struct { + // Name of the service. + Service string + + // Name of the RPC. + Name string + + t callType +} + +// ServerStream checks if CallMethodInfo contains +// information about the server-side streaming RPC. +func (c CallMethodInfo) ServerStream() bool { + return c.t == callServerStream || c.t == callBidirStream +} + +// ClientStream checks if CallMethodInfo contains +// information about the client-side streaming RPC. +func (c CallMethodInfo) ClientStream() bool { + return c.t == callClientStream || c.t == callBidirStream +} + +func (c *CallMethodInfo) setCommon(service, name string) { + c.Service = service + c.Name = name +} + +// CallMethodInfoUnary returns CallMethodInfo structure +// initialized for the unary RPC. +func CallMethodInfoUnary(service, name string) (info CallMethodInfo) { + info.setCommon(service, name) + info.t = callUnary + + return +} + +// CallMethodInfoClientStream returns CallMethodInfo structure +// initialized for the client-side streaming RPC. +func CallMethodInfoClientStream(service, name string) (info CallMethodInfo) { + info.setCommon(service, name) + info.t = callClientStream + + return +} + +// CallMethodInfoServerStream returns CallMethodInfo structure +// initialized for the server-side streaming RPC. +func CallMethodInfoServerStream(service, name string) (info CallMethodInfo) { + info.setCommon(service, name) + info.t = callServerStream + + return +} + +// CallMethodInfoBidirectionalStream returns CallMethodInfo structure +// initialized for the bidirectional streaming RPC. +func CallMethodInfoBidirectionalStream(service, name string) (info CallMethodInfo) { + info.setCommon(service, name) + info.t = callBidirStream + + return +} diff --git a/rpc/common/call_test.go b/rpc/common/call_test.go new file mode 100644 index 0000000..e1414e3 --- /dev/null +++ b/rpc/common/call_test.go @@ -0,0 +1,49 @@ +package common_test + +import ( + "testing" + + "github.com/nspcc-dev/neofs-api-go/rpc/common" + "github.com/stretchr/testify/require" +) + +const ( + testServiceName = "test service" + testRPCName = "test RPC" +) + +func TestCallMethodInfoUnary(t *testing.T) { + i := common.CallMethodInfoUnary(testServiceName, testRPCName) + + require.Equal(t, testServiceName, i.Service) + require.Equal(t, testRPCName, i.Name) + require.False(t, i.ClientStream()) + require.False(t, i.ServerStream()) +} + +func TestCallMethodInfoServerStream(t *testing.T) { + i := common.CallMethodInfoServerStream(testServiceName, testRPCName) + + require.Equal(t, testServiceName, i.Service) + require.Equal(t, testRPCName, i.Name) + require.False(t, i.ClientStream()) + require.True(t, i.ServerStream()) +} + +func TestCallMethodInfoClientStream(t *testing.T) { + i := common.CallMethodInfoClientStream(testServiceName, testRPCName) + + require.Equal(t, testServiceName, i.Service) + require.Equal(t, testRPCName, i.Name) + require.True(t, i.ClientStream()) + require.False(t, i.ServerStream()) +} + +func TestCallMethodInfoBidirectionalStream(t *testing.T) { + i := common.CallMethodInfoBidirectionalStream(testServiceName, testRPCName) + + require.Equal(t, testServiceName, i.Service) + require.Equal(t, testRPCName, i.Name) + require.True(t, i.ClientStream()) + require.True(t, i.ServerStream()) +} diff --git a/rpc/grpc/call_options.go b/rpc/grpc/call_options.go new file mode 100644 index 0000000..6faf019 --- /dev/null +++ b/rpc/grpc/call_options.go @@ -0,0 +1,25 @@ +package grpc + +import ( + "context" +) + +// CallOption is a messaging session option within RPC. +type CallOption func(*callParameters) + +type callParameters struct { + ctx context.Context +} + +func defaultCallParameters() *callParameters { + return &callParameters{ + ctx: context.Background(), + } +} + +// WithContext returns option to set RPC context. +func WithContext(ctx context.Context) CallOption { + return func(prm *callParameters) { + prm.ctx = ctx + } +} diff --git a/rpc/grpc/client.go b/rpc/grpc/client.go new file mode 100644 index 0000000..82df2ce --- /dev/null +++ b/rpc/grpc/client.go @@ -0,0 +1,23 @@ +package grpc + +// Client represents client for exchanging messages +// with a remote server using gRPC protocol. +type Client struct { + *cfg +} + +// Option is a Client's constructor option. +type Option func(*cfg) + +// New creates, configures via options and returns new Client instance. +func New(opts ...Option) *Client { + c := defaultCfg() + + for _, opt := range opts { + opt(c) + } + + return &Client{ + cfg: c, + } +} diff --git a/rpc/grpc/init.go b/rpc/grpc/init.go new file mode 100644 index 0000000..ec3bc5a --- /dev/null +++ b/rpc/grpc/init.go @@ -0,0 +1,65 @@ +package grpc + +import ( + "io" + + "github.com/nspcc-dev/neofs-api-go/rpc/common" + "google.golang.org/grpc" +) + +// Message represents raw gRPC message. +type Message interface{} + +// MessageReadWriter is a component interface +// for transmitting raw messages over gRPC protocol. +type MessageReadWriter interface { + // ReadMessage reads the next message from the remote server, + // and writes it to the argument. + ReadMessage(Message) error + + // WriteMessage sends message from argument to remote server. + WriteMessage(Message) error + + // Closes the communication session with the remote server. + // + // All calls to send/receive messages must be done before closing. + io.Closer +} + +type streamWrapper struct { + grpc.ClientStream +} + +func (w streamWrapper) ReadMessage(m Message) error { + return w.ClientStream.RecvMsg(m) +} + +func (w streamWrapper) WriteMessage(m Message) error { + return w.ClientStream.SendMsg(m) +} + +func (w *streamWrapper) Close() error { + return w.ClientStream.CloseSend() +} + +// Init initiates a messaging session within the RPC configured by options. +func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) { + prm := defaultCallParameters() + + for _, opt := range opts { + opt(prm) + } + + stream, err := c.con.NewStream(prm.ctx, &grpc.StreamDesc{ + StreamName: info.Name, + ServerStreams: info.ServerStream(), + ClientStreams: info.ClientStream(), + }, toMethodName(info)) + if err != nil { + return nil, err + } + + return &streamWrapper{ + ClientStream: stream, + }, nil +} diff --git a/rpc/grpc/options.go b/rpc/grpc/options.go new file mode 100644 index 0000000..389bd63 --- /dev/null +++ b/rpc/grpc/options.go @@ -0,0 +1,21 @@ +package grpc + +import ( + "google.golang.org/grpc" +) + +type cfg struct { + con *grpc.ClientConn +} + +func defaultCfg() *cfg { + return new(cfg) +} + +// WithClientConnection returns option to set gRPC connection +// to the remote server. +func WithClientConnection(con *grpc.ClientConn) Option { + return func(c *cfg) { + c.con = con + } +} diff --git a/rpc/grpc/util.go b/rpc/grpc/util.go new file mode 100644 index 0000000..61d9abb --- /dev/null +++ b/rpc/grpc/util.go @@ -0,0 +1,13 @@ +package grpc + +import ( + "fmt" + + "github.com/nspcc-dev/neofs-api-go/rpc/common" +) + +const methodNameFmt = "%s/%s" + +func toMethodName(p common.CallMethodInfo) string { + return fmt.Sprintf(methodNameFmt, p.Service, p.Name) +}