diff --git a/rpc/grpc/init.go b/rpc/grpc/init.go index aeb4fb44..9cf41d72 100644 --- a/rpc/grpc/init.go +++ b/rpc/grpc/init.go @@ -1,7 +1,9 @@ package grpc import ( + "context" "io" + "time" "github.com/nspcc-dev/neofs-api-go/v2/rpc/common" "google.golang.org/grpc" @@ -28,18 +30,40 @@ type MessageReadWriter interface { type streamWrapper struct { grpc.ClientStream + timeout time.Duration + cancel context.CancelFunc } func (w streamWrapper) ReadMessage(m Message) error { - return w.ClientStream.RecvMsg(m) + return w.withTimeout(func() error { + return w.ClientStream.RecvMsg(m) + }) } func (w streamWrapper) WriteMessage(m Message) error { - return w.ClientStream.SendMsg(m) + return w.withTimeout(func() error { + return w.ClientStream.SendMsg(m) + }) } func (w *streamWrapper) Close() error { - return w.ClientStream.CloseSend() + return w.withTimeout(w.ClientStream.CloseSend) +} + +func (w *streamWrapper) withTimeout(closure func() error) error { + ch := make(chan error, 1) + go func() { + ch <- closure() + close(ch) + }() + + select { + case err := <-ch: + return err + case <-time.After(w.timeout): + w.cancel() + return context.DeadlineExceeded + } } // Init initiates a messaging session within the RPC configured by options. @@ -50,16 +74,20 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe opt(prm) } - stream, err := c.con.NewStream(prm.ctx, &grpc.StreamDesc{ + ctx, cancel := context.WithCancel(prm.ctx) + stream, err := c.con.NewStream(ctx, &grpc.StreamDesc{ StreamName: info.Name, ServerStreams: info.ServerStream(), ClientStreams: info.ClientStream(), }, toMethodName(info)) if err != nil { + cancel() return nil, err } return &streamWrapper{ ClientStream: stream, + cancel: cancel, + timeout: c.rwTimeout, }, nil } diff --git a/rpc/grpc/options.go b/rpc/grpc/options.go index 389bd632..29b07bab 100644 --- a/rpc/grpc/options.go +++ b/rpc/grpc/options.go @@ -1,15 +1,22 @@ package grpc import ( + "time" + "google.golang.org/grpc" ) +const defaultRWTimeout = 1 * time.Minute + type cfg struct { - con *grpc.ClientConn + con *grpc.ClientConn + rwTimeout time.Duration } func defaultCfg() *cfg { - return new(cfg) + return &cfg{ + rwTimeout: defaultRWTimeout, + } } // WithClientConnection returns option to set gRPC connection @@ -19,3 +26,11 @@ func WithClientConnection(con *grpc.ClientConn) Option { c.con = con } } + +// WithRWTimeout returns option to specify rwTimeout +// for reading and writing single gRPC message. +func WithRWTimeout(t time.Duration) Option { + return func(c *cfg) { + c.rwTimeout = t + } +}