Simplify client #2

Merged
fyrchik merged 2 commits from fix-allocations into master 2023-02-06 08:30:33 +00:00
12 changed files with 108 additions and 280 deletions

View File

@ -1,29 +1,28 @@
package client
import (
"sync"
"github.com/TrueCloudLab/frostfs-api-go/v2/rpc/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// Client represents client for exchanging messages
// with a remote server using Protobuf RPC.
type Client struct {
*cfg
gRPCClientOnce sync.Once
gRPCClient *grpc.Client
cfg
}
// New creates, configures via options and returns new Client instance.
func New(opts ...Option) *Client {
c := defaultCfg()
var c Client
c.initDefault()
for _, opt := range opts {
opt(c)
opt(&c.cfg)
}
return &Client{
cfg: c,
if c.tlsCfg != nil {
c.grpcDialOpts = append(c.grpcDialOpts, grpc.WithTransportCredentials(credentials.NewTLS(c.tlsCfg)))
}
return &c
}

View File

@ -9,14 +9,8 @@ import (
// Returns non-nil result after the first Init() call
// completed without a connection error.
//
// Conn is NPE-safe: returns nil if Client is nil.
//
// Client should not be used after Close() call
// on the connection: behavior is undefined.
func (c *Client) Conn() io.Closer {
if c != nil {
return c.gRPCClient.Conn()
}
return nil
return c.conn
}

View File

@ -7,28 +7,9 @@ import (
"net"
"net/url"
"github.com/TrueCloudLab/frostfs-api-go/v2/rpc/grpc"
grpcstd "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
func (c *Client) createGRPCClient(ctx context.Context) (err error) {
c.gRPCClientOnce.Do(func() {
if err = c.openGRPCConn(ctx); err != nil {
err = fmt.Errorf("open gRPC connection: %w", err)
return
}
c.gRPCClient = grpc.New(
grpc.WithClientConnection(c.conn),
grpc.WithRWTimeout(c.rwTimeout),
)
})
return
}
var errInvalidEndpoint = errors.New("invalid endpoint options")
func (c *Client) openGRPCConn(ctx context.Context) error {
@ -40,21 +21,10 @@ func (c *Client) openGRPCConn(ctx context.Context) error {
return errInvalidEndpoint
}
var creds credentials.TransportCredentials
if c.tlsCfg != nil {
creds = credentials.NewTLS(c.tlsCfg)
} else {
creds = insecure.NewCredentials()
}
dialCtx, cancel := context.WithTimeout(ctx, c.dialTimeout)
var err error
c.conn, err = grpcstd.DialContext(dialCtx, c.addr,
grpcstd.WithTransportCredentials(creds),
grpcstd.WithBlock(),
)
c.conn, err = grpcstd.DialContext(dialCtx, c.addr, c.grpcDialOpts...)
cancel()

View File

@ -1,11 +1,12 @@
package client
import (
"context"
"io"
"github.com/TrueCloudLab/frostfs-api-go/v2/rpc/common"
"github.com/TrueCloudLab/frostfs-api-go/v2/rpc/grpc"
"github.com/TrueCloudLab/frostfs-api-go/v2/rpc/message"
"google.golang.org/grpc"
)
// MessageReader is an interface of the Message reader.
@ -45,39 +46,24 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe
opt(prm)
}
return c.initGRPC(info, prm)
}
type rwGRPC struct {
grpc.MessageReadWriter
}
func (g rwGRPC) ReadMessage(m message.Message) error {
// Can be optimized: we can create blank message here.
gm := m.ToGRPCMessage()
if err := g.MessageReadWriter.ReadMessage(gm); err != nil {
return err
}
return m.FromGRPCMessage(gm)
}
func (g rwGRPC) WriteMessage(m message.Message) error {
return g.MessageReadWriter.WriteMessage(m.ToGRPCMessage())
}
func (c *Client) initGRPC(info common.CallMethodInfo, prm *callParameters) (MessageReadWriter, error) {
if err := c.createGRPCClient(prm.ctx); err != nil {
if err := c.openGRPCConn(prm.ctx); err != nil {
return nil, err
}
rw, err := c.gRPCClient.Init(info, grpc.WithContext(prm.ctx))
ctx, cancel := context.WithCancel(prm.ctx)
stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{
StreamName: info.Name,
ServerStreams: info.ServerStream(),
ClientStreams: info.ClientStream(),
}, toMethodName(info))
if err != nil {
cancel()
return nil, err
}
return &rwGRPC{
MessageReadWriter: rw,
return &streamWrapper{
ClientStream: stream,
cancel: cancel,
timeout: c.rwTimeout,
}, nil
}

View File

@ -5,6 +5,8 @@ import (
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)
const (
@ -21,20 +23,27 @@ type cfg struct {
dialTimeout time.Duration
rwTimeout time.Duration
tlsCfg *tls.Config
tlsCfg *tls.Config
grpcDialOpts []grpc.DialOption
conn *grpc.ClientConn
}
const (
defaultDialTimeout = 5 * time.Second
defaultRWTimeout = 1 * time.Minute
defaultDialTimeout = 5 * time.Second
defaultKeepAliveTimeout = 5 * time.Second
defaultRWTimeout = 1 * time.Minute
)
func defaultCfg() *cfg {
return &cfg{
dialTimeout: defaultDialTimeout,
rwTimeout: defaultRWTimeout,
func (c *cfg) initDefault() {
c.dialTimeout = defaultDialTimeout
c.rwTimeout = defaultRWTimeout
c.grpcDialOpts = []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Timeout: defaultKeepAliveTimeout,
}),
}
}
@ -117,3 +126,10 @@ func WithGRPCConn(v *grpc.ClientConn) Option {
}
}
}
// WithGRPCDialOptions returns an option to specify grpc.DialOption.
func WithGRPCDialOptions(opts []grpc.DialOption) Option {
return func(c *cfg) {
c.grpcDialOpts = append(c.grpcDialOpts, opts...)
}
}

View File

@ -0,0 +1,58 @@
package client
import (
"context"
"time"
"github.com/TrueCloudLab/frostfs-api-go/v2/rpc/message"
"google.golang.org/grpc"
)
type streamWrapper struct {
grpc.ClientStream
timeout time.Duration
cancel context.CancelFunc
}
func (w streamWrapper) ReadMessage(m message.Message) error {
// Can be optimized: we can create blank message here.
gm := m.ToGRPCMessage()
err := w.withTimeout(func() error {
return w.ClientStream.RecvMsg(gm)
})
if err != nil {
return err
}
return m.FromGRPCMessage(gm)
}
func (w streamWrapper) WriteMessage(m message.Message) error {
return w.withTimeout(func() error {
return w.ClientStream.SendMsg(m.ToGRPCMessage())
})
}
func (w *streamWrapper) Close() error {
return w.withTimeout(w.ClientStream.CloseSend)
}
func (w *streamWrapper) withTimeout(closure func() error) error {
ch := make(chan error, 1)
go func() {
ch <- closure()
close(ch)
}()
tt := time.NewTimer(w.timeout)
select {
case err := <-ch:
tt.Stop()
return err
case <-tt.C:
w.cancel()
return context.DeadlineExceeded
}
}

View File

@ -1,4 +1,4 @@
package grpc
package client
import (
"fmt"

View File

@ -1,25 +0,0 @@
package grpc
import (
"context"
)
// CallOption is a messaging session option within RPC.
type CallOption func(*callParameters)
type callParameters struct {
ctx context.Context
}
func defaultCallParameters() *callParameters {
return &callParameters{
ctx: context.Background(),
}
}
// WithContext returns option to set RPC context.
func WithContext(ctx context.Context) CallOption {
return func(prm *callParameters) {
prm.ctx = ctx
}
}

View File

@ -1,23 +0,0 @@
package grpc
// Client represents client for exchanging messages
// with a remote server using gRPC protocol.
type Client struct {
*cfg
}
// Option is a Client's constructor option.
type Option func(*cfg)
// New creates, configures via options and returns new Client instance.
func New(opts ...Option) *Client {
c := defaultCfg()
for _, opt := range opts {
opt(c)
}
return &Client{
cfg: c,
}
}

View File

@ -1,19 +0,0 @@
package grpc
import (
"io"
)
// Conn returns underlying connection.
//
// Conn is NPE-safe: returns nil if Client is nil.
//
// Client should not be used after Close() call
// on the connection: behavior is undefined.
func (c *Client) Conn() io.Closer {
if c != nil {
return c.con
}
return nil
}

View File

@ -1,96 +1,4 @@
package grpc
import (
"context"
"io"
"time"
"github.com/TrueCloudLab/frostfs-api-go/v2/rpc/common"
"google.golang.org/grpc"
)
// Message represents raw gRPC message.
type Message interface{}
// MessageReadWriter is a component interface
// for transmitting raw messages over gRPC protocol.
type MessageReadWriter interface {
// ReadMessage reads the next message from the remote server,
// and writes it to the argument.
ReadMessage(Message) error
// WriteMessage sends message from argument to remote server.
WriteMessage(Message) error
// Closes the communication session with the remote server.
//
// All calls to send/receive messages must be done before closing.
io.Closer
}
type streamWrapper struct {
grpc.ClientStream
timeout time.Duration
cancel context.CancelFunc
}
func (w streamWrapper) ReadMessage(m Message) error {
return w.withTimeout(func() error {
return w.ClientStream.RecvMsg(m)
})
}
func (w streamWrapper) WriteMessage(m Message) error {
return w.withTimeout(func() error {
return w.ClientStream.SendMsg(m)
})
}
func (w *streamWrapper) Close() error {
return w.withTimeout(w.ClientStream.CloseSend)
}
func (w *streamWrapper) withTimeout(closure func() error) error {
ch := make(chan error, 1)
go func() {
ch <- closure()
close(ch)
}()
tt := time.NewTimer(w.timeout)
select {
case err := <-ch:
tt.Stop()
return err
case <-tt.C:
w.cancel()
return context.DeadlineExceeded
}
}
// Init initiates a messaging session within the RPC configured by options.
func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) {
prm := defaultCallParameters()
for _, opt := range opts {
opt(prm)
}
ctx, cancel := context.WithCancel(prm.ctx)
stream, err := c.con.NewStream(ctx, &grpc.StreamDesc{
StreamName: info.Name,
ServerStreams: info.ServerStream(),
ClientStreams: info.ClientStream(),
}, toMethodName(info))
if err != nil {
cancel()
return nil, err
}
return &streamWrapper{
ClientStream: stream,
cancel: cancel,
timeout: c.rwTimeout,
}, nil
}

View File

@ -1,36 +0,0 @@
package grpc
import (
"time"
"google.golang.org/grpc"
)
const defaultRWTimeout = 1 * time.Minute
type cfg struct {
con *grpc.ClientConn
rwTimeout time.Duration
}
func defaultCfg() *cfg {
return &cfg{
rwTimeout: defaultRWTimeout,
}
}
// WithClientConnection returns option to set gRPC connection
// to the remote server.
func WithClientConnection(con *grpc.ClientConn) Option {
return func(c *cfg) {
c.con = con
}
}
// WithRWTimeout returns option to specify rwTimeout
// for reading and writing single gRPC message.
func WithRWTimeout(t time.Duration) Option {
return func(c *cfg) {
c.rwTimeout = t
}
}