frostfs-observability/tracing/grpc/client.go

77 lines
1.4 KiB
Go
Raw Permalink Normal View History

package grpc
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// clientStream wraps a grpc.ClientStream and reports selected call outcomes
// on the finished channel while forwarding every call to the wrapped stream.
type clientStream struct {
	// originalStream is the wrapped stream; all methods delegate to it.
	originalStream grpc.ClientStream
	// desc describes the stream; RecvMsg reads its ServerStreams flag to
	// decide whether a successful receive must also be reported.
	desc *grpc.StreamDesc
	// finished receives the error (or nil, see RecvMsg) produced by the
	// wrapped stream's operations.
	// NOTE(review): presumably consumed by the tracing interceptor that
	// created this stream — verify against the caller.
	finished chan<- error
	// done unblocks a pending report: whenever a method would send on
	// finished it also waits on done, so it returns once either is ready.
	done <-chan struct{}
}
// newgRPCClientStream builds a clientStream around originalStream and returns
// it as a grpc.ClientStream. The desc, finished and done arguments are stored
// as-is; no validation is performed on any of them.
func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, finished chan<- error, done <-chan struct{}) grpc.ClientStream {
	wrapped := new(clientStream)
	wrapped.originalStream = originalStream
	wrapped.desc = desc
	wrapped.finished = finished
	wrapped.done = done
	return wrapped
}
// Header forwards to the wrapped stream's Header. When that call fails, the
// error is reported on cs.finished, unless cs.done becomes ready first, and
// the original metadata and error are returned unchanged.
func (cs *clientStream) Header() (metadata.MD, error) {
	header, headerErr := cs.originalStream.Header()
	if headerErr == nil {
		return header, nil
	}

	// Block until the error is delivered or done is signalled.
	select {
	case cs.finished <- headerErr:
	case <-cs.done:
	}
	return header, headerErr
}
// Trailer returns the trailer metadata of the wrapped stream. Nothing is
// reported on the finished channel.
func (cs *clientStream) Trailer() metadata.MD {
	trailer := cs.originalStream.Trailer()
	return trailer
}
// CloseSend forwards to the wrapped stream's CloseSend. A non-nil error is
// reported on cs.finished, unless cs.done becomes ready first, and is then
// returned to the caller.
func (cs *clientStream) CloseSend() error {
	if closeErr := cs.originalStream.CloseSend(); closeErr != nil {
		// Block until the error is delivered or done is signalled.
		select {
		case cs.finished <- closeErr:
		case <-cs.done:
		}
		return closeErr
	}
	return nil
}
// Context returns the context of the wrapped stream. Nothing is reported on
// the finished channel.
func (cs *clientStream) Context() context.Context {
	streamCtx := cs.originalStream.Context()
	return streamCtx
}
// SendMsg forwards m to the wrapped stream's SendMsg. A non-nil error is
// reported on cs.finished, unless cs.done becomes ready first, and is then
// returned to the caller.
func (cs *clientStream) SendMsg(m any) error {
	sendErr := cs.originalStream.SendMsg(m)
	if sendErr == nil {
		return nil
	}

	// Block until the error is delivered or done is signalled.
	select {
	case cs.finished <- sendErr:
	case <-cs.done:
	}
	return sendErr
}
// RecvMsg forwards to the wrapped stream's RecvMsg, receiving into m.
//
// The outcome is reported on cs.finished (unless cs.done becomes ready first)
// in two situations:
//   - the receive failed, in which case the error is reported;
//   - the stream descriptor does not declare server streaming, in which case
//     the result of the receive is reported even when it is nil.
//
// A successful receive on a server-streaming call reports nothing.
func (cs *clientStream) RecvMsg(m any) error {
	recvErr := cs.originalStream.RecvMsg(m)

	// cs.desc is only consulted when the receive succeeded, matching the
	// short-circuit evaluation of the reporting condition.
	if recvErr == nil && cs.desc.ServerStreams {
		return nil
	}

	// Block until the outcome is delivered or done is signalled.
	select {
	case cs.finished <- recvErr:
	case <-cs.done:
	}
	return recvErr
}