Compare commits
5 commits
fix/licens
...
master
Author | SHA1 | Date | |
---|---|---|---|
93b681a202 | |||
d34e1329c8 | |||
ec6f880337 | |||
37bd758211 | |||
f17779933e |
8 changed files with 68 additions and 10 deletions
3
CODEOWNERS
Normal file
3
CODEOWNERS
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
.* @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers
|
||||||
|
.forgejo/.* @potyarkin
|
||||||
|
Makefile @potyarkin
|
|
@ -15,6 +15,9 @@ var clientMetrics = grpcprom.NewClientMetrics(
|
||||||
grpcprom.WithClientStreamRecvHistogram(
|
grpcprom.WithClientStreamRecvHistogram(
|
||||||
grpcprom.WithHistogramBuckets(prometheus.DefBuckets),
|
grpcprom.WithHistogramBuckets(prometheus.DefBuckets),
|
||||||
),
|
),
|
||||||
|
grpcprom.WithClientStreamSendHistogram(
|
||||||
|
grpcprom.WithHistogramBuckets(prometheus.DefBuckets),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -57,6 +60,12 @@ func init() {
|
||||||
Help: "Histogram of response latency (seconds) of the gRPC single message receive.",
|
Help: "Histogram of response latency (seconds) of the gRPC single message receive.",
|
||||||
VariableLabels: []string{"grpc_type", "grpc_service", "grpc_method"},
|
VariableLabels: []string{"grpc_type", "grpc_service", "grpc_method"},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "grpc_client_msg_send_handling_seconds",
|
||||||
|
Type: dto.MetricType_HISTOGRAM.String(),
|
||||||
|
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
|
||||||
|
VariableLabels: []string{"grpc_type", "grpc_service", "grpc_method"},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
metrics.MustRegister(clientMetrics, descs...)
|
metrics.MustRegister(clientMetrics, descs...)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package tracing
|
||||||
import (
|
import (
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"maps"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Exporter is type of tracing target.
|
// Exporter is type of tracing target.
|
||||||
|
@ -33,6 +34,9 @@ type Config struct {
|
||||||
// Version is version of service instance.
|
// Version is version of service instance.
|
||||||
// Optional.
|
// Optional.
|
||||||
Version string
|
Version string
|
||||||
|
// Attributes is KV list of attributes.
|
||||||
|
// Optional.
|
||||||
|
Attributes map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) validate() error {
|
func (c *Config) validate() error {
|
||||||
|
@ -81,5 +85,6 @@ func (c *Config) hasChange(other *Config) bool {
|
||||||
func (c *Config) serviceInfoEqual(other *Config) bool {
|
func (c *Config) serviceInfoEqual(other *Config) bool {
|
||||||
return c.Service == other.Service &&
|
return c.Service == other.Service &&
|
||||||
c.InstanceID == other.InstanceID &&
|
c.InstanceID == other.InstanceID &&
|
||||||
c.Version == other.Version
|
c.Version == other.Version &&
|
||||||
|
maps.Equal(c.Attributes, other.Attributes)
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,7 +82,7 @@ func main() {
|
||||||
cc, err := grpc.NewClient(":7000",
|
cc, err := grpc.NewClient(":7000",
|
||||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithChainUnaryInterceptor(
|
grpc.WithChainUnaryInterceptor(
|
||||||
tracing_grpc.NewUnaryClientInteceptor(),
|
tracing_grpc.NewUnaryClientInterceptor(),
|
||||||
verifyClientTraceID,
|
verifyClientTraceID,
|
||||||
),
|
),
|
||||||
grpc.WithChainStreamInterceptor(
|
grpc.WithChainStreamInterceptor(
|
||||||
|
|
|
@ -2,7 +2,10 @@ package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
)
|
)
|
||||||
|
@ -10,14 +13,16 @@ import (
|
||||||
type clientStream struct {
|
type clientStream struct {
|
||||||
originalStream grpc.ClientStream
|
originalStream grpc.ClientStream
|
||||||
desc *grpc.StreamDesc
|
desc *grpc.StreamDesc
|
||||||
|
span trace.Span
|
||||||
finished chan<- error
|
finished chan<- error
|
||||||
done <-chan struct{}
|
done <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, finished chan<- error, done <-chan struct{}) grpc.ClientStream {
|
func newgRPCClientStream(originalStream grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, finished chan<- error, done <-chan struct{}) grpc.ClientStream {
|
||||||
return &clientStream{
|
return &clientStream{
|
||||||
originalStream: originalStream,
|
originalStream: originalStream,
|
||||||
desc: desc,
|
desc: desc,
|
||||||
|
span: span,
|
||||||
finished: finished,
|
finished: finished,
|
||||||
done: done,
|
done: done,
|
||||||
}
|
}
|
||||||
|
@ -39,6 +44,7 @@ func (cs *clientStream) Trailer() metadata.MD {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *clientStream) CloseSend() error {
|
func (cs *clientStream) CloseSend() error {
|
||||||
|
cs.span.AddEvent("client.stream.close.send.start")
|
||||||
err := cs.originalStream.CloseSend()
|
err := cs.originalStream.CloseSend()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
select {
|
select {
|
||||||
|
@ -46,6 +52,7 @@ func (cs *clientStream) CloseSend() error {
|
||||||
case cs.finished <- err:
|
case cs.finished <- err:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cs.span.AddEvent("client.stream.close.send.finish")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,6 +61,9 @@ func (cs *clientStream) Context() context.Context {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *clientStream) SendMsg(m any) error {
|
func (cs *clientStream) SendMsg(m any) error {
|
||||||
|
cs.span.AddEvent("client.stream.send.msg.start", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
err := cs.originalStream.SendMsg(m)
|
err := cs.originalStream.SendMsg(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
select {
|
select {
|
||||||
|
@ -61,10 +71,16 @@ func (cs *clientStream) SendMsg(m any) error {
|
||||||
case cs.finished <- err:
|
case cs.finished <- err:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cs.span.AddEvent("client.stream.send.msg.finish", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *clientStream) RecvMsg(m any) error {
|
func (cs *clientStream) RecvMsg(m any) error {
|
||||||
|
cs.span.AddEvent("client.stream.receive.msg.start", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
err := cs.originalStream.RecvMsg(m)
|
err := cs.originalStream.RecvMsg(m)
|
||||||
if err != nil || !cs.desc.ServerStreams {
|
if err != nil || !cs.desc.ServerStreams {
|
||||||
select {
|
select {
|
||||||
|
@ -72,5 +88,8 @@ func (cs *clientStream) RecvMsg(m any) error {
|
||||||
case cs.finished <- err:
|
case cs.finished <- err:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cs.span.AddEvent("client.stream.receive.msg.finish", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,8 +15,8 @@ import (
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewUnaryClientInteceptor creates new gRPC unary interceptor to save gRPC client traces.
|
// NewUnaryClientInterceptor creates new gRPC unary interceptor to save gRPC client traces.
|
||||||
func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor {
|
func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor {
|
||||||
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||||
ctx, span := startClientSpan(ctx, cc, method)
|
ctx, span := startClientSpan(ctx, cc, method)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
@ -50,7 +50,7 @@ func NewStreamClientInterceptor() grpc.StreamClientInterceptor {
|
||||||
|
|
||||||
finished := make(chan error)
|
finished := make(chan error)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
strWrp := newgRPCClientStream(str, desc, finished, done)
|
strWrp := newgRPCClientStream(str, desc, span, finished, done)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(done)
|
defer close(done)
|
||||||
|
@ -107,7 +107,7 @@ func NewStreamServerInterceptor() grpc.StreamServerInterceptor {
|
||||||
trace.WithSpanKind(trace.SpanKindServer))
|
trace.WithSpanKind(trace.SpanKindServer))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
err := handler(srv, newgRPCServerStream(ctx, ss))
|
err := handler(srv, newgRPCServerStream(ctx, ss, span))
|
||||||
|
|
||||||
setGRPCSpanStatus(span, err)
|
setGRPCSpanStatus(span, err)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -2,7 +2,10 @@ package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
)
|
)
|
||||||
|
@ -10,12 +13,14 @@ import (
|
||||||
type serverStream struct {
|
type serverStream struct {
|
||||||
originalStream grpc.ServerStream
|
originalStream grpc.ServerStream
|
||||||
ctx context.Context // nolint:containedctx
|
ctx context.Context // nolint:containedctx
|
||||||
|
span trace.Span
|
||||||
}
|
}
|
||||||
|
|
||||||
func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream) grpc.ServerStream {
|
func newgRPCServerStream(ctx context.Context, originalStream grpc.ServerStream, span trace.Span) grpc.ServerStream {
|
||||||
return &serverStream{
|
return &serverStream{
|
||||||
originalStream: originalStream,
|
originalStream: originalStream,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
span: span,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,9 +41,23 @@ func (ss *serverStream) Context() context.Context {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *serverStream) SendMsg(m any) error {
|
func (ss *serverStream) SendMsg(m any) error {
|
||||||
return ss.originalStream.SendMsg(m)
|
ss.span.AddEvent("server.stream.send.msg.start", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
|
err := ss.originalStream.SendMsg(m)
|
||||||
|
ss.span.AddEvent("server.stream.send.msg.finish", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *serverStream) RecvMsg(m any) error {
|
func (ss *serverStream) RecvMsg(m any) error {
|
||||||
return ss.originalStream.RecvMsg(m)
|
ss.span.AddEvent("server.stream.receive.msg.start", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
|
err := ss.originalStream.RecvMsg(m)
|
||||||
|
ss.span.AddEvent("server.stream.receive.msg.finish", trace.WithAttributes(
|
||||||
|
attribute.String("message.type", fmt.Sprintf("%T", m))),
|
||||||
|
)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,6 +162,9 @@ func newResource(cfg *Config) *resource.Resource {
|
||||||
if len(cfg.InstanceID) > 0 {
|
if len(cfg.InstanceID) > 0 {
|
||||||
attrs = append(attrs, semconv.ServiceInstanceID(cfg.InstanceID))
|
attrs = append(attrs, semconv.ServiceInstanceID(cfg.InstanceID))
|
||||||
}
|
}
|
||||||
|
for k, v := range cfg.Attributes {
|
||||||
|
attrs = append(attrs, attribute.String(k, v))
|
||||||
|
}
|
||||||
return resource.NewWithAttributes(
|
return resource.NewWithAttributes(
|
||||||
semconv.SchemaURL,
|
semconv.SchemaURL,
|
||||||
attrs...,
|
attrs...,
|
||||||
|
|
Loading…
Add table
Reference in a new issue