package tracing import ( "context" "io" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" grpc_codes "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // NewGRPCUnaryClientInteceptor creates new gRPC unary interceptor to save gRPC client traces. func NewGRPCUnaryClientInteceptor() grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ctx, span := startClientSpan(ctx, cc, method) defer span.End() err := invoker(ctx, method, req, reply, cc, opts...) if err != nil { grpcStatus, _ := status.FromError(err) span.SetStatus(codes.Error, grpcStatus.Message()) span.SetAttributes(semconv.RPCGRPCStatusCodeKey.Int64(int64(grpcStatus.Code()))) } else { span.SetStatus(codes.Ok, "") span.SetAttributes(semconv.RPCGRPCStatusCodeKey.Int64(int64(grpc_codes.OK))) } return err } } // NewGRPCStreamClientInterceptor creates new gRPC stream interceptor to save gRPC client traces. func NewGRPCStreamClientInterceptor() grpc.StreamClientInterceptor { return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { ctx, span := startClientSpan(ctx, cc, method) str, err := streamer(ctx, desc, cc, method, opts...) if err != nil { grpcStatus, _ := status.FromError(err) span.SetStatus(codes.Error, grpcStatus.Message()) span.SetAttributes(semconv.RPCGRPCStatusCodeKey.Int64(int64(grpcStatus.Code()))) span.End() return str, err } finished := make(chan error) done := make(chan struct{}) strWrp := newgRPCClientStream(str, desc, finished, done) go func() { defer close(done) defer span.End() select { case err := <-finished: if err == nil || err == io.EOF { setGRPCSpanStatus(span, nil) } else { setGRPCSpanStatus(span, err) } return case <-ctx.Done(): setGRPCSpanStatus(span, ctx.Err()) return } }() return strWrp, nil } } // NewGRPCUnaryServerInterceptor creates new gRPC unary interceptor to save gRPC server traces. func NewGRPCUnaryServerInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { ctx = extractGRPCTraceInfo(ctx) var span trace.Span ctx, span = StartSpanFromContext(ctx, info.FullMethod, trace.WithAttributes( semconv.RPCSystemGRPC, semconv.RPCMethod(info.FullMethod), ), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() resp, err = handler(ctx, req) setGRPCSpanStatus(span, err) return } } // NewGRPCStreamServerInterceptor creates new gRPC stream interceptor to save gRPC server traces. func NewGRPCStreamServerInterceptor() grpc.StreamServerInterceptor { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { ctx := extractGRPCTraceInfo(ss.Context()) var span trace.Span ctx, span = StartSpanFromContext(ctx, info.FullMethod, trace.WithAttributes( semconv.RPCSystemGRPC, semconv.RPCMethod(info.FullMethod), ), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() err := handler(srv, newgRPCServerStream(ctx, ss)) setGRPCSpanStatus(span, err) return err } } func startClientSpan(ctx context.Context, cc *grpc.ClientConn, method string) (context.Context, trace.Span) { ctx, span := StartSpanFromContext(ctx, method, trace.WithAttributes( semconv.RPCSystemGRPC, semconv.RPCMethod(method), attribute.String("rpc.grpc.target", cc.Target())), trace.WithSpanKind(trace.SpanKindClient), ) ctx = setGRPCTraceInfo(ctx) return ctx, span } func setGRPCSpanStatus(span trace.Span, err error) { if err != nil { grpcStatus, _ := status.FromError(err) span.SetStatus(codes.Error, grpcStatus.Message()) span.SetAttributes(semconv.RPCGRPCStatusCodeKey.Int64(int64(grpcStatus.Code()))) } else { span.SetStatus(codes.Ok, "") span.SetAttributes(semconv.RPCGRPCStatusCodeKey.Int64(int64(grpc_codes.OK))) } }