Compare commits

..

No commits in common. "master" and "feat/tracing_attributes" have entirely different histories.

6 changed files with 9 additions and 59 deletions

View file

@ -1,3 +0,0 @@
.* @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers
.forgejo/.* @potyarkin
Makefile @potyarkin

View file

@ -15,9 +15,6 @@ var clientMetrics = grpcprom.NewClientMetrics(
grpcprom.WithClientStreamRecvHistogram(
grpcprom.WithHistogramBuckets(prometheus.DefBuckets),
),
grpcprom.WithClientStreamSendHistogram(
grpcprom.WithHistogramBuckets(prometheus.DefBuckets),
),
)
func init() {
@ -60,12 +57,6 @@ func init() {
Help: "Histogram of response latency (seconds) of the gRPC single message receive.",
VariableLabels: []string{"grpc_type", "grpc_service", "grpc_method"},
},
{
Name: "grpc_client_msg_send_handling_seconds",
Type: dto.MetricType_HISTOGRAM.String(),
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
VariableLabels: []string{"grpc_type", "grpc_service", "grpc_method"},
},
}
metrics.MustRegister(clientMetrics, descs...)
}

View file

@ -82,7 +82,7 @@ func main() {
cc, err := grpc.NewClient(":7000",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(
tracing_grpc.NewUnaryClientInterceptor(),
tracing_grpc.NewUnaryClientInteceptor(),
verifyClientTraceID,
),
grpc.WithChainStreamInterceptor(

View file

@ -2,10 +2,7 @@ package grpc
import (
"context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
@ -13,16 +10,14 @@ import (
type clientStream struct {
originalStream grpc.ClientStream
desc *grpc.StreamDesc
span trace.Span
finished chan<- error
done <-chan struct{}
}
func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, finished chan<- error, done <-chan struct{}) grpc.ClientStream {
func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, finished chan<- error, done <-chan struct{}) grpc.ClientStream {
return &clientStream{
originalStream: originalStream,
desc: desc,
span: span,
finished: finished,
done: done,
}
@ -44,7 +39,6 @@ func (cs *clientStream) Trailer() metadata.MD {
}
func (cs *clientStream) CloseSend() error {
cs.span.AddEvent("client.stream.close.send.start")
err := cs.originalStream.CloseSend()
if err != nil {
select {
@ -52,7 +46,6 @@ func (cs *clientStream) CloseSend() error {
case cs.finished <- err:
}
}
cs.span.AddEvent("client.stream.close.send.finish")
return err
}
@ -61,9 +54,6 @@ func (cs *clientStream) Context() context.Context {
}
func (cs *clientStream) SendMsg(m any) error {
cs.span.AddEvent("client.stream.send.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := cs.originalStream.SendMsg(m)
if err != nil {
select {
@ -71,16 +61,10 @@ func (cs *clientStream) SendMsg(m any) error {
case cs.finished <- err:
}
}
cs.span.AddEvent("client.stream.send.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err
}
func (cs *clientStream) RecvMsg(m any) error {
cs.span.AddEvent("client.stream.receive.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := cs.originalStream.RecvMsg(m)
if err != nil || !cs.desc.ServerStreams {
select {
@ -88,8 +72,5 @@ func (cs *clientStream) RecvMsg(m any) error {
case cs.finished <- err:
}
}
cs.span.AddEvent("client.stream.receive.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err
}

View file

@ -15,8 +15,8 @@ import (
"google.golang.org/grpc/status"
)
// NewUnaryClientInterceptor creates new gRPC unary interceptor to save gRPC client traces.
func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor {
// NewUnaryClientInteceptor creates new gRPC unary interceptor to save gRPC client traces.
func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx, span := startClientSpan(ctx, cc, method)
defer span.End()
@ -50,7 +50,7 @@ func NewStreamClientInterceptor() grpc.StreamClientInterceptor {
finished := make(chan error)
done := make(chan struct{})
strWrp := newgRPCClientStream(str, desc, span, finished, done)
strWrp := newgRPCClientStream(str, desc, finished, done)
go func() {
defer close(done)
@ -107,7 +107,7 @@ func NewStreamServerInterceptor() grpc.StreamServerInterceptor {
trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
err := handler(srv, newgRPCServerStream(ctx, ss, span))
err := handler(srv, newgRPCServerStream(ctx, ss))
setGRPCSpanStatus(span, err)
return err

View file

@ -2,10 +2,7 @@ package grpc
import (
"context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
@ -13,14 +10,12 @@ import (
type serverStream struct {
originalStream grpc.ServerStream
ctx context.Context // nolint:containedctx
span trace.Span
}
func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream, span trace.Span) grpc.ServerStream {
func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream) grpc.ServerStream {
return &serverStream{
originalStream: originalStream,
ctx: ctx,
span: span,
}
}
@ -41,23 +36,9 @@ func (ss *serverStream) Context() context.Context {
}
func (ss *serverStream) SendMsg(m any) error {
ss.span.AddEvent("server.stream.send.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := ss.originalStream.SendMsg(m)
ss.span.AddEvent("server.stream.send.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err
return ss.originalStream.SendMsg(m)
}
func (ss *serverStream) RecvMsg(m any) error {
ss.span.AddEvent("server.stream.receive.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := ss.originalStream.RecvMsg(m)
ss.span.AddEvent("server.stream.receive.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err
return ss.originalStream.RecvMsg(m)
}