generated from TrueCloudLab/basic
[#15] tracing: Add events for grpc stream calls #18
3 changed files with 44 additions and 6 deletions
|
@ -2,7 +2,10 @@ package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
)
|
)
|
||||||
|
@ -10,14 +13,16 @@ import (
|
||||||
type clientStream struct {
|
type clientStream struct {
|
||||||
originalStream grpc.ClientStream
|
originalStream grpc.ClientStream
|
||||||
desc *grpc.StreamDesc
|
desc *grpc.StreamDesc
|
||||||
|
span trace.Span
|
||||||
finished chan<- error
|
finished chan<- error
|
||||||
done <-chan struct{}
|
done <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, finished chan<- error, done <-chan struct{}) grpc.ClientStream {
|
func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, finished chan<- error, done <-chan struct{}) grpc.ClientStream {
|
||||||
return &clientStream{
|
return &clientStream{
|
||||||
originalStream: originalStream,
|
originalStream: originalStream,
|
||||||
desc: desc,
|
desc: desc,
|
||||||
|
span: span,
|
||||||
finished: finished,
|
finished: finished,
|
||||||
done: done,
|
done: done,
|
||||||
}
|
}
|
||||||
|
@ -39,6 +44,7 @@ func (cs *clientStream) Trailer() metadata.MD {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *clientStream) CloseSend() error {
|
func (cs *clientStream) CloseSend() error {
|
||||||
|
cs.span.AddEvent("client.stream.close.send.start")
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
err := cs.originalStream.CloseSend()
|
err := cs.originalStream.CloseSend()
|
||||||
alexvanin
commented
Is there any difference between this code and
I mean, event name and description are basically self explanatory. Maybe we can save couple of bytes during tracing transmission to collector? What you think @fyrchik, @dstepanov-yadro ? Is there any difference between this code and
```
cs.span.AddEvent("client.stream.close.send.start")
```
I mean, event name and description are basically self explanatory. Maybe we can save couple of bytes during tracing transmission to collector?
What you think @fyrchik, @dstepanov-yadro ?
fyrchik
commented
I have looked into opentelemetry code and they use names which do NOT conform to the semantic convention described in docs But I like your suggestion a lot more, it is smaller and seems to achieve exactly what we need. I have looked into opentelemetry code and they use names which do NOT conform to the semantic convention described in docs https://github.com/open-telemetry/opentelemetry-go/blob/b99d2b81783dd3d27201393fc0e741a6fb4a8d6b/bridge/opencensus/internal/span.go#L84
So I am not sure how to write `AddEvent` right.
But I like your suggestion a lot more, it is smaller and seems to achieve exactly what we need.
dstepanov-yadro
commented
I agree with @fyrchik I agree with @fyrchik
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
select {
|
select {
|
||||||
|
@ -46,6 +52,7 @@ func (cs *clientStream) CloseSend() error {
|
||||||
case cs.finished <- err:
|
case cs.finished <- err:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cs.span.AddEvent("client.stream.close.send.finish")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,6 +61,9 @@ func (cs *clientStream) Context() context.Context {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *clientStream) SendMsg(m any) error {
|
func (cs *clientStream) SendMsg(m any) error {
|
||||||
|
cs.span.AddEvent("client.stream.send.msg.start", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
err := cs.originalStream.SendMsg(m)
|
err := cs.originalStream.SendMsg(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
select {
|
select {
|
||||||
|
@ -61,10 +71,16 @@ func (cs *clientStream) SendMsg(m any) error {
|
||||||
case cs.finished <- err:
|
case cs.finished <- err:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cs.span.AddEvent("client.stream.send.msg.finish", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *clientStream) RecvMsg(m any) error {
|
func (cs *clientStream) RecvMsg(m any) error {
|
||||||
|
cs.span.AddEvent("client.stream.receive.msg.start", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
err := cs.originalStream.RecvMsg(m)
|
err := cs.originalStream.RecvMsg(m)
|
||||||
if err != nil || !cs.desc.ServerStreams {
|
if err != nil || !cs.desc.ServerStreams {
|
||||||
select {
|
select {
|
||||||
|
@ -72,5 +88,8 @@ func (cs *clientStream) RecvMsg(m any) error {
|
||||||
case cs.finished <- err:
|
case cs.finished <- err:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cs.span.AddEvent("client.stream.receive.msg.finish", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,7 @@ func NewStreamClientInterceptor() grpc.StreamClientInterceptor {
|
||||||
|
|
||||||
finished := make(chan error)
|
finished := make(chan error)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
strWrp := newgRPCClientStream(str, desc, finished, done)
|
strWrp := newgRPCClientStream(str, desc, span, finished, done)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(done)
|
defer close(done)
|
||||||
|
@ -107,7 +107,7 @@ func NewStreamServerInterceptor() grpc.StreamServerInterceptor {
|
||||||
trace.WithSpanKind(trace.SpanKindServer))
|
trace.WithSpanKind(trace.SpanKindServer))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
err := handler(srv, newgRPCServerStream(ctx, ss))
|
err := handler(srv, newgRPCServerStream(ctx, ss, span))
|
||||||
|
|
||||||
setGRPCSpanStatus(span, err)
|
setGRPCSpanStatus(span, err)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -2,7 +2,10 @@ package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
)
|
)
|
||||||
|
@ -10,12 +13,14 @@ import (
|
||||||
type serverStream struct {
|
type serverStream struct {
|
||||||
originalStream grpc.ServerStream
|
originalStream grpc.ServerStream
|
||||||
ctx context.Context // nolint:containedctx
|
ctx context.Context // nolint:containedctx
|
||||||
|
span trace.Span
|
||||||
}
|
}
|
||||||
|
|
||||||
func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream) grpc.ServerStream {
|
func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream, span trace.Span) grpc.ServerStream {
|
||||||
return &serverStream{
|
return &serverStream{
|
||||||
originalStream: originalStream,
|
originalStream: originalStream,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
span: span,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,9 +41,23 @@ func (ss *serverStream) Context() context.Context {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *serverStream) SendMsg(m any) error {
|
func (ss *serverStream) SendMsg(m any) error {
|
||||||
return ss.originalStream.SendMsg(m)
|
ss.span.AddEvent("server.stream.send.msg.start", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
|
err := ss.originalStream.SendMsg(m)
|
||||||
|
ss.span.AddEvent("server.stream.send.msg.finish", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *serverStream) RecvMsg(m any) error {
|
func (ss *serverStream) RecvMsg(m any) error {
|
||||||
return ss.originalStream.RecvMsg(m)
|
ss.span.AddEvent("server.stream.receive.msg.start", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
|
err := ss.originalStream.RecvMsg(m)
|
||||||
|
ss.span.AddEvent("server.stream.receive.msg.finish", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue
Please, take a look at https://opentelemetry.io/docs/specs/semconv/general/events/ and https://opentelemetry.io/docs/specs/semconv/general/attribute-naming/
I believe what you use as a value for a
event.name
attribute, should be the first argument toAddEvent
.Event names in this PR don't follow semantic conventions.
Sorry, it seems you are right, opentelemetry itself uses human readable names in the first argument.