[#263] Implement client for exchanging raw Protobuf messages
Implement generic `Client` that can communicate with the remote server via protobuf `Message`'s. The client can uniformly execute any protobuf RPC on the remote server using any of the supported transport protocols. Currently only gRPC protocol is supported. Additionally implement helpful functions to transmit messages by one of the flow types: unary, client- or server-side stream. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
c61656a43f
commit
30c6ca0714
6 changed files with 364 additions and 0 deletions
27
rpc/client/call_options.go
Normal file
27
rpc/client/call_options.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/rpc/grpc"
|
||||
)
|
||||
|
||||
// CallOption is a messaging session option within Protobuf RPC.
|
||||
type CallOption func(*callParameters)
|
||||
|
||||
type callParameters struct {
|
||||
callOpts []grpc.CallOption
|
||||
}
|
||||
|
||||
func defaultCallParameters() *callParameters {
|
||||
return &callParameters{
|
||||
callOpts: make([]grpc.CallOption, 0, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// WithContext return options to specify call context.
|
||||
func WithContext(ctx context.Context) CallOption {
|
||||
return func(prm *callParameters) {
|
||||
prm.callOpts = append(prm.callOpts, grpc.WithContext(ctx))
|
||||
}
|
||||
}
|
29
rpc/client/client.go
Normal file
29
rpc/client/client.go
Normal file
|
@ -0,0 +1,29 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/rpc/grpc"
|
||||
)
|
||||
|
||||
// Client represents client for exchanging messages
|
||||
// with a remote server using Protobuf RPC.
|
||||
type Client struct {
|
||||
*cfg
|
||||
|
||||
gRPCClientOnce sync.Once
|
||||
gRPCClient *grpc.Client
|
||||
}
|
||||
|
||||
// New creates, configures via options and returns new Client instance.
|
||||
func New(opts ...Option) *Client {
|
||||
c := defaultCfg()
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(c)
|
||||
}
|
||||
|
||||
return &Client{
|
||||
cfg: c,
|
||||
}
|
||||
}
|
41
rpc/client/connect.go
Normal file
41
rpc/client/connect.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/rpc/grpc"
|
||||
grpcstd "google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func (c *Client) createGRPCClient() (err error) {
|
||||
c.gRPCClientOnce.Do(func() {
|
||||
if err = c.openGRPCConn(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.gRPCClient = grpc.New(grpc.WithClientConnection(c.conn))
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
var errInvalidEndpoint = errors.New("invalid endpoint options")
|
||||
|
||||
func (c *Client) openGRPCConn() error {
|
||||
if c.conn != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if c.addr == "" {
|
||||
return errInvalidEndpoint
|
||||
}
|
||||
|
||||
var err error
|
||||
|
||||
dialCtx, cancel := context.WithTimeout(context.Background(), c.dialTimeout)
|
||||
c.conn, err = grpcstd.DialContext(dialCtx, c.addr, grpcstd.WithInsecure())
|
||||
cancel()
|
||||
|
||||
return err
|
||||
}
|
124
rpc/client/flows.go
Normal file
124
rpc/client/flows.go
Normal file
|
@ -0,0 +1,124 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/rpc/common"
|
||||
"github.com/nspcc-dev/neofs-api-go/rpc/message"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// SendUnary initializes communication session by RPC info, performs unary RPC
|
||||
// and closes the session.
|
||||
func SendUnary(cli *Client, info common.CallMethodInfo, req, resp message.Message, opts ...CallOption) error {
|
||||
rw, err := cli.Init(info, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = rw.WriteMessage(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = rw.ReadMessage(resp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return rw.Close()
|
||||
}
|
||||
|
||||
// MessageWriterCloser wraps MessageWriter
|
||||
// and io.Closer interfaces.
|
||||
type MessageWriterCloser interface {
|
||||
MessageWriter
|
||||
io.Closer
|
||||
}
|
||||
|
||||
type clientStreamWriterCloser struct {
|
||||
MessageReadWriter
|
||||
|
||||
resp message.Message
|
||||
}
|
||||
|
||||
func (c *clientStreamWriterCloser) Close() error {
|
||||
err := c.MessageReadWriter.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.ReadMessage(c.resp)
|
||||
}
|
||||
|
||||
// OpenClientStream initializes communication session by RPC info, opens client-side stream
|
||||
// and returns its interface.
|
||||
//
|
||||
// All stream writes must be performed before the closing. Close must be called once.
|
||||
func OpenClientStream(cli *Client, info common.CallMethodInfo, resp message.Message, opts ...CallOption) (MessageWriterCloser, error) {
|
||||
rw, err := cli.Init(info, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &clientStreamWriterCloser{
|
||||
MessageReadWriter: rw,
|
||||
resp: resp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// MessageReaderCloser wraps MessageReader
|
||||
// and io.Closer interface.
|
||||
type MessageReaderCloser interface {
|
||||
MessageReader
|
||||
io.Closer
|
||||
}
|
||||
|
||||
type serverStreamReaderCloser struct {
|
||||
rw MessageReadWriter
|
||||
|
||||
once sync.Once
|
||||
|
||||
req message.Message
|
||||
}
|
||||
|
||||
func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
|
||||
var err error
|
||||
|
||||
s.once.Do(func() {
|
||||
err = s.rw.WriteMessage(s.req)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.rw.ReadMessage(msg)
|
||||
if !errors.Is(err, io.EOF) {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.rw.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
// OpenServerStream initializes communication session by RPC info, opens server-side stream
|
||||
// and returns its interface.
|
||||
//
|
||||
// All stream reads must be performed before the closing. Close must be called once.
|
||||
func OpenServerStream(cli *Client, info common.CallMethodInfo, req message.Message, opts ...CallOption) (MessageReader, error) {
|
||||
rw, err := cli.Init(info, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &serverStreamReaderCloser{
|
||||
rw: rw,
|
||||
req: req,
|
||||
}, nil
|
||||
}
|
83
rpc/client/init.go
Normal file
83
rpc/client/init.go
Normal file
|
@ -0,0 +1,83 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/rpc/common"
|
||||
"github.com/nspcc-dev/neofs-api-go/rpc/grpc"
|
||||
"github.com/nspcc-dev/neofs-api-go/rpc/message"
|
||||
)
|
||||
|
||||
// MessageReader is an interface of the Message reader.
|
||||
type MessageReader interface {
|
||||
// ReadMessage reads the next Message.
|
||||
//
|
||||
// Returns io.EOF if there are no more messages to read.
|
||||
// ReadMessage should not be called after io.EOF occasion.
|
||||
ReadMessage(message.Message) error
|
||||
}
|
||||
|
||||
// MessageWriter is an interface of the Message writer.
|
||||
type MessageWriter interface {
|
||||
// WriteMessage writers the next Message.
|
||||
//
|
||||
// WriteMessage should not be called after any error.
|
||||
WriteMessage(message.Message) error
|
||||
}
|
||||
|
||||
// MessageReadWriter is a component interface
|
||||
// for transmitting raw Protobuf messages.
|
||||
type MessageReadWriter interface {
|
||||
MessageReader
|
||||
MessageWriter
|
||||
|
||||
// Closes the communication session.
|
||||
//
|
||||
// All calls to send/receive messages must be done before closing.
|
||||
io.Closer
|
||||
}
|
||||
|
||||
// Init initiates a messaging session and returns the interface for message transmitting.
|
||||
func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) {
|
||||
prm := defaultCallParameters()
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(prm)
|
||||
}
|
||||
|
||||
return c.initGRPC(info, prm)
|
||||
}
|
||||
|
||||
type rwGRPC struct {
|
||||
grpc.MessageReadWriter
|
||||
}
|
||||
|
||||
func (g rwGRPC) ReadMessage(m message.Message) error {
|
||||
// Can be optimized: we can create blank message here.
|
||||
gm := m.ToGRPCMessage()
|
||||
|
||||
if err := g.MessageReadWriter.ReadMessage(gm); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return m.FromGRPCMessage(gm)
|
||||
}
|
||||
|
||||
func (g rwGRPC) WriteMessage(m message.Message) error {
|
||||
return g.MessageReadWriter.WriteMessage(m.ToGRPCMessage())
|
||||
}
|
||||
|
||||
func (c *Client) initGRPC(info common.CallMethodInfo, prm *callParameters) (MessageReadWriter, error) {
|
||||
if err := c.createGRPCClient(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rw, err := c.gRPCClient.Init(info, prm.callOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &rwGRPC{
|
||||
MessageReadWriter: rw,
|
||||
}, nil
|
||||
}
|
60
rpc/client/options.go
Normal file
60
rpc/client/options.go
Normal file
|
@ -0,0 +1,60 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// Option is a Client's option.
|
||||
type Option func(*cfg)
|
||||
|
||||
type cfg struct {
|
||||
addr string
|
||||
|
||||
dialTimeout time.Duration
|
||||
|
||||
conn *grpc.ClientConn
|
||||
}
|
||||
|
||||
const defaultDialTimeout = 5 * time.Second
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
dialTimeout: defaultDialTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
// WithNetworkAddress returns option to specify
|
||||
// network address of the remote server.
|
||||
//
|
||||
// Ignored if WithGRPCConn is provided.
|
||||
func WithNetworkAddress(v string) Option {
|
||||
return func(c *cfg) {
|
||||
if v != "" {
|
||||
c.addr = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithDialTimeout returns option to specify
|
||||
// dial timeout of the remote server connection.
|
||||
//
|
||||
// Ignored if WithGRPCConn is provided.
|
||||
func WithDialTimeout(v time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
if v > 0 {
|
||||
c.dialTimeout = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithGRPCConn returns option to specify
|
||||
// gRPC virtual connection.
|
||||
func WithGRPCConn(v *grpc.ClientConn) Option {
|
||||
return func(c *cfg) {
|
||||
if v != nil {
|
||||
c.conn = v
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue