diff --git a/rpc/client/init.go b/rpc/client/init.go index 60ccda9..5c1698d 100644 --- a/rpc/client/init.go +++ b/rpc/client/init.go @@ -3,6 +3,7 @@ package client import ( "context" "io" + "time" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/common" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/message" @@ -51,18 +52,56 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe } ctx, cancel := context.WithCancel(prm.ctx) - stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{ - StreamName: info.Name, - ServerStreams: info.ServerStream(), - ClientStreams: info.ClientStream(), - }, toMethodName(info)) - if err != nil { + + // `conn.NewStream` doesn't check if `conn` may turn up invalidated right before this invocation. + // In such cases, the operation can hang indefinitely, with the context timeout being the only + // mechanism to cancel it. + // + // We use a separate timer instead of context timeout because the latter + // would propagate to all subsequent read/write operations on the opened stream, + // which is not desired for the stream's lifecycle management. + dialTimeoutTimer := time.NewTimer(c.dialTimeout) + defer dialTimeoutTimer.Stop() + + type newStreamRes struct { + stream grpc.ClientStream + err error + } + newStreamCh := make(chan newStreamRes) + + go func() { + stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{ + StreamName: info.Name, + ServerStreams: info.ServerStream(), + ClientStreams: info.ClientStream(), + }, toMethodName(info)) + + newStreamCh <- newStreamRes{ + stream: stream, + err: err, + } + }() + + var res newStreamRes + + select { + case <-dialTimeoutTimer.C: cancel() - return nil, err + res = <-newStreamCh + if res.stream != nil && res.err == nil { + _ = res.stream.CloseSend() + } + return nil, context.Canceled + case res = <-newStreamCh: + } + + if res.err != nil { + cancel() + return nil, res.err } return &streamWrapper{ - ClientStream: stream, + ClientStream: res.stream, cancel: cancel, timeout: c.rwTimeout, }, nil