From a5a399f748e35c7afcbbf778960473317b97c5f1 Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Wed, 4 Dec 2024 18:02:48 +0300 Subject: [PATCH] [#301] rpc: Make client stream initialization get cancelled by dial timeout * `c.conn` may be already invalidated but the rpc client can't detect this. `NewStream` may hang trying to open a stream with invalidated connection. Using timer with `dialTimeout` for `NewStream` fixes this problem. Signed-off-by: Airat Arifullin --- api/rpc/client/init.go | 40 ++++++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/api/rpc/client/init.go b/api/rpc/client/init.go index 706e6a9..330e52f 100644 --- a/api/rpc/client/init.go +++ b/api/rpc/client/init.go @@ -3,6 +3,7 @@ package client import ( "context" "io" + "time" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message" @@ -51,13 +52,40 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe } ctx, cancel := context.WithCancel(prm.ctx) - stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{ - StreamName: info.Name, - ServerStreams: info.ServerStream(), - ClientStreams: info.ClientStream(), - }, toMethodName(info)) + defer cancel() + + // `conn.NewStream` doesn't check if `conn` may turn up invalidated right before this invocation. + // In such cases, the operation can hang indefinitely, with the context timeout being the only + // mechanism to cancel it. + // + // We use a separate timer instead of context timeout because the latter + // would propagate to all subsequent read/write operations on the opened stream, + // which is not desired for the stream's lifecycle management. + dialTimeoutTimer := time.NewTimer(c.dialTimeout) + defer dialTimeoutTimer.Stop() + newStreamCh := make(chan struct{}) + + var stream grpc.ClientStream + var err error + go func() { + stream, err = c.conn.NewStream(ctx, &grpc.StreamDesc{ + StreamName: info.Name, + ServerStreams: info.ServerStream(), + ClientStreams: info.ClientStream(), + }, toMethodName(info)) + close(newStreamCh) + }() + select { + case <-dialTimeoutTimer.C: + select { + case <-newStreamCh: + default: + return nil, context.Canceled + } + case <-newStreamCh: + } + if err != nil { - cancel() return nil, err }