generated from TrueCloudLab/basic
[#3] tracing: Move from api-go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
6b3d863d81
commit
7f9eba1b19
16 changed files with 2073 additions and 0 deletions
160
tracing/grpc/interceptors.go
Normal file
160
tracing/grpc/interceptors.go
Normal file
|
@ -0,0 +1,160 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc"
|
||||
grpc_codes "google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// NewUnaryClientInteceptor creates new gRPC unary interceptor to save gRPC client traces.
|
||||
func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor {
|
||||
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
ctx, span := startClientSpan(ctx, cc, method)
|
||||
defer span.End()
|
||||
|
||||
err := invoker(ctx, method, req, reply, cc, opts...)
|
||||
if err != nil {
|
||||
grpcStatus, _ := status.FromError(err)
|
||||
span.SetStatus(codes.Error, grpcStatus.Message())
|
||||
span.SetAttributes(semconv.RPCGRPCStatusCodeKey.Int64(int64(grpcStatus.Code())))
|
||||
} else {
|
||||
span.SetStatus(codes.Ok, "")
|
||||
span.SetAttributes(semconv.RPCGRPCStatusCodeKey.Int64(int64(grpc_codes.OK)))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// NewStreamClientInterceptor creates new gRPC stream interceptor to save gRPC client traces.
|
||||
func NewStreamClientInterceptor() grpc.StreamClientInterceptor {
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
ctx, span := startClientSpan(ctx, cc, method)
|
||||
str, err := streamer(ctx, desc, cc, method, opts...)
|
||||
if err != nil {
|
||||
grpcStatus, _ := status.FromError(err)
|
||||
span.SetStatus(codes.Error, grpcStatus.Message())
|
||||
span.SetAttributes(semconv.RPCGRPCStatusCodeKey.Int64(int64(grpcStatus.Code())))
|
||||
span.End()
|
||||
return str, err
|
||||
}
|
||||
|
||||
finished := make(chan error)
|
||||
done := make(chan struct{})
|
||||
strWrp := newgRPCClientStream(str, desc, finished, done)
|
||||
|
||||
go func() {
|
||||
defer close(done)
|
||||
defer span.End()
|
||||
|
||||
select {
|
||||
case err := <-finished:
|
||||
if err == nil || err == io.EOF {
|
||||
setGRPCSpanStatus(span, nil)
|
||||
} else {
|
||||
setGRPCSpanStatus(span, err)
|
||||
}
|
||||
return
|
||||
case <-ctx.Done():
|
||||
setGRPCSpanStatus(span, ctx.Err())
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
return strWrp, nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewUnaryServerInterceptor creates new gRPC unary interceptor to save gRPC server traces.
|
||||
func NewUnaryServerInterceptor() grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
ctx = extractGRPCTraceInfo(ctx)
|
||||
var span trace.Span
|
||||
ctx, span = tracing.StartSpanFromContext(ctx, info.FullMethod,
|
||||
trace.WithAttributes(
|
||||
semconv.RPCSystemGRPC,
|
||||
semconv.RPCMethod(info.FullMethod),
|
||||
),
|
||||
trace.WithSpanKind(trace.SpanKindServer))
|
||||
defer span.End()
|
||||
|
||||
resp, err = handler(ctx, req)
|
||||
|
||||
setGRPCSpanStatus(span, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// NewStreamServerInterceptor creates new gRPC stream interceptor to save gRPC server traces.
|
||||
func NewStreamServerInterceptor() grpc.StreamServerInterceptor {
|
||||
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
ctx := extractGRPCTraceInfo(ss.Context())
|
||||
var span trace.Span
|
||||
ctx, span = tracing.StartSpanFromContext(ctx, info.FullMethod,
|
||||
trace.WithAttributes(
|
||||
semconv.RPCSystemGRPC,
|
||||
semconv.RPCMethod(info.FullMethod),
|
||||
),
|
||||
trace.WithSpanKind(trace.SpanKindServer))
|
||||
defer span.End()
|
||||
|
||||
err := handler(srv, newgRPCServerStream(ctx, ss))
|
||||
|
||||
setGRPCSpanStatus(span, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func startClientSpan(ctx context.Context, cc *grpc.ClientConn, method string) (context.Context, trace.Span) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, method, trace.WithAttributes(
|
||||
semconv.RPCSystemGRPC,
|
||||
semconv.RPCMethod(method),
|
||||
attribute.String("rpc.grpc.target", cc.Target())),
|
||||
trace.WithSpanKind(trace.SpanKindClient),
|
||||
)
|
||||
ctx = setGRPCTraceInfo(ctx)
|
||||
return ctx, span
|
||||
}
|
||||
|
||||
func setGRPCSpanStatus(span trace.Span, err error) {
|
||||
if err != nil {
|
||||
grpcStatus, _ := status.FromError(err)
|
||||
span.SetStatus(codes.Error, grpcStatus.Message())
|
||||
span.SetAttributes(semconv.RPCGRPCStatusCodeKey.Int64(int64(grpcStatus.Code())))
|
||||
} else {
|
||||
span.SetStatus(codes.Ok, "")
|
||||
span.SetAttributes(semconv.RPCGRPCStatusCodeKey.Int64(int64(grpc_codes.OK)))
|
||||
}
|
||||
}
|
||||
|
||||
func extractGRPCTraceInfo(ctx context.Context) context.Context {
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
return ctx
|
||||
}
|
||||
carrier := &grpcMetadataCarrier{
|
||||
md: &md,
|
||||
}
|
||||
return tracing.Propagator.Extract(ctx, carrier)
|
||||
}
|
||||
|
||||
func setGRPCTraceInfo(ctx context.Context) context.Context {
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
md = metadata.MD{}
|
||||
}
|
||||
carrier := &grpcMetadataCarrier{
|
||||
md: &md,
|
||||
}
|
||||
tracing.Propagator.Inject(ctx, carrier)
|
||||
return metadata.NewOutgoingContext(ctx, md)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue