[#15] tracing: Add events for grpc stream calls
All checks were successful
DCO action / DCO (pull_request) Successful in 1m21s
Tests and linters / Staticcheck (pull_request) Successful in 1m25s
Tests and linters / Tests (pull_request) Successful in 1m40s
Tests and linters / Tests with -race (pull_request) Successful in 1m53s
Tests and linters / Lint (pull_request) Successful in 2m22s

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
This commit is contained in:
Roman Loginov 2024-11-20 11:35:34 +03:00
parent f17779933e
commit f1ed8fd502
3 changed files with 33 additions and 4 deletions

View file

@ -2,7 +2,10 @@ package grpc
import (
"context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
@ -10,14 +13,16 @@ import (
type clientStream struct {
originalStream grpc.ClientStream
desc *grpc.StreamDesc
span trace.Span
finished chan<- error
done <-chan struct{}
}
func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, finished chan<- error, done <-chan struct{}) grpc.ClientStream {
func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, finished chan<- error, done <-chan struct{}) grpc.ClientStream {
return &clientStream{
originalStream: originalStream,
desc: desc,
span: span,
finished: finished,
done: done,
}
@ -39,6 +44,9 @@ func (cs *clientStream) Trailer() metadata.MD {
}
func (cs *clientStream) CloseSend() error {
cs.span.AddEvent("closing the send direction of the stream", trace.WithAttributes(
attribute.String("event.name", "client.stream.close.send")),
)
err := cs.originalStream.CloseSend()
if err != nil {
select {
@ -54,6 +62,10 @@ func (cs *clientStream) Context() context.Context {
}
func (cs *clientStream) SendMsg(m any) error {
cs.span.AddEvent("sending a message", trace.WithAttributes(
attribute.String("event.name", "client.stream.send.msg"),
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := cs.originalStream.SendMsg(m)
if err != nil {
select {
@ -65,6 +77,10 @@ func (cs *clientStream) SendMsg(m any) error {
}
func (cs *clientStream) RecvMsg(m any) error {
cs.span.AddEvent("receiving a message", trace.WithAttributes(
attribute.String("event.name", "client.stream.receive.msg"),
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := cs.originalStream.RecvMsg(m)
if err != nil || !cs.desc.ServerStreams {
select {

View file

@ -50,7 +50,7 @@ func NewStreamClientInterceptor() grpc.StreamClientInterceptor {
finished := make(chan error)
done := make(chan struct{})
strWrp := newgRPCClientStream(str, desc, finished, done)
strWrp := newgRPCClientStream(str, desc, span, finished, done)
go func() {
defer close(done)
@ -107,7 +107,7 @@ func NewStreamServerInterceptor() grpc.StreamServerInterceptor {
trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
err := handler(srv, newgRPCServerStream(ctx, ss))
err := handler(srv, newgRPCServerStream(ctx, ss, span))
setGRPCSpanStatus(span, err)
return err

View file

@ -2,7 +2,10 @@ package grpc
import (
"context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
@ -10,12 +13,14 @@ import (
type serverStream struct {
originalStream grpc.ServerStream
ctx context.Context // nolint:containedctx
span trace.Span
}
func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream) grpc.ServerStream {
func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream, span trace.Span) grpc.ServerStream {
return &serverStream{
originalStream: originalStream,
ctx: ctx,
span: span,
}
}
@ -36,9 +41,17 @@ func (ss *serverStream) Context() context.Context {
}
func (ss *serverStream) SendMsg(m any) error {
ss.span.AddEvent("sending a message", trace.WithAttributes(
attribute.String("event.name", "server.stream.send.msg"),
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return ss.originalStream.SendMsg(m)
}
func (ss *serverStream) RecvMsg(m any) error {
ss.span.AddEvent("receiving a message", trace.WithAttributes(
attribute.String("event.name", "server.stream.receive.msg"),
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return ss.originalStream.RecvMsg(m)
}