WIP: [SUPPORT] Make rpc client stream initialization get cancelled by dial timeout #128
1 changed files with 47 additions and 8 deletions
|
@ -3,6 +3,7 @@ package client
|
|||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/message"
|
||||
|
@ -51,18 +52,56 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe
|
|||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(prm.ctx)
|
||||
stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{
|
||||
StreamName: info.Name,
|
||||
ServerStreams: info.ServerStream(),
|
||||
ClientStreams: info.ClientStream(),
|
||||
}, toMethodName(info))
|
||||
if err != nil {
|
||||
|
||||
// `conn.NewStream` doesn't check if `conn` may turn up invalidated right before this invocation.
|
||||
// In such cases, the operation can hang indefinitely, with the context timeout being the only
|
||||
// mechanism to cancel it.
|
||||
//
|
||||
// We use a separate timer instead of context timeout because the latter
|
||||
// would propagate to all subsequent read/write operations on the opened stream,
|
||||
// which is not desired for the stream's lifecycle management.
|
||||
dialTimeoutTimer := time.NewTimer(c.dialTimeout)
|
||||
defer dialTimeoutTimer.Stop()
|
||||
|
||||
type newStreamRes struct {
|
||||
stream grpc.ClientStream
|
||||
err error
|
||||
}
|
||||
newStreamCh := make(chan newStreamRes)
|
||||
|
||||
go func() {
|
||||
stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{
|
||||
StreamName: info.Name,
|
||||
ServerStreams: info.ServerStream(),
|
||||
ClientStreams: info.ClientStream(),
|
||||
}, toMethodName(info))
|
||||
|
||||
newStreamCh <- newStreamRes{
|
||||
stream: stream,
|
||||
err: err,
|
||||
}
|
||||
}()
|
||||
|
||||
var res newStreamRes
|
||||
|
||||
select {
|
||||
case <-dialTimeoutTimer.C:
|
||||
cancel()
|
||||
return nil, err
|
||||
res = <-newStreamCh
|
||||
if res.stream != nil && res.err == nil {
|
||||
_ = res.stream.CloseSend()
|
||||
}
|
||||
return nil, context.Canceled
|
||||
case res = <-newStreamCh:
|
||||
}
|
||||
|
||||
if res.err != nil {
|
||||
cancel()
|
||||
return nil, res.err
|
||||
}
|
||||
|
||||
return &streamWrapper{
|
||||
ClientStream: stream,
|
||||
ClientStream: res.stream,
|
||||
cancel: cancel,
|
||||
timeout: c.rwTimeout,
|
||||
}, nil
|
||||
|
|
Loading…
Reference in a new issue