From aa53fb7131cd96f3ab70729827d18d63f92917d6 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Fri, 24 Dec 2021 13:16:55 +0300 Subject: [PATCH] [#366] rpc/grpc: Perform read-write message operations with timeout Remote gRPC server may not return or accept data for a while. gRPC solves this issue with timeout in context. However, the context is used for entire gRPC method invocation. Unfortunately the duration of requests with streams can't be estimated easily. To solve this issue we can specify timeouts for every message read and write. Single message has size limit so timeout can be related to that. Signed-off-by: Alex Vanin --- rpc/grpc/init.go | 36 ++++++++++++++++++++++++++++++++---- rpc/grpc/options.go | 19 +++++++++++++++++-- 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/rpc/grpc/init.go b/rpc/grpc/init.go index aeb4fb4..9cf41d7 100644 --- a/rpc/grpc/init.go +++ b/rpc/grpc/init.go @@ -1,7 +1,9 @@ package grpc import ( + "context" "io" + "time" "github.com/nspcc-dev/neofs-api-go/v2/rpc/common" "google.golang.org/grpc" @@ -28,18 +30,40 @@ type MessageReadWriter interface { type streamWrapper struct { grpc.ClientStream + timeout time.Duration + cancel context.CancelFunc } func (w streamWrapper) ReadMessage(m Message) error { - return w.ClientStream.RecvMsg(m) + return w.withTimeout(func() error { + return w.ClientStream.RecvMsg(m) + }) } func (w streamWrapper) WriteMessage(m Message) error { - return w.ClientStream.SendMsg(m) + return w.withTimeout(func() error { + return w.ClientStream.SendMsg(m) + }) } func (w *streamWrapper) Close() error { - return w.ClientStream.CloseSend() + return w.withTimeout(w.ClientStream.CloseSend) +} + +func (w *streamWrapper) withTimeout(closure func() error) error { + ch := make(chan error, 1) + go func() { + ch <- closure() + close(ch) + }() + + select { + case err := <-ch: + return err + case <-time.After(w.timeout): + w.cancel() + return context.DeadlineExceeded + } } // Init initiates a messaging session within the RPC configured by options. @@ -50,16 +74,20 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe opt(prm) } - stream, err := c.con.NewStream(prm.ctx, &grpc.StreamDesc{ + ctx, cancel := context.WithCancel(prm.ctx) + stream, err := c.con.NewStream(ctx, &grpc.StreamDesc{ StreamName: info.Name, ServerStreams: info.ServerStream(), ClientStreams: info.ClientStream(), }, toMethodName(info)) if err != nil { + cancel() return nil, err } return &streamWrapper{ ClientStream: stream, + cancel: cancel, + timeout: c.rwTimeout, }, nil } diff --git a/rpc/grpc/options.go b/rpc/grpc/options.go index 389bd63..29b07ba 100644 --- a/rpc/grpc/options.go +++ b/rpc/grpc/options.go @@ -1,15 +1,22 @@ package grpc import ( + "time" + "google.golang.org/grpc" ) +const defaultRWTimeout = 1 * time.Minute + type cfg struct { - con *grpc.ClientConn + con *grpc.ClientConn + rwTimeout time.Duration } func defaultCfg() *cfg { - return new(cfg) + return &cfg{ + rwTimeout: defaultRWTimeout, + } } // WithClientConnection returns option to set gRPC connection @@ -19,3 +26,11 @@ func WithClientConnection(con *grpc.ClientConn) Option { c.con = con } } + +// WithRWTimeout returns option to specify rwTimeout +// for reading and writing single gRPC message. +func WithRWTimeout(t time.Duration) Option { + return func(c *cfg) { + c.rwTimeout = t + } +}