Compare commits

..

1 commit

Author SHA1 Message Date
12f64696df
[#327] rpc: Fix mem leak
All checks were successful
DCO / DCO (pull_request) Successful in 27s
Code generation / Generate proto (pull_request) Successful in 33s
Tests and linters / Tests (pull_request) Successful in 44s
Tests and linters / Lint (pull_request) Successful in 1m50s
gRPC stream must be closed by `cancel` to prevent memleak.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-01-30 12:17:25 +03:00

View file

@ -12,18 +12,20 @@ import (
// SendUnary initializes communication session by RPC info, performs unary RPC
// and closes the session.
func SendUnary(cli *Client, info common.CallMethodInfo, req, resp message.Message, opts ...CallOption) error {
rw, err := cli.Init(info, opts...)
rw, err := cli.initInternal(info, opts...)
if err != nil {
return err
}
err = rw.WriteMessage(req)
if err != nil {
rw.cancel()
return err
}
err = rw.ReadMessage(resp)
if err != nil {
rw.cancel()
return err
}
@ -55,6 +57,7 @@ func (c *clientStreamWriterCloser) Close() error {
}
if err = c.sw.ReadMessage(c.resp); err != nil {
c.sw.cancel()
return err
}
@ -85,7 +88,7 @@ type MessageReaderCloser interface {
}
type serverStreamReaderCloser struct {
rw MessageReadWriter
rw *streamWrapper
once sync.Once
@ -100,11 +103,13 @@ func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
})
if err != nil {
s.rw.cancel()
return err
}
err = s.rw.ReadMessage(msg)
if !errors.Is(err, io.EOF) {
s.rw.cancel()
return err
}