frostfs-sdk-go/api/rpc/client/flows.go
Dmitrii Stepanov 12f64696df
All checks were successful
DCO / DCO (pull_request) Successful in 27s
Code generation / Generate proto (pull_request) Successful in 33s
Tests and linters / Tests (pull_request) Successful in 44s
Tests and linters / Lint (pull_request) Successful in 1m50s
[#327] rpc: Fix mem leak
gRPC stream must be closed by `cancel` to prevent memleak.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-01-30 12:17:25 +03:00

138 lines
2.8 KiB
Go

package client
import (
"errors"
"io"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
)
// SendUnary performs a single request/response exchange: it opens a
// communication session described by info, writes req, reads the reply
// into resp and then closes the session.
//
// If either the write or the read fails, the stream is cancelled and that
// error is returned as-is. Otherwise the result of closing the stream is
// returned. An error from session initialization is returned without any
// further cleanup.
func SendUnary(cli *Client, info common.CallMethodInfo, req, resp message.Message, opts ...CallOption) error {
	stream, err := cli.initInternal(info, opts...)
	if err != nil {
		return err
	}

	// Send the request; only attempt to read the response if sending worked.
	if err = stream.WriteMessage(req); err == nil {
		err = stream.ReadMessage(resp)
	}

	// A failure at either step aborts the stream via cancel instead of Close.
	if err != nil {
		stream.cancel()
		return err
	}

	return stream.Close()
}
// MessageWriterCloser wraps MessageWriter
// and io.Closer interfaces.
//
// It is the type returned by OpenClientStream: messages are sent with the
// MessageWriter part and the stream is finished with Close.
type MessageWriterCloser interface {
MessageWriter
io.Closer
}
// clientStreamWriterCloser is the MessageWriterCloser implementation
// returned by OpenClientStream.
type clientStreamWriterCloser struct {
// sw is the underlying stream: messages are written to it and the
// response is read from it during Close.
sw *streamWrapper
// resp is the caller-supplied message that Close reads the response into.
resp message.Message
}
// WriteMessage implements MessageWriterCloser.
//
// It forwards m to the underlying stream and returns that call's result
// unchanged. Unlike Close, it does not cancel the stream on error.
func (c *clientStreamWriterCloser) WriteMessage(m message.Message) error {
return c.sw.WriteMessage(m)
}
// Close implements MessageWriterCloser.
//
// It finishes the sending side of the stream, reads the response into the
// message supplied to OpenClientStream and then closes the stream. If
// finishing the send side or reading the response fails, the stream is
// cancelled and that error is returned. Otherwise the result of closing the
// stream is returned.
func (c *clientStreamWriterCloser) Close() error {
	// Only wait for the response if the send side was finished cleanly.
	err := c.sw.closeSend()
	if err == nil {
		err = c.sw.ReadMessage(c.resp)
	}

	// A failure at either step aborts the stream via cancel instead of Close.
	if err != nil {
		c.sw.cancel()
		return err
	}

	return c.sw.Close()
}
// OpenClientStream initializes communication session by RPC info, opens
// client-side stream and returns its interface.
//
// Messages are sent through the returned writer. Its Close method finishes
// sending, reads the response into resp and closes the stream. All stream
// writes must be performed before the closing. Close must be called once.
//
// If the session cannot be initialized, a nil writer and the initialization
// error are returned.
func OpenClientStream(cli *Client, info common.CallMethodInfo, resp message.Message, opts ...CallOption) (MessageWriterCloser, error) {
	stream, err := cli.initInternal(info, opts...)
	if err != nil {
		return nil, err
	}

	writer := &clientStreamWriterCloser{resp: resp, sw: stream}
	return writer, nil
}
// MessageReaderCloser wraps MessageReader
// and io.Closer interfaces.
//
// NOTE(review): nothing in the visible part of this file implements or
// returns this interface; OpenServerStream returns a plain MessageReader.
// Confirm whether it is still used elsewhere.
type MessageReaderCloser interface {
MessageReader
io.Closer
}
// serverStreamReaderCloser is the MessageReader implementation returned by
// OpenServerStream. Despite its name it has no Close method in this file;
// the stream is released from within ReadMessage.
type serverStreamReaderCloser struct {
// rw is the underlying stream the request is written to and the
// responses are read from.
rw *streamWrapper
// once guarantees the request is written to the stream at most once,
// on the first ReadMessage call.
once sync.Once
// req is the request message supplied to OpenServerStream.
req message.Message
}
// ReadMessage implements MessageReader.
//
// On the first call it writes the stored request to the stream; every call
// then reads the next response message into msg.
//
// Return values:
//   - nil: a message was read into msg and the stream stays open for
//     further reads;
//   - io.EOF: the stream ended and was closed successfully;
//   - any other error: writing the request, reading a message or closing
//     the stream failed. On a write or read failure the stream is cancelled
//     so that its resources are released.
func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
	var err error

	// The request is sent lazily, exactly once, by the first read.
	s.once.Do(func() {
		err = s.rw.WriteMessage(s.req)
	})
	if err != nil {
		s.rw.cancel()
		return err
	}

	err = s.rw.ReadMessage(msg)
	switch {
	case err == nil:
		// A message was received. The stream must NOT be cancelled here:
		// cancelling after a successful read would tear the stream down
		// and make every subsequent read fail.
		return nil
	case errors.Is(err, io.EOF):
		// Normal end of stream: close it and report EOF to the caller.
		if err = s.rw.Close(); err != nil {
			return err
		}
		return io.EOF
	default:
		// Genuine read failure: abort the stream to release its resources.
		s.rw.cancel()
		return err
	}
}
// OpenServerStream initializes communication session by RPC info, opens
// server-side stream and returns its interface.
//
// req is stored and written to the stream by the first ReadMessage call on
// the returned reader. The returned reader has no Close method of its own:
// the stream is closed by ReadMessage when it reports io.EOF and cancelled
// when it reports any other error, so callers should keep reading until
// ReadMessage returns a non-nil error.
//
// If the session cannot be initialized, a nil reader and the initialization
// error are returned.
func OpenServerStream(cli *Client, info common.CallMethodInfo, req message.Message, opts ...CallOption) (MessageReader, error) {
	stream, err := cli.initInternal(info, opts...)
	if err != nil {
		return nil, err
	}

	reader := &serverStreamReaderCloser{req: req, rw: stream}
	return reader, nil
}