diff --git a/rpc/client/call_options.go b/rpc/client/call_options.go new file mode 100644 index 0000000..ef849f3 --- /dev/null +++ b/rpc/client/call_options.go @@ -0,0 +1,27 @@ +package client + +import ( + "context" + + "github.com/nspcc-dev/neofs-api-go/rpc/grpc" +) + +// CallOption is a messaging session option within Protobuf RPC. +type CallOption func(*callParameters) + +type callParameters struct { + callOpts []grpc.CallOption +} + +func defaultCallParameters() *callParameters { + return &callParameters{ + callOpts: make([]grpc.CallOption, 0, 1), + } +} + +// WithContext return options to specify call context. +func WithContext(ctx context.Context) CallOption { + return func(prm *callParameters) { + prm.callOpts = append(prm.callOpts, grpc.WithContext(ctx)) + } +} diff --git a/rpc/client/client.go b/rpc/client/client.go new file mode 100644 index 0000000..e51cdcf --- /dev/null +++ b/rpc/client/client.go @@ -0,0 +1,29 @@ +package client + +import ( + "sync" + + "github.com/nspcc-dev/neofs-api-go/rpc/grpc" +) + +// Client represents client for exchanging messages +// with a remote server using Protobuf RPC. +type Client struct { + *cfg + + gRPCClientOnce sync.Once + gRPCClient *grpc.Client +} + +// New creates, configures via options and returns new Client instance. +func New(opts ...Option) *Client { + c := defaultCfg() + + for _, opt := range opts { + opt(c) + } + + return &Client{ + cfg: c, + } +} diff --git a/rpc/client/connect.go b/rpc/client/connect.go new file mode 100644 index 0000000..f9cd940 --- /dev/null +++ b/rpc/client/connect.go @@ -0,0 +1,41 @@ +package client + +import ( + "context" + "errors" + + "github.com/nspcc-dev/neofs-api-go/rpc/grpc" + grpcstd "google.golang.org/grpc" +) + +func (c *Client) createGRPCClient() (err error) { + c.gRPCClientOnce.Do(func() { + if err = c.openGRPCConn(); err != nil { + return + } + + c.gRPCClient = grpc.New(grpc.WithClientConnection(c.conn)) + }) + + return +} + +var errInvalidEndpoint = errors.New("invalid endpoint options") + +func (c *Client) openGRPCConn() error { + if c.conn != nil { + return nil + } + + if c.addr == "" { + return errInvalidEndpoint + } + + var err error + + dialCtx, cancel := context.WithTimeout(context.Background(), c.dialTimeout) + c.conn, err = grpcstd.DialContext(dialCtx, c.addr, grpcstd.WithInsecure()) + cancel() + + return err +} diff --git a/rpc/client/flows.go b/rpc/client/flows.go new file mode 100644 index 0000000..ceb890e --- /dev/null +++ b/rpc/client/flows.go @@ -0,0 +1,124 @@ +package client + +import ( + "io" + "sync" + + "github.com/nspcc-dev/neofs-api-go/rpc/common" + "github.com/nspcc-dev/neofs-api-go/rpc/message" + "github.com/pkg/errors" +) + +// SendUnary initializes communication session by RPC info, performs unary RPC +// and closes the session. +func SendUnary(cli *Client, info common.CallMethodInfo, req, resp message.Message, opts ...CallOption) error { + rw, err := cli.Init(info, opts...) + if err != nil { + return err + } + + err = rw.WriteMessage(req) + if err != nil { + return err + } + + err = rw.ReadMessage(resp) + if err != nil { + return err + } + + return rw.Close() +} + +// MessageWriterCloser wraps MessageWriter +// and io.Closer interfaces. +type MessageWriterCloser interface { + MessageWriter + io.Closer +} + +type clientStreamWriterCloser struct { + MessageReadWriter + + resp message.Message +} + +func (c *clientStreamWriterCloser) Close() error { + err := c.MessageReadWriter.Close() + if err != nil { + return err + } + + return c.ReadMessage(c.resp) +} + +// OpenClientStream initializes communication session by RPC info, opens client-side stream +// and returns its interface. +// +// All stream writes must be performed before the closing. Close must be called once. +func OpenClientStream(cli *Client, info common.CallMethodInfo, resp message.Message, opts ...CallOption) (MessageWriterCloser, error) { + rw, err := cli.Init(info, opts...) + if err != nil { + return nil, err + } + + return &clientStreamWriterCloser{ + MessageReadWriter: rw, + resp: resp, + }, nil +} + +// MessageReaderCloser wraps MessageReader +// and io.Closer interface. +type MessageReaderCloser interface { + MessageReader + io.Closer +} + +type serverStreamReaderCloser struct { + rw MessageReadWriter + + once sync.Once + + req message.Message +} + +func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error { + var err error + + s.once.Do(func() { + err = s.rw.WriteMessage(s.req) + }) + + if err != nil { + return err + } + + err = s.rw.ReadMessage(msg) + if !errors.Is(err, io.EOF) { + return err + } + + err = s.rw.Close() + if err != nil { + return err + } + + return io.EOF +} + +// OpenServerStream initializes communication session by RPC info, opens server-side stream +// and returns its interface. +// +// All stream reads must be performed before the closing. Close must be called once. +func OpenServerStream(cli *Client, info common.CallMethodInfo, req message.Message, opts ...CallOption) (MessageReader, error) { + rw, err := cli.Init(info, opts...) + if err != nil { + return nil, err + } + + return &serverStreamReaderCloser{ + rw: rw, + req: req, + }, nil +} diff --git a/rpc/client/init.go b/rpc/client/init.go new file mode 100644 index 0000000..aa0d420 --- /dev/null +++ b/rpc/client/init.go @@ -0,0 +1,83 @@ +package client + +import ( + "io" + + "github.com/nspcc-dev/neofs-api-go/rpc/common" + "github.com/nspcc-dev/neofs-api-go/rpc/grpc" + "github.com/nspcc-dev/neofs-api-go/rpc/message" +) + +// MessageReader is an interface of the Message reader. +type MessageReader interface { + // ReadMessage reads the next Message. + // + // Returns io.EOF if there are no more messages to read. + // ReadMessage should not be called after io.EOF occasion. + ReadMessage(message.Message) error +} + +// MessageWriter is an interface of the Message writer. +type MessageWriter interface { + // WriteMessage writers the next Message. + // + // WriteMessage should not be called after any error. + WriteMessage(message.Message) error +} + +// MessageReadWriter is a component interface +// for transmitting raw Protobuf messages. +type MessageReadWriter interface { + MessageReader + MessageWriter + + // Closes the communication session. + // + // All calls to send/receive messages must be done before closing. + io.Closer +} + +// Init initiates a messaging session and returns the interface for message transmitting. +func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) { + prm := defaultCallParameters() + + for _, opt := range opts { + opt(prm) + } + + return c.initGRPC(info, prm) +} + +type rwGRPC struct { + grpc.MessageReadWriter +} + +func (g rwGRPC) ReadMessage(m message.Message) error { + // Can be optimized: we can create blank message here. + gm := m.ToGRPCMessage() + + if err := g.MessageReadWriter.ReadMessage(gm); err != nil { + return err + } + + return m.FromGRPCMessage(gm) +} + +func (g rwGRPC) WriteMessage(m message.Message) error { + return g.MessageReadWriter.WriteMessage(m.ToGRPCMessage()) +} + +func (c *Client) initGRPC(info common.CallMethodInfo, prm *callParameters) (MessageReadWriter, error) { + if err := c.createGRPCClient(); err != nil { + return nil, err + } + + rw, err := c.gRPCClient.Init(info, prm.callOpts...) + if err != nil { + return nil, err + } + + return &rwGRPC{ + MessageReadWriter: rw, + }, nil +} diff --git a/rpc/client/options.go b/rpc/client/options.go new file mode 100644 index 0000000..9f2a226 --- /dev/null +++ b/rpc/client/options.go @@ -0,0 +1,60 @@ +package client + +import ( + "time" + + "google.golang.org/grpc" +) + +// Option is a Client's option. +type Option func(*cfg) + +type cfg struct { + addr string + + dialTimeout time.Duration + + conn *grpc.ClientConn +} + +const defaultDialTimeout = 5 * time.Second + +func defaultCfg() *cfg { + return &cfg{ + dialTimeout: defaultDialTimeout, + } +} + +// WithNetworkAddress returns option to specify +// network address of the remote server. +// +// Ignored if WithGRPCConn is provided. +func WithNetworkAddress(v string) Option { + return func(c *cfg) { + if v != "" { + c.addr = v + } + } +} + +// WithDialTimeout returns option to specify +// dial timeout of the remote server connection. +// +// Ignored if WithGRPCConn is provided. +func WithDialTimeout(v time.Duration) Option { + return func(c *cfg) { + if v > 0 { + c.dialTimeout = v + } + } +} + +// WithGRPCConn returns option to specify +// gRPC virtual connection. +func WithGRPCConn(v *grpc.ClientConn) Option { + return func(c *cfg) { + if v != nil { + c.conn = v + } + } +}