frostfs-api-go/pkg/tracing/grpc.go
Dmitrii Stepanov 30c5a0f55c [#24] tracing: Fix panic on closed channel
Closed channel is also ready for communication, so select
statement can write to it.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-18 10:47:55 +03:00

135 lines
4.2 KiB
Go

package tracing
import (
"context"
"io"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// NewGRPCUnaryClientInteceptor returns a gRPC unary client interceptor that
// records a client span for every outgoing call.
//
// The span is started before the call is invoked and ended when the
// interceptor returns. The outcome of the call (span status and the gRPC
// status code attribute) is recorded via setGRPCSpanStatus, the same helper
// the server interceptors use. The error returned by the invoker is passed
// through to the caller unchanged.
//
// The misspelling in the function name is kept for API compatibility.
func NewGRPCUnaryClientInteceptor() grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		ctx, span := startClientSpan(ctx, cc, method)
		defer span.End()

		err := invoker(ctx, method, req, reply, cc, opts...)
		// Share the status-recording logic with the other interceptors
		// instead of keeping a hand-written copy here.
		setGRPCSpanStatus(span, err)
		return err
	}
}
// NewGRPCStreamClientInterceptor returns a gRPC stream client interceptor that
// records a client span for every outgoing stream.
//
// If the stream cannot be created, the span is finished immediately with the
// error status and the streamer's results are returned as is. Otherwise the
// stream is wrapped with newgRPCClientStream and a goroutine finishes the span
// when either the wrapper reports completion through the finished channel or
// the call context is done, whichever happens first.
func NewGRPCStreamClientInterceptor() grpc.StreamClientInterceptor {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		ctx, span := startClientSpan(ctx, cc, method)

		str, err := streamer(ctx, desc, cc, method, opts...)
		if err != nil {
			setGRPCSpanStatus(span, err)
			span.End()
			return str, err
		}

		finished := make(chan error)
		done := make(chan struct{})
		strWrp := newgRPCClientStream(str, desc, finished, done)

		go func() {
			// Deferred calls run in reverse order: the span is ended first and
			// done is closed last.
			// NOTE(review): presumably the stream wrapper watches done to stop
			// reporting into finished — confirm in newgRPCClientStream.
			defer close(done)
			defer span.End()

			select {
			case err := <-finished:
				// io.EOF is treated the same as nil: the span is recorded as OK.
				if err == io.EOF {
					err = nil
				}
				setGRPCSpanStatus(span, err)
			case <-ctx.Done():
				// A bare context error would be recorded with the Unknown gRPC
				// code; convert it first so the span carries Canceled or
				// DeadlineExceeded. The status message stays the same.
				setGRPCSpanStatus(span, status.FromContextError(ctx.Err()).Err())
			}
		}()

		return strWrp, nil
	}
}
// NewGRPCUnaryServerInterceptor returns a gRPC unary server interceptor that
// wraps every handled call in a server span.
//
// The incoming context is first passed through extractGRPCTraceInfo, then a
// span named after the full method is started from it. The handler runs with
// the span's context; its error (or success) is recorded on the span, and the
// handler's response and error are returned unchanged.
func NewGRPCUnaryServerInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		ctx, span := StartSpanFromContext(
			extractGRPCTraceInfo(ctx),
			info.FullMethod,
			trace.WithAttributes(
				semconv.RPCSystemGRPC,
				semconv.RPCMethod(info.FullMethod),
			),
			trace.WithSpanKind(trace.SpanKindServer),
		)
		defer span.End()

		result, handlerErr := handler(ctx, req)
		setGRPCSpanStatus(span, handlerErr)
		return result, handlerErr
	}
}
// NewGRPCStreamServerInterceptor returns a gRPC stream server interceptor that
// wraps every handled stream in a server span.
//
// The stream's context is passed through extractGRPCTraceInfo and used to
// start a span named after the full method. The handler receives the stream
// wrapped by newgRPCServerStream together with the span's context; the
// handler's error (or success) is recorded on the span and returned unchanged.
func NewGRPCStreamServerInterceptor() grpc.StreamServerInterceptor {
	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		ctx, span := StartSpanFromContext(
			extractGRPCTraceInfo(ss.Context()),
			info.FullMethod,
			trace.WithAttributes(
				semconv.RPCSystemGRPC,
				semconv.RPCMethod(info.FullMethod),
			),
			trace.WithSpanKind(trace.SpanKindServer),
		)
		defer span.End()

		handlerErr := handler(srv, newgRPCServerStream(ctx, ss))
		setGRPCSpanStatus(span, handlerErr)
		return handlerErr
	}
}
// startClientSpan starts a client-kind span named after method and returns it
// together with the context to use for the outgoing call.
//
// The span is tagged with the gRPC system, the method and the connection
// target. The span's context is passed through setGRPCTraceInfo before being
// returned.
func startClientSpan(ctx context.Context, cc *grpc.ClientConn, method string) (context.Context, trace.Span) {
	spanCtx, span := StartSpanFromContext(
		ctx,
		method,
		trace.WithAttributes(
			semconv.RPCSystemGRPC,
			semconv.RPCMethod(method),
			attribute.String("rpc.grpc.target", cc.Target()),
		),
		trace.WithSpanKind(trace.SpanKindClient),
	)
	return setGRPCTraceInfo(spanCtx), span
}
// setGRPCSpanStatus records the outcome of a gRPC call on span.
//
// For a nil err the span status is Ok with an empty description and the gRPC
// status code attribute is OK. Otherwise err is converted with
// status.FromError; the span status becomes Error with the status message as
// description and the attribute carries the status code.
func setGRPCSpanStatus(span trace.Span, err error) {
	spanStatus, description, rpcCode := codes.Ok, "", grpc_codes.OK
	if err != nil {
		st, _ := status.FromError(err)
		spanStatus, description, rpcCode = codes.Error, st.Message(), st.Code()
	}

	span.SetStatus(spanStatus, description)
	span.SetAttributes(semconv.RPCGRPCStatusCodeKey.Int64(int64(rpcCode)))
}