Compare commits

...
Sign in to create a new pull request.

5 commits

Author SHA1 Message Date
93b681a202
[#21] tracing: Fix typo
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-21 09:32:46 +03:00
d34e1329c8
[#20] metrics: Add grpc msg send metrics
Streaming RPC has two main metrics: send message and receive message.
But the first one was missed.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-02-12 14:19:29 +03:00
ec6f880337 [#19] Add CODEOWNERS
Signed-off-by: Vitaliy Potyarkin <v.potyarkin@yadro.com>
2024-12-10 18:50:57 +03:00
37bd758211 [#15] tracing: Add events for grpc stream calls
Signed-off-by: Roman Loginov <r.loginov@yadro.com>
2024-11-25 16:38:52 +03:00
f17779933e
[#17] config: Add resource attributes
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-12 11:23:07 +03:00
8 changed files with 68 additions and 10 deletions

3
CODEOWNERS Normal file
View file

@ -0,0 +1,3 @@
.* @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers
.forgejo/.* @potyarkin
Makefile @potyarkin

View file

@ -15,6 +15,9 @@ var clientMetrics = grpcprom.NewClientMetrics(
grpcprom.WithClientStreamRecvHistogram( grpcprom.WithClientStreamRecvHistogram(
grpcprom.WithHistogramBuckets(prometheus.DefBuckets), grpcprom.WithHistogramBuckets(prometheus.DefBuckets),
), ),
grpcprom.WithClientStreamSendHistogram(
grpcprom.WithHistogramBuckets(prometheus.DefBuckets),
),
) )
func init() { func init() {
@ -57,6 +60,12 @@ func init() {
Help: "Histogram of response latency (seconds) of the gRPC single message receive.", Help: "Histogram of response latency (seconds) of the gRPC single message receive.",
VariableLabels: []string{"grpc_type", "grpc_service", "grpc_method"}, VariableLabels: []string{"grpc_type", "grpc_service", "grpc_method"},
}, },
{
Name: "grpc_client_msg_send_handling_seconds",
Type: dto.MetricType_HISTOGRAM.String(),
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
VariableLabels: []string{"grpc_type", "grpc_service", "grpc_method"},
},
} }
metrics.MustRegister(clientMetrics, descs...) metrics.MustRegister(clientMetrics, descs...)
} }

View file

@ -3,6 +3,7 @@ package tracing
import ( import (
"crypto/x509" "crypto/x509"
"fmt" "fmt"
"maps"
) )
// Exporter is type of tracing target. // Exporter is type of tracing target.
@ -33,6 +34,9 @@ type Config struct {
// Version is version of service instance. // Version is version of service instance.
// Optional. // Optional.
Version string Version string
// Attributes is KV list of attributes.
// Optional.
Attributes map[string]string
} }
func (c *Config) validate() error { func (c *Config) validate() error {
@ -81,5 +85,6 @@ func (c *Config) hasChange(other *Config) bool {
func (c *Config) serviceInfoEqual(other *Config) bool { func (c *Config) serviceInfoEqual(other *Config) bool {
return c.Service == other.Service && return c.Service == other.Service &&
c.InstanceID == other.InstanceID && c.InstanceID == other.InstanceID &&
c.Version == other.Version c.Version == other.Version &&
maps.Equal(c.Attributes, other.Attributes)
} }

View file

@ -82,7 +82,7 @@ func main() {
cc, err := grpc.NewClient(":7000", cc, err := grpc.NewClient(":7000",
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor( grpc.WithChainUnaryInterceptor(
tracing_grpc.NewUnaryClientInteceptor(), tracing_grpc.NewUnaryClientInterceptor(),
verifyClientTraceID, verifyClientTraceID,
), ),
grpc.WithChainStreamInterceptor( grpc.WithChainStreamInterceptor(

View file

@ -2,7 +2,10 @@ package grpc
import ( import (
"context" "context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
@ -10,14 +13,16 @@ import (
type clientStream struct { type clientStream struct {
originalStream grpc.ClientStream originalStream grpc.ClientStream
desc *grpc.StreamDesc desc *grpc.StreamDesc
span trace.Span
finished chan<- error finished chan<- error
done <-chan struct{} done <-chan struct{}
} }
func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, finished chan<- error, done <-chan struct{}) grpc.ClientStream { func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, finished chan<- error, done <-chan struct{}) grpc.ClientStream {
return &clientStream{ return &clientStream{
originalStream: originalStream, originalStream: originalStream,
desc: desc, desc: desc,
span: span,
finished: finished, finished: finished,
done: done, done: done,
} }
@ -39,6 +44,7 @@ func (cs *clientStream) Trailer() metadata.MD {
} }
func (cs *clientStream) CloseSend() error { func (cs *clientStream) CloseSend() error {
cs.span.AddEvent("client.stream.close.send.start")
err := cs.originalStream.CloseSend() err := cs.originalStream.CloseSend()
if err != nil { if err != nil {
select { select {
@ -46,6 +52,7 @@ func (cs *clientStream) CloseSend() error {
case cs.finished <- err: case cs.finished <- err:
} }
} }
cs.span.AddEvent("client.stream.close.send.finish")
return err return err
} }
@ -54,6 +61,9 @@ func (cs *clientStream) Context() context.Context {
} }
func (cs *clientStream) SendMsg(m any) error { func (cs *clientStream) SendMsg(m any) error {
cs.span.AddEvent("client.stream.send.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := cs.originalStream.SendMsg(m) err := cs.originalStream.SendMsg(m)
if err != nil { if err != nil {
select { select {
@ -61,10 +71,16 @@ func (cs *clientStream) SendMsg(m any) error {
case cs.finished <- err: case cs.finished <- err:
} }
} }
cs.span.AddEvent("client.stream.send.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err return err
} }
func (cs *clientStream) RecvMsg(m any) error { func (cs *clientStream) RecvMsg(m any) error {
cs.span.AddEvent("client.stream.receive.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := cs.originalStream.RecvMsg(m) err := cs.originalStream.RecvMsg(m)
if err != nil || !cs.desc.ServerStreams { if err != nil || !cs.desc.ServerStreams {
select { select {
@ -72,5 +88,8 @@ func (cs *clientStream) RecvMsg(m any) error {
case cs.finished <- err: case cs.finished <- err:
} }
} }
cs.span.AddEvent("client.stream.receive.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err return err
} }

View file

@ -15,8 +15,8 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
// NewUnaryClientInteceptor creates new gRPC unary interceptor to save gRPC client traces. // NewUnaryClientInterceptor creates new gRPC unary interceptor to save gRPC client traces.
func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor { func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx, span := startClientSpan(ctx, cc, method) ctx, span := startClientSpan(ctx, cc, method)
defer span.End() defer span.End()
@ -50,7 +50,7 @@ func NewStreamClientInterceptor() grpc.StreamClientInterceptor {
finished := make(chan error) finished := make(chan error)
done := make(chan struct{}) done := make(chan struct{})
strWrp := newgRPCClientStream(str, desc, finished, done) strWrp := newgRPCClientStream(str, desc, span, finished, done)
go func() { go func() {
defer close(done) defer close(done)
@ -107,7 +107,7 @@ func NewStreamServerInterceptor() grpc.StreamServerInterceptor {
trace.WithSpanKind(trace.SpanKindServer)) trace.WithSpanKind(trace.SpanKindServer))
defer span.End() defer span.End()
err := handler(srv, newgRPCServerStream(ctx, ss)) err := handler(srv, newgRPCServerStream(ctx, ss, span))
setGRPCSpanStatus(span, err) setGRPCSpanStatus(span, err)
return err return err

View file

@ -2,7 +2,10 @@ package grpc
import ( import (
"context" "context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
@ -10,12 +13,14 @@ import (
type serverStream struct { type serverStream struct {
originalStream grpc.ServerStream originalStream grpc.ServerStream
ctx context.Context // nolint:containedctx ctx context.Context // nolint:containedctx
span trace.Span
} }
func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream) grpc.ServerStream { func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream, span trace.Span) grpc.ServerStream {
return &serverStream{ return &serverStream{
originalStream: originalStream, originalStream: originalStream,
ctx: ctx, ctx: ctx,
span: span,
} }
} }
@ -36,9 +41,23 @@ func (ss *serverStream) Context() context.Context {
} }
func (ss *serverStream) SendMsg(m any) error { func (ss *serverStream) SendMsg(m any) error {
return ss.originalStream.SendMsg(m) ss.span.AddEvent("server.stream.send.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := ss.originalStream.SendMsg(m)
ss.span.AddEvent("server.stream.send.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err
} }
func (ss *serverStream) RecvMsg(m any) error { func (ss *serverStream) RecvMsg(m any) error {
return ss.originalStream.RecvMsg(m) ss.span.AddEvent("server.stream.receive.msg.start", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
err := ss.originalStream.RecvMsg(m)
ss.span.AddEvent("server.stream.receive.msg.finish", trace.WithAttributes(
attribute.String("message.type", fmt.Sprintf("%T", m))),
)
return err
} }

View file

@ -162,6 +162,9 @@ func newResource(cfg *Config) *resource.Resource {
if len(cfg.InstanceID) > 0 { if len(cfg.InstanceID) > 0 {
attrs = append(attrs, semconv.ServiceInstanceID(cfg.InstanceID)) attrs = append(attrs, semconv.ServiceInstanceID(cfg.InstanceID))
} }
for k, v := range cfg.Attributes {
attrs = append(attrs, attribute.String(k, v))
}
return resource.NewWithAttributes( return resource.NewWithAttributes(
semconv.SchemaURL, semconv.SchemaURL,
attrs..., attrs...,