frostfs-api-go/rpc/client/stream_wrapper.go
Airat Arifullin bc16a32c24 [#40] types: Generate StableMarshaler/StableSize methods for protobufs
* Add plugin option for protogen in Makefile
* Fix the generator for the plugin in util/protogen
* Erase convertible types, move helpful methods to gRPC protobufs
* Erase helpers for conversions
* Generate StableMarshal/StableSize for protobufs by the protoc plugin

Signed-off-by: Airat Arifullin a.arifullin@yadro.com
2023-07-10 12:08:48 +03:00

50 lines
876 B
Go

package client
import (
"context"
"time"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
// streamWrapper decorates a gRPC client stream so that every operation on it
// is bounded by a fixed timeout.
type streamWrapper struct {
	// ClientStream is the wrapped stream; its methods are promoted to the
	// wrapper through embedding.
	grpc.ClientStream

	// timeout is the longest withTimeout waits for a single stream operation.
	timeout time.Duration

	// cancel is invoked by withTimeout when an operation exceeds timeout.
	cancel context.CancelFunc
}
// ReadMessage receives the next message from the wrapped stream into m.
//
// The receive is executed through withTimeout, so it is bounded by the
// wrapper's timeout. It returns whatever withTimeout returns.
func (w streamWrapper) ReadMessage(m proto.Message) error {
	recv := func() error {
		return w.ClientStream.RecvMsg(m)
	}

	return w.withTimeout(recv)
}
// WriteMessage sends m over the wrapped stream.
//
// The send is executed through withTimeout, so it is bounded by the
// wrapper's timeout. It returns whatever withTimeout returns.
func (w streamWrapper) WriteMessage(m proto.Message) error {
	send := func() error {
		return w.ClientStream.SendMsg(m)
	}

	return w.withTimeout(send)
}
// Close calls CloseSend on the wrapped stream through withTimeout, so the
// call is bounded by the wrapper's timeout. It returns whatever withTimeout
// returns.
func (w *streamWrapper) Close() error {
	// Bind the method value here, in the caller's goroutine, before handing it
	// to withTimeout.
	closeSend := w.ClientStream.CloseSend

	return w.withTimeout(closeSend)
}
// withTimeout runs closure in a separate goroutine and waits for it to finish
// for at most w.timeout.
//
// If closure completes in time, its error (possibly nil) is returned.
// Otherwise w.cancel is called and context.DeadlineExceeded is returned.
// This method does not interrupt closure itself; presumably w.cancel cancels
// the stream's context so the blocked call unwinds — confirm against the code
// that constructs the wrapper.
func (w *streamWrapper) withTimeout(closure func() error) error {
	// Buffered with capacity 1 so the goroutine can always deliver its result
	// and exit, even if nobody is receiving any more after a timeout.
	done := make(chan error, 1)
	go func() {
		done <- closure()
	}()

	timer := time.NewTimer(w.timeout)
	defer timer.Stop()

	select {
	case err := <-done:
		return err
	case <-timer.C:
		// When both the result and the timer are ready, select chooses a case
		// at random. Re-check the result so that an operation which actually
		// completed is not reported as timed out and its stream cancelled.
		select {
		case err := <-done:
			return err
		default:
		}

		w.cancel()

		return context.DeadlineExceeded
	}
}