forked from TrueCloudLab/frostfs-sdk-go
125 lines
2.5 KiB
Go
125 lines
2.5 KiB
Go
|
package client
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"io"
|
||
|
"sync"
|
||
|
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
|
||
|
)
|
||
|
|
||
|
// SendUnary initializes communication session by RPC info, performs unary RPC
|
||
|
// and closes the session.
|
||
|
func SendUnary(cli *Client, info common.CallMethodInfo, req, resp message.Message, opts ...CallOption) error {
|
||
|
rw, err := cli.Init(info, opts...)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
err = rw.WriteMessage(req)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
err = rw.ReadMessage(resp)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return rw.Close()
|
||
|
}
|
||
|
|
||
|
// MessageWriterCloser wraps MessageWriter
|
||
|
// and io.Closer interfaces.
|
||
|
type MessageWriterCloser interface {
|
||
|
MessageWriter
|
||
|
io.Closer
|
||
|
}
|
||
|
|
||
|
type clientStreamWriterCloser struct {
|
||
|
MessageReadWriter
|
||
|
|
||
|
resp message.Message
|
||
|
}
|
||
|
|
||
|
func (c *clientStreamWriterCloser) Close() error {
|
||
|
err := c.MessageReadWriter.Close()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return c.ReadMessage(c.resp)
|
||
|
}
|
||
|
|
||
|
// OpenClientStream initializes communication session by RPC info, opens client-side stream
|
||
|
// and returns its interface.
|
||
|
//
|
||
|
// All stream writes must be performed before the closing. Close must be called once.
|
||
|
func OpenClientStream(cli *Client, info common.CallMethodInfo, resp message.Message, opts ...CallOption) (MessageWriterCloser, error) {
|
||
|
rw, err := cli.Init(info, opts...)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return &clientStreamWriterCloser{
|
||
|
MessageReadWriter: rw,
|
||
|
resp: resp,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// MessageReaderCloser wraps MessageReader
|
||
|
// and io.Closer interface.
|
||
|
type MessageReaderCloser interface {
|
||
|
MessageReader
|
||
|
io.Closer
|
||
|
}
|
||
|
|
||
|
type serverStreamReaderCloser struct {
|
||
|
rw MessageReadWriter
|
||
|
|
||
|
once sync.Once
|
||
|
|
||
|
req message.Message
|
||
|
}
|
||
|
|
||
|
func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
|
||
|
var err error
|
||
|
|
||
|
s.once.Do(func() {
|
||
|
err = s.rw.WriteMessage(s.req)
|
||
|
})
|
||
|
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
err = s.rw.ReadMessage(msg)
|
||
|
if !errors.Is(err, io.EOF) {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
err = s.rw.Close()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return io.EOF
|
||
|
}
|
||
|
|
||
|
// OpenServerStream initializes communication session by RPC info, opens server-side stream
|
||
|
// and returns its interface.
|
||
|
//
|
||
|
// All stream reads must be performed before the closing. Close must be called once.
|
||
|
func OpenServerStream(cli *Client, info common.CallMethodInfo, req message.Message, opts ...CallOption) (MessageReader, error) {
|
||
|
rw, err := cli.Init(info, opts...)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return &serverStreamReaderCloser{
|
||
|
rw: rw,
|
||
|
req: req,
|
||
|
}, nil
|
||
|
}
|