diff --git a/rpc/client/client.go b/rpc/client/client.go index 40b5d1a..e184d33 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -1,29 +1,19 @@ package client -import ( - "sync" - - "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/grpc" -) - // Client represents client for exchanging messages // with a remote server using Protobuf RPC. type Client struct { - *cfg - - gRPCClientOnce sync.Once - gRPCClient *grpc.Client + cfg } // New creates, configures via options and returns new Client instance. func New(opts ...Option) *Client { - c := defaultCfg() + var c Client + c.initDefault() for _, opt := range opts { - opt(c) + opt(&c.cfg) } - return &Client{ - cfg: c, - } + return &c } diff --git a/rpc/client/conn.go b/rpc/client/conn.go index ba53ab3..9fc7a51 100644 --- a/rpc/client/conn.go +++ b/rpc/client/conn.go @@ -9,14 +9,8 @@ import ( // Returns non-nil result after the first Init() call // completed without a connection error. // -// Conn is NPE-safe: returns nil if Client is nil. -// // Client should not be used after Close() call // on the connection: behavior is undefined. func (c *Client) Conn() io.Closer { - if c != nil { - return c.gRPCClient.Conn() - } - - return nil + return c.conn } diff --git a/rpc/client/connect.go b/rpc/client/connect.go index 2fe77d5..2f62130 100644 --- a/rpc/client/connect.go +++ b/rpc/client/connect.go @@ -7,28 +7,11 @@ import ( "net" "net/url" - "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/grpc" grpcstd "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" ) -func (c *Client) createGRPCClient(ctx context.Context) (err error) { - c.gRPCClientOnce.Do(func() { - if err = c.openGRPCConn(ctx); err != nil { - err = fmt.Errorf("open gRPC connection: %w", err) - return - } - - c.gRPCClient = grpc.New( - grpc.WithClientConnection(c.conn), - grpc.WithRWTimeout(c.rwTimeout), - ) - }) - - return -} - var errInvalidEndpoint = errors.New("invalid endpoint options") func (c *Client) openGRPCConn(ctx context.Context) error { diff --git a/rpc/client/init.go b/rpc/client/init.go index b45871b..7706c7a 100644 --- a/rpc/client/init.go +++ b/rpc/client/init.go @@ -1,11 +1,12 @@ package client import ( + "context" "io" "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/common" - "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/grpc" "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/message" + "google.golang.org/grpc" ) // MessageReader is an interface of the Message reader. @@ -45,39 +46,24 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe opt(prm) } - return c.initGRPC(info, prm) -} - -type rwGRPC struct { - grpc.MessageReadWriter -} - -func (g rwGRPC) ReadMessage(m message.Message) error { - // Can be optimized: we can create blank message here. - gm := m.ToGRPCMessage() - - if err := g.MessageReadWriter.ReadMessage(gm); err != nil { - return err - } - - return m.FromGRPCMessage(gm) -} - -func (g rwGRPC) WriteMessage(m message.Message) error { - return g.MessageReadWriter.WriteMessage(m.ToGRPCMessage()) -} - -func (c *Client) initGRPC(info common.CallMethodInfo, prm *callParameters) (MessageReadWriter, error) { - if err := c.createGRPCClient(prm.ctx); err != nil { + if err := c.openGRPCConn(prm.ctx); err != nil { return nil, err } - rw, err := c.gRPCClient.Init(info, grpc.WithContext(prm.ctx)) + ctx, cancel := context.WithCancel(prm.ctx) + stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{ + StreamName: info.Name, + ServerStreams: info.ServerStream(), + ClientStreams: info.ClientStream(), + }, toMethodName(info)) if err != nil { + cancel() return nil, err } - return &rwGRPC{ - MessageReadWriter: rw, + return &streamWrapper{ + ClientStream: stream, + cancel: cancel, + timeout: c.rwTimeout, }, nil } diff --git a/rpc/client/options.go b/rpc/client/options.go index f04fc9e..0e2455c 100644 --- a/rpc/client/options.go +++ b/rpc/client/options.go @@ -31,11 +31,9 @@ const ( defaultRWTimeout = 1 * time.Minute ) -func defaultCfg() *cfg { - return &cfg{ - dialTimeout: defaultDialTimeout, - rwTimeout: defaultRWTimeout, - } +func (c *cfg) initDefault() { + c.dialTimeout = defaultDialTimeout + c.rwTimeout = defaultRWTimeout } // WithNetworkAddress returns option to specify diff --git a/rpc/client/stream_wrapper.go b/rpc/client/stream_wrapper.go new file mode 100644 index 0000000..7f60db2 --- /dev/null +++ b/rpc/client/stream_wrapper.go @@ -0,0 +1,58 @@ +package client + +import ( + "context" + "time" + + "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/message" + "google.golang.org/grpc" +) + +type streamWrapper struct { + grpc.ClientStream + timeout time.Duration + cancel context.CancelFunc +} + +func (w streamWrapper) ReadMessage(m message.Message) error { + // Can be optimized: we can create blank message here. + gm := m.ToGRPCMessage() + + err := w.withTimeout(func() error { + return w.ClientStream.RecvMsg(gm) + }) + if err != nil { + return err + } + + return m.FromGRPCMessage(gm) +} + +func (w streamWrapper) WriteMessage(m message.Message) error { + return w.withTimeout(func() error { + return w.ClientStream.SendMsg(m.ToGRPCMessage()) + }) +} + +func (w *streamWrapper) Close() error { + return w.withTimeout(w.ClientStream.CloseSend) +} + +func (w *streamWrapper) withTimeout(closure func() error) error { + ch := make(chan error, 1) + go func() { + ch <- closure() + close(ch) + }() + + tt := time.NewTimer(w.timeout) + + select { + case err := <-ch: + tt.Stop() + return err + case <-tt.C: + w.cancel() + return context.DeadlineExceeded + } +} diff --git a/rpc/grpc/util.go b/rpc/client/util.go similarity index 93% rename from rpc/grpc/util.go rename to rpc/client/util.go index 220c656..291ebac 100644 --- a/rpc/grpc/util.go +++ b/rpc/client/util.go @@ -1,4 +1,4 @@ -package grpc +package client import ( "fmt" diff --git a/rpc/grpc/call_options.go b/rpc/grpc/call_options.go deleted file mode 100644 index 6faf019..0000000 --- a/rpc/grpc/call_options.go +++ /dev/null @@ -1,25 +0,0 @@ -package grpc - -import ( - "context" -) - -// CallOption is a messaging session option within RPC. -type CallOption func(*callParameters) - -type callParameters struct { - ctx context.Context -} - -func defaultCallParameters() *callParameters { - return &callParameters{ - ctx: context.Background(), - } -} - -// WithContext returns option to set RPC context. -func WithContext(ctx context.Context) CallOption { - return func(prm *callParameters) { - prm.ctx = ctx - } -} diff --git a/rpc/grpc/client.go b/rpc/grpc/client.go deleted file mode 100644 index 82df2ce..0000000 --- a/rpc/grpc/client.go +++ /dev/null @@ -1,23 +0,0 @@ -package grpc - -// Client represents client for exchanging messages -// with a remote server using gRPC protocol. -type Client struct { - *cfg -} - -// Option is a Client's constructor option. -type Option func(*cfg) - -// New creates, configures via options and returns new Client instance. -func New(opts ...Option) *Client { - c := defaultCfg() - - for _, opt := range opts { - opt(c) - } - - return &Client{ - cfg: c, - } -} diff --git a/rpc/grpc/conn.go b/rpc/grpc/conn.go deleted file mode 100644 index 40968b9..0000000 --- a/rpc/grpc/conn.go +++ /dev/null @@ -1,19 +0,0 @@ -package grpc - -import ( - "io" -) - -// Conn returns underlying connection. -// -// Conn is NPE-safe: returns nil if Client is nil. -// -// Client should not be used after Close() call -// on the connection: behavior is undefined. -func (c *Client) Conn() io.Closer { - if c != nil { - return c.con - } - - return nil -} diff --git a/rpc/grpc/init.go b/rpc/grpc/init.go index bc9aa9c..79f08c9 100644 --- a/rpc/grpc/init.go +++ b/rpc/grpc/init.go @@ -1,96 +1,4 @@ package grpc -import ( - "context" - "io" - "time" - - "github.com/TrueCloudLab/frostfs-api-go/v2/rpc/common" - "google.golang.org/grpc" -) - // Message represents raw gRPC message. type Message interface{} - -// MessageReadWriter is a component interface -// for transmitting raw messages over gRPC protocol. -type MessageReadWriter interface { - // ReadMessage reads the next message from the remote server, - // and writes it to the argument. - ReadMessage(Message) error - - // WriteMessage sends message from argument to remote server. - WriteMessage(Message) error - - // Closes the communication session with the remote server. - // - // All calls to send/receive messages must be done before closing. - io.Closer -} - -type streamWrapper struct { - grpc.ClientStream - timeout time.Duration - cancel context.CancelFunc -} - -func (w streamWrapper) ReadMessage(m Message) error { - return w.withTimeout(func() error { - return w.ClientStream.RecvMsg(m) - }) -} - -func (w streamWrapper) WriteMessage(m Message) error { - return w.withTimeout(func() error { - return w.ClientStream.SendMsg(m) - }) -} - -func (w *streamWrapper) Close() error { - return w.withTimeout(w.ClientStream.CloseSend) -} - -func (w *streamWrapper) withTimeout(closure func() error) error { - ch := make(chan error, 1) - go func() { - ch <- closure() - close(ch) - }() - - tt := time.NewTimer(w.timeout) - - select { - case err := <-ch: - tt.Stop() - return err - case <-tt.C: - w.cancel() - return context.DeadlineExceeded - } -} - -// Init initiates a messaging session within the RPC configured by options. -func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) { - prm := defaultCallParameters() - - for _, opt := range opts { - opt(prm) - } - - ctx, cancel := context.WithCancel(prm.ctx) - stream, err := c.con.NewStream(ctx, &grpc.StreamDesc{ - StreamName: info.Name, - ServerStreams: info.ServerStream(), - ClientStreams: info.ClientStream(), - }, toMethodName(info)) - if err != nil { - cancel() - return nil, err - } - - return &streamWrapper{ - ClientStream: stream, - cancel: cancel, - timeout: c.rwTimeout, - }, nil -} diff --git a/rpc/grpc/options.go b/rpc/grpc/options.go deleted file mode 100644 index 29b07ba..0000000 --- a/rpc/grpc/options.go +++ /dev/null @@ -1,36 +0,0 @@ -package grpc - -import ( - "time" - - "google.golang.org/grpc" -) - -const defaultRWTimeout = 1 * time.Minute - -type cfg struct { - con *grpc.ClientConn - rwTimeout time.Duration -} - -func defaultCfg() *cfg { - return &cfg{ - rwTimeout: defaultRWTimeout, - } -} - -// WithClientConnection returns option to set gRPC connection -// to the remote server. -func WithClientConnection(con *grpc.ClientConn) Option { - return func(c *cfg) { - c.con = con - } -} - -// WithRWTimeout returns option to specify rwTimeout -// for reading and writing single gRPC message. -func WithRWTimeout(t time.Duration) Option { - return func(c *cfg) { - c.rwTimeout = t - } -}