[#15] tracing: Add events for grpc stream calls #18

3 changed files with 44 additions and 6 deletions

View file

@ -2,7 +2,10 @@ package grpc
import ( import (
"context" "context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
@ -10,14 +13,16 @@ import (
type clientStream struct { type clientStream struct {
originalStream grpc.ClientStream originalStream grpc.ClientStream
desc *grpc.StreamDesc desc *grpc.StreamDesc
span trace.Span
finished chan<- error finished chan<- error
done <-chan struct{} done <-chan struct{}
} }
func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, finished chan<- error, done <-chan struct{}) grpc.ClientStream { func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, finished chan<- error, done <-chan struct{}) grpc.ClientStream {
return &clientStream{ return &clientStream{
originalStream: originalStream, originalStream: originalStream,
desc: desc, desc: desc,
span: span,
finished: finished, finished: finished,
done: done, done: done,
} }
@ -39,6 +44,7 @@ func (cs *clientStream) Trailer() metadata.MD {
} }
func (cs *clientStream) CloseSend() error { func (cs *clientStream) CloseSend() error {
cs.span.AddEvent("client.stream.close.send.start")
fyrchik marked this conversation as resolved Outdated

Please, take a look at https://opentelemetry.io/docs/specs/semconv/general/events/ and https://opentelemetry.io/docs/specs/semconv/general/attribute-naming/

I believe what you use as a value for a event.name attribute, should be the first argument to AddEvent.
Event names in this PR don't follow semantic conventions.

Please, take a look at https://opentelemetry.io/docs/specs/semconv/general/events/ and https://opentelemetry.io/docs/specs/semconv/general/attribute-naming/ I believe what you use as a value for a `event.name` attribute, should be the first argument to `AddEvent`. Event names in this PR don't follow semantic conventions.

Sorry, it seems you are right, opentelemetry itself uses human readable names in the first argument.

Sorry, it seems you are right, opentelemetry itself uses human readable names in the first argument.
err := cs.originalStream.CloseSend() err := cs.originalStream.CloseSend()

Is there any difference between this code and

cs.span.AddEvent("client.stream.close.send.start")

I mean, event name and description are basically self explanatory. Maybe we can save couple of bytes during tracing transmission to collector?

What you think @fyrchik, @dstepanov-yadro ?

Is there any difference between this code and ``` cs.span.AddEvent("client.stream.close.send.start") ``` I mean, event name and description are basically self explanatory. Maybe we can save couple of bytes during tracing transmission to collector? What you think @fyrchik, @dstepanov-yadro ?

I have looked into opentelemetry code and they use names which do NOT conform to the semantic convention described in docs b99d2b8178/bridge/opencensus/internal/span.go (L84)
So I am not sure how to write AddEvent right.

But I like your suggestion a lot more, it is smaller and seems to achieve exactly what we need.

I have looked into opentelemetry code and they use names which do NOT conform to the semantic convention described in docs https://github.com/open-telemetry/opentelemetry-go/blob/b99d2b81783dd3d27201393fc0e741a6fb4a8d6b/bridge/opencensus/internal/span.go#L84 So I am not sure how to write `AddEvent` right. But I like your suggestion a lot more, it is smaller and seems to achieve exactly what we need.

I agree with @fyrchik

I agree with @fyrchik
if err != nil { if err != nil {
select { select {
@ -46,6 +52,7 @@ func (cs *clientStream) CloseSend() error {
case cs.finished <- err: case cs.finished <- err:
} }
} }
cs.span.AddEvent("client.stream.close.send.finish")
return err return err
} }
@ -54,6 +61,9 @@ func (cs *clientStream) Context() context.Context {
} }
func (cs *clientStream) SendMsg(m any) error { func (cs *clientStream) SendMsg(m any) error {
cs.span.AddEvent("client.stream.send.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := cs.originalStream.SendMsg(m) err := cs.originalStream.SendMsg(m)
if err != nil { if err != nil {
select { select {
@ -61,10 +71,16 @@ func (cs *clientStream) SendMsg(m any) error {
case cs.finished <- err: case cs.finished <- err:
} }
} }
cs.span.AddEvent("client.stream.send.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err return err
} }
func (cs *clientStream) RecvMsg(m any) error { func (cs *clientStream) RecvMsg(m any) error {
cs.span.AddEvent("client.stream.receive.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := cs.originalStream.RecvMsg(m) err := cs.originalStream.RecvMsg(m)
if err != nil || !cs.desc.ServerStreams { if err != nil || !cs.desc.ServerStreams {
select { select {
@ -72,5 +88,8 @@ func (cs *clientStream) RecvMsg(m any) error {
case cs.finished <- err: case cs.finished <- err:
} }
} }
cs.span.AddEvent("client.stream.receive.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err return err
} }

View file

@ -50,7 +50,7 @@ func NewStreamClientInterceptor() grpc.StreamClientInterceptor {
finished := make(chan error) finished := make(chan error)
done := make(chan struct{}) done := make(chan struct{})
strWrp := newgRPCClientStream(str, desc, finished, done) strWrp := newgRPCClientStream(str, desc, span, finished, done)
go func() { go func() {
defer close(done) defer close(done)
@ -107,7 +107,7 @@ func NewStreamServerInterceptor() grpc.StreamServerInterceptor {
trace.WithSpanKind(trace.SpanKindServer)) trace.WithSpanKind(trace.SpanKindServer))
defer span.End() defer span.End()
err := handler(srv, newgRPCServerStream(ctx, ss)) err := handler(srv, newgRPCServerStream(ctx, ss, span))
setGRPCSpanStatus(span, err) setGRPCSpanStatus(span, err)
return err return err

View file

@ -2,7 +2,10 @@ package grpc
import ( import (
"context" "context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
@ -10,12 +13,14 @@ import (
type serverStream struct { type serverStream struct {
originalStream grpc.ServerStream originalStream grpc.ServerStream
ctx context.Context // nolint:containedctx ctx context.Context // nolint:containedctx
span trace.Span
} }
func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream) grpc.ServerStream { func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream, span trace.Span) grpc.ServerStream {
return &serverStream{ return &serverStream{
originalStream: originalStream, originalStream: originalStream,
ctx: ctx, ctx: ctx,
span: span,
} }
} }
@ -36,9 +41,23 @@ func (ss *serverStream) Context() context.Context {
} }
func (ss *serverStream) SendMsg(m any) error { func (ss *serverStream) SendMsg(m any) error {
return ss.originalStream.SendMsg(m) ss.span.AddEvent("server.stream.send.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := ss.originalStream.SendMsg(m)
ss.span.AddEvent("server.stream.send.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err
} }
func (ss *serverStream) RecvMsg(m any) error { func (ss *serverStream) RecvMsg(m any) error {
return ss.originalStream.RecvMsg(m) ss.span.AddEvent("server.stream.receive.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := ss.originalStream.RecvMsg(m)
ss.span.AddEvent("server.stream.receive.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err
} }