Make rpc client stream initialization get cancelled by dial timeout #304
|
@ -3,6 +3,7 @@ package client
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
|
||||||
|
@ -51,18 +52,56 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe
|
||||||
}
|
}
|
||||||
|
|
||||||
a-savchuk marked this conversation as resolved
Outdated
|
|||||||
ctx, cancel := context.WithCancel(prm.ctx)
|
ctx, cancel := context.WithCancel(prm.ctx)
|
||||||
|
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Should we save `cancel` as a field of `streamWrapper` below if we invoke it in `defer`?
aarifullin
commented
Sure, we don't. Fixed.
|
|||||||
|
// `conn.NewStream` doesn't check if `conn` may turn up invalidated right before this invocation.
|
||||||
|
// In such cases, the operation can hang indefinitely, with the context timeout being the only
|
||||||
|
// mechanism to cancel it.
|
||||||
|
//
|
||||||
|
// We use a separate timer instead of context timeout because the latter
|
||||||
|
// would propagate to all subsequent read/write operations on the opened stream,
|
||||||
|
// which is not desired for the stream's lifecycle management.
|
||||||
|
dialTimeoutTimer := time.NewTimer(c.dialTimeout)
|
||||||
fyrchik
commented
Do we also need a `dialTimeoutTimer.Stop()` call?
aarifullin
commented
Indeed! Fixed.
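For reference, a minimal sketch of what `Stop` reports, using only the standard `time` package. The helper name `stoppedBeforeFiring` and the millisecond parameters are hypothetical and chosen for illustration. `Stop` returns `true` when it prevented the timer from firing and `false` when the timer had already expired, so deferring it right after `time.NewTimer` is safe on both paths.

```go
package main

import (
	"fmt"
	"time"
)

// stoppedBeforeFiring creates a timer for timerMs, simulates workMs of work,
// and then stops the timer. It returns what Timer.Stop reports: true if the
// timer was stopped before it fired, false if it had already expired.
func stoppedBeforeFiring(timerMs, workMs int) bool {
	timer := time.NewTimer(time.Duration(timerMs) * time.Millisecond)
	time.Sleep(time.Duration(workMs) * time.Millisecond)
	return timer.Stop()
}

func main() {
	fmt.Println("work finished first:", stoppedBeforeFiring(200, 1))
	fmt.Println("timer fired first:  ", stoppedBeforeFiring(10, 100))
}
```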
|
|||||||
|
defer dialTimeoutTimer.Stop()
|
||||||
fyrchik
commented
Please use `struct{}` if you do nothing with the value: it better communicates the intent and can be optimized by the compiler as a zero-size type.
aarifullin
commented
> it better communicates the intent and could be optimized by compiler as zero-size type.
Good to know! Fixed.
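A small sketch of the point about `struct{}`, assuming nothing beyond the standard library. The helpers `signalSize`, `boolSize`, and `waitDone` are hypothetical names for illustration. An empty struct occupies zero bytes, and a `chan struct{}` that is only ever closed makes it clear that no payload is carried.

```go
package main

import (
	"fmt"
	"unsafe"
)

// signalSize reports the size in bytes of the element type of a pure
// signalling channel (chan struct{}).
func signalSize() uintptr {
	return unsafe.Sizeof(struct{}{})
}

// boolSize reports the element size of a chan bool, for comparison.
func boolSize() uintptr {
	return unsafe.Sizeof(false)
}

// waitDone runs work in a goroutine and blocks until it finishes.
// No value is ever sent on done: closing the channel is the signal.
func waitDone(work func()) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		work()
	}()
	<-done
}

func main() {
	ran := false
	waitDone(func() { ran = true })
	fmt.Println(signalSize(), boolSize(), ran)
}
```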
|
|||||||
|
|
||||||
|
type newStreamRes struct {
|
||||||
|
stream grpc.ClientStream
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
newStreamCh := make(chan newStreamRes)
|
||||||
|
|
||||||
|
go func() {
|
||||||
stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{
|
stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{
|
||||||
StreamName: info.Name,
|
StreamName: info.Name,
|
||||||
ServerStreams: info.ServerStream(),
|
ServerStreams: info.ServerStream(),
|
||||||
ClientStreams: info.ClientStream(),
|
ClientStreams: info.ClientStream(),
|
||||||
}, toMethodName(info))
|
}, toMethodName(info))
|
||||||
if err != nil {
|
|
||||||
fyrchik
commented
I think we still have a race condition here and a possible goroutine leak.
1. timer fires
2. `stream, nil` is returned from `NewStream`
3. context is cancelled
Is it true that `stream` from (2) should be closed?
To prevent this we must wait on `<-newStreamCh` and then check the stream.
aarifullin
commented
I have fixed something.
1. If conn successfully/unsuccessfully opens a stream: `newStreamCh` is closed, select goes further and the call returns `err` or `streamWrapper`.
2. If the timeout expires, we check whether a stream has been successfully/unsuccessfully **opened** _at the same time_. We can't wait for the channel to get closed because then this PR doesn't make sense. If it's opened, then we proceed as in (1). If it's still _opening_, then we return an error and cancel the context (see `cancel()`).
This must work out.
aarifullin
commented
> It is true that stream from (2) should be closed?
I suppose we don't need to close it if it's successfully/unsuccessfully opened. The goal of this fix is to prevent a hung open_ing_. So, if we have the result of `NewStream`, then we can return either the opened stream or an error.
fyrchik
commented
> I suppose
Is there any way you can transform this supposition into knowledge? Please do.
aarifullin
commented
Check the changes out, please. Now it's closed if the timer has expired.
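The behaviour this thread converges on can be sketched without gRPC. Everything below is a stand-in under stated assumptions: `fakeStream` replaces `grpc.ClientStream`, `openWithTimeout` and `tryOpen` are hypothetical helpers, and `errDialTimeout` is an illustrative sentinel (the diff itself returns `context.Canceled`). The fake open call deliberately ignores cancellation to model the case where a stream comes back after the timer has fired.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fakeStream stands in for grpc.ClientStream; it only records closing.
type fakeStream struct{ closed bool }

func (s *fakeStream) CloseSend() error { s.closed = true; return nil }

type openResult struct {
	stream *fakeStream
	err    error
}

// errDialTimeout is a hypothetical sentinel used only in this sketch.
var errDialTimeout = errors.New("open timed out")

// openWithTimeout runs open in a goroutine and waits for it for at most
// timeoutMs. On timeout it cancels ctx, still waits for open to return,
// and closes a stream that was opened too late.
func openWithTimeout(timeoutMs int, open func(context.Context) (*fakeStream, error)) (*fakeStream, context.CancelFunc, error) {
	ctx, cancel := context.WithCancel(context.Background())
	timer := time.NewTimer(time.Duration(timeoutMs) * time.Millisecond)
	defer timer.Stop()

	resCh := make(chan openResult)
	go func() {
		s, err := open(ctx)
		resCh <- openResult{stream: s, err: err}
	}()

	select {
	case <-timer.C:
		cancel()
		res := <-resCh // always receive, so the goroutine can exit
		if res.err == nil && res.stream != nil {
			_ = res.stream.CloseSend()
		}
		return nil, nil, errDialTimeout
	case res := <-resCh:
		if res.err != nil {
			cancel()
			return nil, nil, res.err
		}
		return res.stream, cancel, nil
	}
}

// tryOpen drives openWithTimeout with an open call that takes delayMs and
// ignores cancellation. It reports whether a stream was returned to the
// caller and whether a late stream was closed.
func tryOpen(timeoutMs, delayMs int) (returned bool, lateClosed bool, err error) {
	var opened *fakeStream
	s, cancel, err := openWithTimeout(timeoutMs, func(context.Context) (*fakeStream, error) {
		time.Sleep(time.Duration(delayMs) * time.Millisecond)
		opened = &fakeStream{}
		return opened, nil
	})
	if cancel != nil {
		defer cancel()
	}
	return s != nil, opened != nil && opened.closed, err
}

func main() {
	fmt.Println(tryOpen(200, 10))
	fmt.Println(tryOpen(20, 150))
}
```

In the timeout branch the receive from `resCh` is unconditional, which is what lets the sketch both release the goroutine and decide whether the stream needs closing.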
|
|||||||
|
newStreamCh <- newStreamRes{
|
||||||
|
stream: stream,
|
||||||
fyrchik
commented
You have just entered the `<-dialTimeoutTimer.C` branch (`newStreamCh` could be closed or not closed).
A goroutine leak can still happen in the `default` branch, because that's where `newStreamCh`
fyrchik
commented
I mean the only question is whether we need to close the stream.
This inner select is useless.
aarifullin
commented
Sorry. When I read your comment the first time:
> - timer fires
> - stream, nil is returned from NewStream
> - context is cancelled
I mistook these points for an "any of these" case. Now I understand you mean the case where the timer has expired but `NewStream` is fine.
So, check the changes out: if the stream has been opened but the timer has expired, we close the stream.
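The goroutine-leak concern raised above can be reproduced in isolation. This is a sketch with a hypothetical helper `senderFinishes`, not code from the PR. A worker sends one result on a channel that the caller abandons, as a timeout branch that returns early would. With an unbuffered channel the worker stays parked on the send forever. The diff avoids this by always receiving from `newStreamCh`; a channel with capacity 1, shown here for comparison, is an alternative that the PR does not use.

```go
package main

import (
	"fmt"
	"time"
)

// senderFinishes starts a worker that sends one result on a channel with the
// given buffer size. The caller never receives from that channel. The function
// reports whether the worker managed to finish within waitMs.
func senderFinishes(buffer, waitMs int) bool {
	resCh := make(chan int, buffer)
	done := make(chan struct{})
	go func() {
		defer close(done)
		resCh <- 42 // blocks forever if unbuffered and nobody receives
	}()

	select {
	case <-done:
		return true
	case <-time.After(time.Duration(waitMs) * time.Millisecond):
		return false // the worker is still blocked on the send: a leak
	}
}

func main() {
	fmt.Println("unbuffered:", senderFinishes(0, 50))
	fmt.Println("buffered:  ", senderFinishes(1, 50))
}
```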
|
|||||||
|
err: err,
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var res newStreamRes
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-dialTimeoutTimer.C:
|
||||||
cancel()
|
cancel()
|
||||||
return nil, err
|
res = <-newStreamCh
|
||||||
|
if res.stream != nil && res.err == nil {
|
||||||
|
_ = res.stream.CloseSend()
|
||||||
|
}
|
||||||
|
return nil, context.Canceled
|
||||||
|
case res = <-newStreamCh:
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.err != nil {
|
||||||
|
cancel()
|
||||||
|
return nil, res.err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &streamWrapper{
|
return &streamWrapper{
|
||||||
ClientStream: stream,
|
ClientStream: res.stream,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
timeout: c.rwTimeout,
|
timeout: c.rwTimeout,
|
||||||
}, nil
|
}, nil
|
||||||
|
|
I believe the gRPC client has one context for both dialing and streaming, and there's no separate dialing step in gRPC-Go v1.63+. Could you please double-check it?
You were right. I had no idea that this value also cancels streaming. The approach has been changed; please check it out.
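The difference between the two approaches can be shown with plain contexts. This is a sketch under assumptions: `aliveAfter` is a hypothetical helper, and the "open" step is modelled as succeeding immediately. A deadline attached to the context stays in force after the open succeeds, so the context is cancelled while the stream is still in use. A separate timer that is stopped once the open succeeds leaves the context alive.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// aliveAfter creates a context for a stream and reports whether it is still
// usable holdMs later. With useDeadline the dial timeout is attached to the
// context itself; otherwise a separate timer guards only the opening step.
func aliveAfter(useDeadline bool, dialTimeoutMs, holdMs int) bool {
	dialTimeout := time.Duration(dialTimeoutMs) * time.Millisecond

	var ctx context.Context
	var cancel context.CancelFunc
	if useDeadline {
		ctx, cancel = context.WithTimeout(context.Background(), dialTimeout)
	} else {
		ctx, cancel = context.WithCancel(context.Background())
		timer := time.NewTimer(dialTimeout)
		// The open succeeds immediately in this sketch, so the timer is
		// stopped and never cancels anything.
		timer.Stop()
	}
	defer cancel()

	time.Sleep(time.Duration(holdMs) * time.Millisecond) // stream in use
	return ctx.Err() == nil
}

func main() {
	fmt.Println("context deadline:", aliveAfter(true, 20, 100))
	fmt.Println("separate timer:  ", aliveAfter(false, 20, 100))
}
```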