[#301] rpc: Make client stream initialization get cancelled by dial timeout
* `c.conn` may be already invalidated but the rpc client can't detect this. `NewStream` may hang trying to open a stream with invalidated connection. Using timer with `dialTimeout` for `NewStream` fixes this problem. Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
parent
cb813e27a8
commit
a5a399f748
1 changed files with 34 additions and 6 deletions
|
@ -3,6 +3,7 @@ package client
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
|
||||||
|
@ -51,13 +52,40 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(prm.ctx)
|
ctx, cancel := context.WithCancel(prm.ctx)
|
||||||
stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{
|
defer cancel()
|
||||||
|
|
||||||
|
// `conn.NewStream` doesn't check if `conn` may turn up invalidated right before this invocation.
|
||||||
|
// In such cases, the operation can hang indefinitely, with the context timeout being the only
|
||||||
|
// mechanism to cancel it.
|
||||||
|
//
|
||||||
|
// We use a separate timer instead of context timeout because the latter
|
||||||
|
// would propagate to all subsequent read/write operations on the opened stream,
|
||||||
|
// which is not desired for the stream's lifecycle management.
|
||||||
|
dialTimeoutTimer := time.NewTimer(c.dialTimeout)
|
||||||
|
defer dialTimeoutTimer.Stop()
|
||||||
|
newStreamCh := make(chan struct{})
|
||||||
|
|
||||||
|
var stream grpc.ClientStream
|
||||||
|
var err error
|
||||||
|
go func() {
|
||||||
|
stream, err = c.conn.NewStream(ctx, &grpc.StreamDesc{
|
||||||
StreamName: info.Name,
|
StreamName: info.Name,
|
||||||
ServerStreams: info.ServerStream(),
|
ServerStreams: info.ServerStream(),
|
||||||
ClientStreams: info.ClientStream(),
|
ClientStreams: info.ClientStream(),
|
||||||
}, toMethodName(info))
|
}, toMethodName(info))
|
||||||
|
close(newStreamCh)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-dialTimeoutTimer.C:
|
||||||
|
select {
|
||||||
|
case <-newStreamCh:
|
||||||
|
default:
|
||||||
|
return nil, context.Canceled
|
||||||
|
}
|
||||||
|
case <-newStreamCh:
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue