Compare commits

..

1 commit

Author SHA1 Message Date
a114637e54
[#327] rpc: Fix mem leak
All checks were successful
DCO / DCO (pull_request) Successful in 26s
Code generation / Generate proto (pull_request) Successful in 33s
Tests and linters / Tests (pull_request) Successful in 48s
Tests and linters / Lint (pull_request) Successful in 2m20s
gRPC stream must be closed by `cancel` to prevent memleak.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-01-30 12:04:13 +03:00

View file

@ -12,20 +12,18 @@ import (
// SendUnary initializes communication session by RPC info, performs unary RPC
// and closes the session.
func SendUnary(cli *Client, info common.CallMethodInfo, req, resp message.Message, opts ...CallOption) error {
rw, err := cli.initInternal(info, opts...)
rw, err := cli.Init(info, opts...)
if err != nil {
return err
}
err = rw.WriteMessage(req)
if err != nil {
rw.cancel()
return err
}
err = rw.ReadMessage(resp)
if err != nil {
rw.cancel()
return err
}
@ -57,7 +55,6 @@ func (c *clientStreamWriterCloser) Close() error {
}
if err = c.sw.ReadMessage(c.resp); err != nil {
c.sw.cancel()
return err
}
@ -88,7 +85,7 @@ type MessageReaderCloser interface {
}
type serverStreamReaderCloser struct {
rw *streamWrapper
rw MessageReadWriter
once sync.Once
@ -103,13 +100,11 @@ func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
})
if err != nil {
s.rw.cancel()
return err
}
err = s.rw.ReadMessage(msg)
if !errors.Is(err, io.EOF) {
s.rw.cancel()
return err
}