[#9999] rpc: Fix mem leak again
All checks were successful
DCO / DCO (pull_request) Successful in 27s
Code generation / Generate proto (pull_request) Successful in 33s
Tests and linters / Tests (pull_request) Successful in 44s
Tests and linters / Lint (pull_request) Successful in 1m53s

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-01-30 10:49:34 +03:00
parent 2786fadb25
commit 7f7539c895
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
3 changed files with 43 additions and 14 deletions

View file

@ -38,18 +38,26 @@ type MessageWriterCloser interface {
}
type clientStreamWriterCloser struct {
MessageReadWriter
sw *streamWrapper
resp message.Message
}
// WriteMessage implements MessageWriterCloser.
func (c *clientStreamWriterCloser) WriteMessage(m message.Message) error {
return c.sw.WriteMessage(m)
}
func (c *clientStreamWriterCloser) Close() error {
err := c.MessageReadWriter.Close()
err := c.sw.closeSend()
if err != nil {
return err
}
return c.ReadMessage(c.resp)
if err = c.sw.ReadMessage(c.resp); err != nil {
return err
}
return c.sw.Close()
}
// OpenClientStream initializes communication session by RPC info, opens client-side stream
@ -57,14 +65,14 @@ func (c *clientStreamWriterCloser) Close() error {
//
// All stream writes must be performed before the closing. Close must be called once.
func OpenClientStream(cli *Client, info common.CallMethodInfo, resp message.Message, opts ...CallOption) (MessageWriterCloser, error) {
rw, err := cli.Init(info, opts...)
rw, err := cli.initInternal(info, opts...)
if err != nil {
return nil, err
}
return &clientStreamWriterCloser{
MessageReadWriter: rw,
resp: resp,
sw: rw,
resp: resp,
}, nil
}
@ -112,7 +120,7 @@ func (s *serverStreamReaderCloser) ReadMessage(msg message.Message) error {
//
// All stream reads must be performed before the closing. Close must be called once.
func OpenServerStream(cli *Client, info common.CallMethodInfo, req message.Message, opts ...CallOption) (MessageReader, error) {
rw, err := cli.Init(info, opts...)
rw, err := cli.initInternal(info, opts...)
if err != nil {
return nil, err
}

View file

@ -41,6 +41,10 @@ type MessageReadWriter interface {
// Init initiates a messaging session and returns the interface for message transmitting.
func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) {
return c.initInternal(info, opts...)
}
func (c *Client) initInternal(info common.CallMethodInfo, opts ...CallOption) (*streamWrapper, error) {
prm := defaultCallParameters()
for _, opt := range opts {
@ -52,7 +56,6 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe
}
ctx, cancel := context.WithCancel(prm.ctx)
defer cancel()
// `conn.NewStream` doesn't check if `conn` may turn up invalidated right before this invocation.
// In such cases, the operation can hang indefinitely, with the context timeout being the only

View file

@ -2,6 +2,7 @@ package client
import (
"context"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
@ -10,11 +11,12 @@ import (
type streamWrapper struct {
grpc.ClientStream
timeout time.Duration
cancel context.CancelFunc
timeout time.Duration
cancel context.CancelFunc
closeSendOnce sync.Once
}
func (w streamWrapper) ReadMessage(m message.Message) error {
func (w *streamWrapper) ReadMessage(m message.Message) error {
// Can be optimized: we can create blank message here.
gm := m.ToGRPCMessage()
@ -28,14 +30,26 @@ func (w streamWrapper) ReadMessage(m message.Message) error {
return m.FromGRPCMessage(gm)
}
func (w streamWrapper) WriteMessage(m message.Message) error {
func (w *streamWrapper) WriteMessage(m message.Message) error {
return w.withTimeout(func() error {
return w.ClientStream.SendMsg(m.ToGRPCMessage())
})
}
func (w *streamWrapper) closeSend() error {
var err error
w.closeSendOnce.Do(
func() {
err = w.withTimeout(w.ClientStream.CloseSend)
},
)
return err
}
func (w *streamWrapper) Close() error {
return w.withTimeout(w.ClientStream.CloseSend)
err := w.closeSend()
w.cancel()
return err
}
func (w *streamWrapper) withTimeout(closure func() error) error {
@ -50,6 +64,10 @@ func (w *streamWrapper) withTimeout(closure func() error) error {
select {
case err := <-ch:
tt.Stop()
select {
case <-tt.C:
default:
}
return err
case <-tt.C:
w.cancel()