package client import ( "context" "io" "time" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message" "google.golang.org/grpc" ) // MessageReader is an interface of the Message reader. type MessageReader interface { // ReadMessage reads the next Message. // // Returns io.EOF if there are no more messages to read. // ReadMessage should not be called after io.EOF occasion. ReadMessage(message.Message) error } // MessageWriter is an interface of the Message writer. type MessageWriter interface { // WriteMessage writers the next Message. // // WriteMessage should not be called after any error. WriteMessage(message.Message) error } // MessageReadWriter is a component interface // for transmitting raw Protobuf messages. type MessageReadWriter interface { MessageReader MessageWriter // Closes the communication session. // // All calls to send/receive messages must be done before closing. io.Closer } // Init initiates a messaging session and returns the interface for message transmitting. func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) { prm := defaultCallParameters() for _, opt := range opts { opt(prm) } if err := c.openGRPCConn(prm.ctx, prm.dialer); err != nil { return nil, err } ctx, cancel := context.WithCancel(prm.ctx) // `conn.NewStream` doesn't check if `conn` may turn up invalidated right before this invocation. // In such cases, the operation can hang indefinitely, with the context timeout being the only // mechanism to cancel it. // // We use a separate timer instead of context timeout because the latter // would propagate to all subsequent read/write operations on the opened stream, // which is not desired for the stream's lifecycle management. dialTimeoutTimer := time.NewTimer(c.dialTimeout) defer dialTimeoutTimer.Stop() type newStreamRes struct { stream grpc.ClientStream err error } newStreamCh := make(chan newStreamRes) go func() { stream, err := c.conn.NewStream(ctx, &grpc.StreamDesc{ StreamName: info.Name, ServerStreams: info.ServerStream(), ClientStreams: info.ClientStream(), }, toMethodName(info)) newStreamCh <- newStreamRes{ stream: stream, err: err, } }() var res newStreamRes select { case <-dialTimeoutTimer.C: cancel() res = <-newStreamCh if res.stream != nil && res.err == nil { _ = res.stream.CloseSend() } return nil, context.Canceled case res = <-newStreamCh: } if res.err != nil { cancel() return nil, res.err } return &streamWrapper{ ClientStream: res.stream, cancel: cancel, timeout: c.rwTimeout, }, nil }