From c8283d96a37710125fd0e13003c41326f7650af9 Mon Sep 17 00:00:00 2001 From: Roman Loginov Date: Wed, 20 Nov 2024 11:35:34 +0300 Subject: [PATCH] [#15] tracing: Add events for grpc stream calls Signed-off-by: Roman Loginov --- tracing/grpc/client.go | 29 ++++++++++++++++++++++++++++- tracing/grpc/interceptors.go | 4 ++-- tracing/grpc/server.go | 29 ++++++++++++++++++++++++++--- 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/tracing/grpc/client.go b/tracing/grpc/client.go index ce750be..20d6a02 100644 --- a/tracing/grpc/client.go +++ b/tracing/grpc/client.go @@ -2,7 +2,10 @@ package grpc import ( "context" + "fmt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -10,14 +13,16 @@ import ( type clientStream struct { originalStream grpc.ClientStream desc *grpc.StreamDesc + span trace.Span finished chan<- error done <-chan struct{} } -func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, finished chan<- error, done <-chan struct{}) grpc.ClientStream { +func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, finished chan<- error, done <-chan struct{}) grpc.ClientStream { return &clientStream{ originalStream: originalStream, desc: desc, + span: span, finished: finished, done: done, } @@ -39,6 +44,9 @@ func (cs *clientStream) Trailer() metadata.MD { } func (cs *clientStream) CloseSend() error { + cs.span.AddEvent("start closing the send direction of the stream", trace.WithAttributes( + attribute.String("event.name", "client.stream.close.send.start")), + ) err := cs.originalStream.CloseSend() if err != nil { select { @@ -46,6 +54,9 @@ func (cs *clientStream) CloseSend() error { case cs.finished <- err: } } + cs.span.AddEvent("finish closing the send direction of the stream", trace.WithAttributes( + attribute.String("event.name", "client.stream.close.send.finish")), + ) return err } @@ -54,6 +65,10 @@ func (cs *clientStream) Context() context.Context { } func (cs *clientStream) SendMsg(m any) error { + cs.span.AddEvent("start sending a message", trace.WithAttributes( + attribute.String("event.name", "client.stream.send.msg.start"), + attribute.String("message.type", fmt.Sprintf("%T", m))), + ) err := cs.originalStream.SendMsg(m) if err != nil { select { @@ -61,10 +76,18 @@ func (cs *clientStream) SendMsg(m any) error { case cs.finished <- err: } } + cs.span.AddEvent("finish sending a message", trace.WithAttributes( + attribute.String("event.name", "client.stream.send.msg.finish"), + attribute.String("message.type", fmt.Sprintf("%T", m))), + ) return err } func (cs *clientStream) RecvMsg(m any) error { + cs.span.AddEvent("start receiving a message", trace.WithAttributes( + attribute.String("event.name", "client.stream.receive.msg.start"), + attribute.String("message.type", fmt.Sprintf("%T", m))), + ) err := cs.originalStream.RecvMsg(m) if err != nil || !cs.desc.ServerStreams { select { @@ -72,5 +95,9 @@ func (cs *clientStream) RecvMsg(m any) error { case cs.finished <- err: } } + cs.span.AddEvent("finish receiving a message", trace.WithAttributes( + attribute.String("event.name", "client.stream.receive.msg.finish"), + attribute.String("message.type", fmt.Sprintf("%T", m))), + ) return err } diff --git a/tracing/grpc/interceptors.go b/tracing/grpc/interceptors.go index a9bef07..21eeaf5 100644 --- a/tracing/grpc/interceptors.go +++ b/tracing/grpc/interceptors.go @@ -50,7 +50,7 @@ func NewStreamClientInterceptor() grpc.StreamClientInterceptor { finished := make(chan error) done := make(chan struct{}) - strWrp := newgRPCClientStream(str, desc, finished, done) + strWrp := newgRPCClientStream(str, desc, span, finished, done) go func() { defer close(done) @@ -107,7 +107,7 @@ func NewStreamServerInterceptor() grpc.StreamServerInterceptor { trace.WithSpanKind(trace.SpanKindServer)) defer span.End() - err := handler(srv, newgRPCServerStream(ctx, ss)) + err := handler(srv, newgRPCServerStream(ctx, ss, span)) setGRPCSpanStatus(span, err) return err diff --git a/tracing/grpc/server.go b/tracing/grpc/server.go index 766fdd8..941e791 100644 --- a/tracing/grpc/server.go +++ b/tracing/grpc/server.go @@ -2,7 +2,10 @@ package grpc import ( "context" + "fmt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -10,12 +13,14 @@ import ( type serverStream struct { originalStream grpc.ServerStream ctx context.Context // nolint:containedctx + span trace.Span } -func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream) grpc.ServerStream { +func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream, span trace.Span) grpc.ServerStream { return &serverStream{ originalStream: originalStream, ctx: ctx, + span: span, } } @@ -36,9 +41,27 @@ func (ss *serverStream) Context() context.Context { } func (ss *serverStream) SendMsg(m any) error { - return ss.originalStream.SendMsg(m) + ss.span.AddEvent("start sending a message", trace.WithAttributes( + attribute.String("event.name", "server.stream.send.msg.start"), + attribute.String("message.type", fmt.Sprintf("%T", m))), + ) + err := ss.originalStream.SendMsg(m) + ss.span.AddEvent("finish sending a message", trace.WithAttributes( + attribute.String("event.name", "server.stream.send.msg.finish"), + attribute.String("message.type", fmt.Sprintf("%T", m))), + ) + return err } func (ss *serverStream) RecvMsg(m any) error { - return ss.originalStream.RecvMsg(m) + ss.span.AddEvent("start receiving a message", trace.WithAttributes( + attribute.String("event.name", "server.stream.receive.msg.start"), + attribute.String("message.type", fmt.Sprintf("%T", m))), + ) + err := ss.originalStream.RecvMsg(m) + ss.span.AddEvent("finish receiving a message", trace.WithAttributes( + attribute.String("event.name", "server.stream.receive.msg.finish"), + attribute.String("message.type", fmt.Sprintf("%T", m))), + ) + return err }