[#263] Implement client for exchanging raw messages using gRPC protocol

Implement gRPC client that can uniformly execute any RPC on the remote
server. In the primary implementation, the client is a thin wrapper over
gRPC client connection that is required to create the client. In the future,
it is planned to expand the library with convenient functionality.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-03-12 14:17:16 +03:00 committed by Alex Vanin
parent ae2fb263f1
commit cf765a61a6
7 changed files with 271 additions and 0 deletions

75
rpc/common/call.go Normal file
View file

@ -0,0 +1,75 @@
package common
type callType uint8
const (
_ callType = iota
callUnary
callClientStream
callServerStream
callBidirStream
)
// CallMethodInfo is an information about the RPC.
type CallMethodInfo struct {
// Name of the service.
Service string
// Name of the RPC.
Name string
t callType
}
// ServerStream checks if CallMethodInfo contains
// information about the server-side streaming RPC.
func (c CallMethodInfo) ServerStream() bool {
return c.t == callServerStream || c.t == callBidirStream
}
// ClientStream checks if CallMethodInfo contains
// information about the client-side streaming RPC.
func (c CallMethodInfo) ClientStream() bool {
return c.t == callClientStream || c.t == callBidirStream
}
func (c *CallMethodInfo) setCommon(service, name string) {
c.Service = service
c.Name = name
}
// CallMethodInfoUnary returns CallMethodInfo structure
// initialized for the unary RPC.
func CallMethodInfoUnary(service, name string) (info CallMethodInfo) {
info.setCommon(service, name)
info.t = callUnary
return
}
// CallMethodInfoClientStream returns CallMethodInfo structure
// initialized for the client-side streaming RPC.
func CallMethodInfoClientStream(service, name string) (info CallMethodInfo) {
info.setCommon(service, name)
info.t = callClientStream
return
}
// CallMethodInfoServerStream returns CallMethodInfo structure
// initialized for the server-side streaming RPC.
func CallMethodInfoServerStream(service, name string) (info CallMethodInfo) {
info.setCommon(service, name)
info.t = callServerStream
return
}
// CallMethodInfoBidirectionalStream returns CallMethodInfo structure
// initialized for the bidirectional streaming RPC.
func CallMethodInfoBidirectionalStream(service, name string) (info CallMethodInfo) {
info.setCommon(service, name)
info.t = callBidirStream
return
}

49
rpc/common/call_test.go Normal file
View file

@ -0,0 +1,49 @@
package common_test
import (
"testing"
"github.com/nspcc-dev/neofs-api-go/rpc/common"
"github.com/stretchr/testify/require"
)
const (
testServiceName = "test service"
testRPCName = "test RPC"
)
func TestCallMethodInfoUnary(t *testing.T) {
i := common.CallMethodInfoUnary(testServiceName, testRPCName)
require.Equal(t, testServiceName, i.Service)
require.Equal(t, testRPCName, i.Name)
require.False(t, i.ClientStream())
require.False(t, i.ServerStream())
}
func TestCallMethodInfoServerStream(t *testing.T) {
i := common.CallMethodInfoServerStream(testServiceName, testRPCName)
require.Equal(t, testServiceName, i.Service)
require.Equal(t, testRPCName, i.Name)
require.False(t, i.ClientStream())
require.True(t, i.ServerStream())
}
func TestCallMethodInfoClientStream(t *testing.T) {
i := common.CallMethodInfoClientStream(testServiceName, testRPCName)
require.Equal(t, testServiceName, i.Service)
require.Equal(t, testRPCName, i.Name)
require.True(t, i.ClientStream())
require.False(t, i.ServerStream())
}
func TestCallMethodInfoBidirectionalStream(t *testing.T) {
i := common.CallMethodInfoBidirectionalStream(testServiceName, testRPCName)
require.Equal(t, testServiceName, i.Service)
require.Equal(t, testRPCName, i.Name)
require.True(t, i.ClientStream())
require.True(t, i.ServerStream())
}

25
rpc/grpc/call_options.go Normal file
View file

@ -0,0 +1,25 @@
package grpc
import (
"context"
)
// CallOption is a messaging session option within RPC.
type CallOption func(*callParameters)
type callParameters struct {
ctx context.Context
}
func defaultCallParameters() *callParameters {
return &callParameters{
ctx: context.Background(),
}
}
// WithContext returns option to set RPC context.
func WithContext(ctx context.Context) CallOption {
return func(prm *callParameters) {
prm.ctx = ctx
}
}

23
rpc/grpc/client.go Normal file
View file

@ -0,0 +1,23 @@
package grpc
// Client represents client for exchanging messages
// with a remote server using gRPC protocol.
type Client struct {
*cfg
}
// Option is a Client's constructor option.
type Option func(*cfg)
// New creates, configures via options and returns new Client instance.
func New(opts ...Option) *Client {
c := defaultCfg()
for _, opt := range opts {
opt(c)
}
return &Client{
cfg: c,
}
}

65
rpc/grpc/init.go Normal file
View file

@ -0,0 +1,65 @@
package grpc
import (
"io"
"github.com/nspcc-dev/neofs-api-go/rpc/common"
"google.golang.org/grpc"
)
// Message represents raw gRPC message.
type Message interface{}
// MessageReadWriter is a component interface
// for transmitting raw messages over gRPC protocol.
type MessageReadWriter interface {
// ReadMessage reads the next message from the remote server,
// and writes it to the argument.
ReadMessage(Message) error
// WriteMessage sends message from argument to remote server.
WriteMessage(Message) error
// Closes the communication session with the remote server.
//
// All calls to send/receive messages must be done before closing.
io.Closer
}
type streamWrapper struct {
grpc.ClientStream
}
func (w streamWrapper) ReadMessage(m Message) error {
return w.ClientStream.RecvMsg(m)
}
func (w streamWrapper) WriteMessage(m Message) error {
return w.ClientStream.SendMsg(m)
}
func (w *streamWrapper) Close() error {
return w.ClientStream.CloseSend()
}
// Init initiates a messaging session within the RPC configured by options.
func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) {
prm := defaultCallParameters()
for _, opt := range opts {
opt(prm)
}
stream, err := c.con.NewStream(prm.ctx, &grpc.StreamDesc{
StreamName: info.Name,
ServerStreams: info.ServerStream(),
ClientStreams: info.ClientStream(),
}, toMethodName(info))
if err != nil {
return nil, err
}
return &streamWrapper{
ClientStream: stream,
}, nil
}

21
rpc/grpc/options.go Normal file
View file

@ -0,0 +1,21 @@
package grpc
import (
"google.golang.org/grpc"
)
type cfg struct {
con *grpc.ClientConn
}
func defaultCfg() *cfg {
return new(cfg)
}
// WithClientConnection returns option to set gRPC connection
// to the remote server.
func WithClientConnection(con *grpc.ClientConn) Option {
return func(c *cfg) {
c.con = con
}
}

13
rpc/grpc/util.go Normal file
View file

@ -0,0 +1,13 @@
package grpc
import (
"fmt"
"github.com/nspcc-dev/neofs-api-go/rpc/common"
)
const methodNameFmt = "%s/%s"
func toMethodName(p common.CallMethodInfo) string {
return fmt.Sprintf(methodNameFmt, p.Service, p.Name)
}