forked from TrueCloudLab/frostfs-api-go
aa53fb7131
Remote gRPC server may not return or accept data for a while. gRPC solves this issue with timeout in context. However, the context is used for entire gRPC method invocation. Unfortunately the duration of requests with streams can't be estimated easily. To solve this issue we can specify timeouts for every message read and write. Single message has size limit so timeout can be related to that. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
93 lines
2 KiB
Go
93 lines
2 KiB
Go
package grpc
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// Message represents raw gRPC message.
|
|
type Message interface{}
|
|
|
|
// MessageReadWriter is a component interface
|
|
// for transmitting raw messages over gRPC protocol.
|
|
type MessageReadWriter interface {
|
|
// ReadMessage reads the next message from the remote server,
|
|
// and writes it to the argument.
|
|
ReadMessage(Message) error
|
|
|
|
// WriteMessage sends message from argument to remote server.
|
|
WriteMessage(Message) error
|
|
|
|
// Closes the communication session with the remote server.
|
|
//
|
|
// All calls to send/receive messages must be done before closing.
|
|
io.Closer
|
|
}
|
|
|
|
type streamWrapper struct {
|
|
grpc.ClientStream
|
|
timeout time.Duration
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
func (w streamWrapper) ReadMessage(m Message) error {
|
|
return w.withTimeout(func() error {
|
|
return w.ClientStream.RecvMsg(m)
|
|
})
|
|
}
|
|
|
|
func (w streamWrapper) WriteMessage(m Message) error {
|
|
return w.withTimeout(func() error {
|
|
return w.ClientStream.SendMsg(m)
|
|
})
|
|
}
|
|
|
|
func (w *streamWrapper) Close() error {
|
|
return w.withTimeout(w.ClientStream.CloseSend)
|
|
}
|
|
|
|
func (w *streamWrapper) withTimeout(closure func() error) error {
|
|
ch := make(chan error, 1)
|
|
go func() {
|
|
ch <- closure()
|
|
close(ch)
|
|
}()
|
|
|
|
select {
|
|
case err := <-ch:
|
|
return err
|
|
case <-time.After(w.timeout):
|
|
w.cancel()
|
|
return context.DeadlineExceeded
|
|
}
|
|
}
|
|
|
|
// Init initiates a messaging session within the RPC configured by options.
|
|
func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) {
|
|
prm := defaultCallParameters()
|
|
|
|
for _, opt := range opts {
|
|
opt(prm)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(prm.ctx)
|
|
stream, err := c.con.NewStream(ctx, &grpc.StreamDesc{
|
|
StreamName: info.Name,
|
|
ServerStreams: info.ServerStream(),
|
|
ClientStreams: info.ClientStream(),
|
|
}, toMethodName(info))
|
|
if err != nil {
|
|
cancel()
|
|
return nil, err
|
|
}
|
|
|
|
return &streamWrapper{
|
|
ClientStream: stream,
|
|
cancel: cancel,
|
|
timeout: c.rwTimeout,
|
|
}, nil
|
|
}
|