From cbc59f82fbdd37576efd1c993fc4578dcc66a7d3 Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Wed, 4 Dec 2024 18:02:48 +0300 Subject: [PATCH] [#301] rpc: Make client stream initialization get cancelled by dial timeout * `c.conn` may be already invalidated but the rpc client can't detect this. `NewStream` may hang trying to open a stream with invalidated connection. Using timer with `dialTimeout` for `NewStream` fixes this problem. Signed-off-by: Airat Arifullin --- api/rpc/client/init.go | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/api/rpc/client/init.go b/api/rpc/client/init.go index 706e6a9..daa833c 100644 --- a/api/rpc/client/init.go +++ b/api/rpc/client/init.go @@ -3,6 +3,7 @@ package client import ( "context" "io" + "time" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message" @@ -51,11 +52,39 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe } ctx, cancel := context.WithCancel(prm.ctx) - stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{ - StreamName: info.Name, - ServerStreams: info.ServerStream(), - ClientStreams: info.ClientStream(), - }, toMethodName(info)) + + // `conn.NewStream` doesn't check if `conn` may turn up invalidated right before this invocation. + // In such cases, the operation can hang indefinitely, with the context timeout being the only + // mechanism to cancel it. + // + // We use a separate timer instead of context timeout because the latter + // would propagate to all subsequent read/write operations on the opened stream, + // which is not desired for the stream's lifecycle management. + dialTimeoutTimer := time.NewTimer(c.dialTimeout) + defer dialTimeoutTimer.Stop() + newStreamCh := make(chan struct{}) + + var stream grpc.ClientStream + var err error + go func() { + stream, err = c.conn.NewStream(ctx, &grpc.StreamDesc{ + StreamName: info.Name, + ServerStreams: info.ServerStream(), + ClientStreams: info.ClientStream(), + }, toMethodName(info)) + close(newStreamCh) + }() + select { + case <-dialTimeoutTimer.C: + select { + case <-newStreamCh: + default: + cancel() + return nil, context.Canceled + } + case <-newStreamCh: + } + if err != nil { cancel() return nil, err