frostfs-sdk-go/api/rpc/client/stream_wrapper.go
Pavel Pogodaev 6ce73790ea
All checks were successful
DCO / DCO (pull_request) Successful in 38s
Tests and linters / Tests (pull_request) Successful in 1m13s
Tests and linters / Lint (pull_request) Successful in 2m36s
[#276] Merge repo with frostfs-api-go
Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
2024-10-22 14:05:12 +00:00

58 lines
1 KiB
Go

package client
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/message"
"google.golang.org/grpc"
)
type streamWrapper struct {
grpc.ClientStream
timeout time.Duration
cancel context.CancelFunc
}
func (w streamWrapper) ReadMessage(m message.Message) error {
// Can be optimized: we can create blank message here.
gm := m.ToGRPCMessage()
err := w.withTimeout(func() error {
return w.ClientStream.RecvMsg(gm)
})
if err != nil {
return err
}
return m.FromGRPCMessage(gm)
}
func (w streamWrapper) WriteMessage(m message.Message) error {
return w.withTimeout(func() error {
return w.ClientStream.SendMsg(m.ToGRPCMessage())
})
}
func (w *streamWrapper) Close() error {
return w.withTimeout(w.ClientStream.CloseSend)
}
func (w *streamWrapper) withTimeout(closure func() error) error {
ch := make(chan error, 1)
go func() {
ch <- closure()
close(ch)
}()
tt := time.NewTimer(w.timeout)
select {
case err := <-ch:
tt.Stop()
return err
case <-tt.C:
w.cancel()
return context.DeadlineExceeded
}
}