diff --git a/pkg/tracing/config.go b/pkg/tracing/config.go new file mode 100644 index 0000000..001953a --- /dev/null +++ b/pkg/tracing/config.go @@ -0,0 +1,74 @@ +package tracing + +import "fmt" + +
// Exporter names the kind of target that tracing data is sent to.
type Exporter string

// Exporter values accepted by Config.validate.
const (
	// Stdout is the exporter identified by the string "stdout".
	Stdout Exporter = "stdout"
	// OTLPgRPC is the exporter identified by the string "otlp_grpc".
	OTLPgRPC Exporter = "otlp_grpc"
)

// Config holds the tracing settings.
type Config struct {
	// Enabled is true if tracing is enabled.
	Enabled bool
	// Exporter is the collector type.
	Exporter Exporter
	// Endpoint is the collector endpoint for OTLP exporters.
	Endpoint string

	// Service is the service name that will be used in tracing.
	// Mandatory.
	Service string
	// InstanceID is the identity of the service instance.
	// Optional.
	InstanceID string
	// Version is the version of the service instance.
	// Optional.
	Version string
}

// validate reports the first problem found in an enabled config: an unknown
// exporter, a missing service name, or a missing endpoint for OTLPgRPC.
// A disabled config is always valid.
func (c *Config) validate() error {
	if !c.Enabled {
		return nil
	}

	switch c.Exporter {
	case Stdout, OTLPgRPC:
		// Known exporter, continue with the remaining checks.
	default:
		return fmt.Errorf("tracing config error: unknown exporter '%s', valid values are %v",
			c.Exporter, []string{string(Stdout), string(OTLPgRPC)})
	}

	if c.Service == "" {
		return fmt.Errorf("tracing config error: service name must be specified")
	}

	if c.Exporter == OTLPgRPC && c.Endpoint == "" {
		return fmt.Errorf("tracing config error: exporter '%s' requires endpoint", c.Exporter)
	}

	return nil
}

// hasChange reports whether other differs from c in a way that matters.
// Two disabled configs never differ, a flip of Enabled always does, and the
// endpoint is ignored when both configs use the Stdout exporter.
func (c *Config) hasChange(other *Config) bool {
	switch {
	case c.Enabled != other.Enabled:
		return true
	case !c.Enabled:
		// Both configs are disabled.
		return false
	}

	serviceChanged := !c.serviceInfoEqual(other)
	if c.Exporter == Stdout && other.Exporter == Stdout {
		return serviceChanged
	}

	return c.Exporter != other.Exporter || c.Endpoint != other.Endpoint || serviceChanged
}

// serviceInfoEqual reports whether Service, InstanceID and Version all match.
func (c *Config) serviceInfoEqual(other *Config) bool {
	if c.Service != other.Service {
		return false
	}
	if c.InstanceID != other.InstanceID {
		return false
	}
	return c.Version == other.Version
}
diff --git a/pkg/tracing/config_test.go b/pkg/tracing/config_test.go new file mode 100644 index
0000000..031dd3b --- /dev/null +++ b/pkg/tracing/config_test.go @@ -0,0 +1,212 @@ +package tracing + +import ( + "testing" +) + +
// TestConfig_validate checks which configs Config.validate accepts and rejects.
func TestConfig_validate(t *testing.T) {
	tests := []struct {
		name    string
		config  Config
		wantErr bool
	}{
		{
			name:    "disabled",
			wantErr: false,
			config:  Config{Enabled: false},
		},
		{
			name:    "stdout",
			wantErr: false,
			config:  Config{Enabled: true, Exporter: Stdout, Service: "test", InstanceID: "s01", Version: "v0.0.1"},
		},
		{
			name:    "OTLP gRPC",
			wantErr: false,
			config:  Config{Enabled: true, Exporter: OTLPgRPC, Service: "test", Endpoint: "localhost:4717", InstanceID: "s01", Version: "v0.0.1"},
		},
		{
			name:    "unknown exporter",
			wantErr: true,
			config:  Config{Enabled: true, Exporter: "unknown", Service: "test", Endpoint: "localhost:4717", InstanceID: "s01", Version: "v0.0.1"},
		},
		{
			name:    "no exporter",
			wantErr: true,
			config:  Config{Enabled: true, Service: "test", Endpoint: "localhost:4717", InstanceID: "s01", Version: "v0.0.1"},
		},
		{
			name:    "no service",
			wantErr: true,
			config:  Config{Enabled: true, Exporter: OTLPgRPC, Endpoint: "localhost:4717", InstanceID: "s01", Version: "v0.0.1"},
		},
		{
			name:    "no endpoint for grpc",
			wantErr: true,
			config:  Config{Enabled: true, Exporter: OTLPgRPC, Service: "test", InstanceID: "s01", Version: "v0.0.1"},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if err := tt.config.validate(); (err != nil) != tt.wantErr {
				t.Errorf("Config.validate() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}

// TestConfig_hasChange checks when Config.hasChange reports a difference
// between two configs.
func TestConfig_hasChange(t *testing.T) {
	tests := []struct {
		name   string
		config Config
		other  Config
		want   bool
	}{
		{
			name:   "disabled configs always equal",
			want:   false,
			config: Config{Enabled: false, Exporter: Stdout, Service: "test", InstanceID: "s01", Version: "v1.0.0"},
			other:  Config{Enabled: false, Exporter: OTLPgRPC, Endpoint: "localhost:4717", Service: "test", InstanceID: "s01", Version: "v1.0.0"},
		},
		{
			name:   "enabled",
			want:   true,
			config: Config{Enabled: false, Exporter: OTLPgRPC, Endpoint: "localhost:4717", Service: "test", InstanceID: "s01", Version: "v1.0.0"},
			other:  Config{Enabled: true, Exporter: OTLPgRPC, Endpoint: "localhost:4717", Service: "test", InstanceID: "s01", Version: "v1.0.0"},
		},
		{
			name:   "disabled",
			want:   true,
			config: Config{Enabled: true, Exporter: OTLPgRPC, Endpoint: "localhost:4717", Service: "test", InstanceID: "s01", Version: "v1.0.0"},
			other:  Config{Enabled: false, Exporter: OTLPgRPC, Endpoint: "localhost:4717", Service: "test", InstanceID: "s01", Version: "v1.0.0"},
		},
		{
			name:   "do not use endpoint for stdout",
			want:   false,
			config: Config{Enabled: true, Exporter: Stdout, Endpoint: "localhost:4717", Service: "test", InstanceID: "s01", Version: "v1.0.0"},
			other:  Config{Enabled: true, Exporter: Stdout, Endpoint: "otherhost:4717", Service: "test", InstanceID: "s01", Version: "v1.0.0"},
		},
		{
			name:   "use endpoint for grpc",
			want:   true,
			config: Config{Enabled: true, Exporter: OTLPgRPC, Endpoint: "localhost:4717", Service: "test", InstanceID: "s01", Version: "v1.0.0"},
			other:  Config{Enabled: true, Exporter: OTLPgRPC, Endpoint: "otherhost:4717", Service: "test", InstanceID: "s01", Version: "v1.0.0"},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := tt.config.hasChange(&tt.other); got != tt.want {
				// The message names the method under test; it previously said
				// Config.equal(), which does not exist.
				t.Errorf("Config.hasChange() = %v, want %v", got, tt.want)
			}
		})
	}
}
diff --git a/pkg/tracing/propagator.go b/pkg/tracing/propagator.go new file mode 100644 index 0000000..3dabfdc --- /dev/null +++ b/pkg/tracing/propagator.go @@ -0,0 +1,96 @@ +package tracing + +import ( + "context" + "fmt" +
"strconv" + + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" +) + +
// Names of the carrier keys used by propagator.
const (
	traceIDHeader = "x-frostfs-trace-id"
	spanIDHeader  = "x-frostfs-span-id"
	flagsHeader   = "x-frostfs-trace-flags"
)

// Bits of the value stored under flagsHeader.
const (
	flagsSampled = 1 << iota
)

// propagator serializes SpanContext to/from headers.
// x-frostfs-trace-id - TraceID, 16 bytes, hex string (32 characters).
// x-frostfs-span-id - SpanID, 8 bytes, hex string (16 characters).
// x-frostfs-trace-flags - trace flags in hex (only the sampled bit for now).
type propagator struct{}

// Propagator is a propagation.TextMapPropagator instance, used to extract/inject trace info from/to remote context.
var Propagator propagation.TextMapPropagator = &propagator{}

// Inject injects tracing info to carrier.
// Nothing is written unless the span context found in ctx has both a valid
// trace ID and a valid span ID.
func (p *propagator) Inject(ctx context.Context, carrier propagation.TextMapCarrier) {
	sc := trace.SpanFromContext(ctx).SpanContext()
	if !sc.TraceID().IsValid() || !sc.SpanID().IsValid() {
		return
	}

	var flags int
	if sc.IsSampled() {
		flags |= flagsSampled
	}

	carrier.Set(traceIDHeader, sc.TraceID().String())
	carrier.Set(spanIDHeader, sc.SpanID().String())
	// Flags are always written, as a hex number ("0" when nothing is set).
	carrier.Set(flagsHeader, fmt.Sprintf("%x", flags))
}

// Extract extracts tracing info from carrier and returns context with tracing info.
// In case of error returns ctx.
+func (p *propagator) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context { + spanConfig := trace.SpanContextConfig{} + var err error + + traceIDStr := carrier.Get(traceIDHeader) + traceIDDefined := false + if traceIDStr != "" { + traceIDDefined = true + spanConfig.TraceID, err = trace.TraceIDFromHex(traceIDStr) + if err != nil { + return ctx + } + } + + spanIDstr := carrier.Get(spanIDHeader) + spanIDDefined := false + if spanIDstr != "" { + spanIDDefined = true + spanConfig.SpanID, err = trace.SpanIDFromHex(spanIDstr) + if err != nil { + return ctx + } + } + + if traceIDDefined != spanIDDefined { + return ctx //traceID + spanID must be defined OR no traceID and no spanID + } + + flagsStr := carrier.Get(flagsHeader) + if flagsStr != "" { + var v int64 + v, err = strconv.ParseInt(flagsStr, 16, 32) + if err != nil { + return ctx + } + if v&flagsSampled == flagsSampled { + spanConfig.TraceFlags = trace.FlagsSampled + } + } + + return trace.ContextWithRemoteSpanContext(ctx, trace.NewSpanContext(spanConfig)) +} + +// Fields returns the keys whose values are set with Inject. 
+func (p *propagator) Fields() []string { + return []string{traceIDHeader, spanIDHeader, flagsHeader} +} diff --git a/pkg/tracing/propagator_test.go b/pkg/tracing/propagator_test.go new file mode 100644 index 0000000..fc81e48 --- /dev/null +++ b/pkg/tracing/propagator_test.go @@ -0,0 +1,257 @@ +package tracing + +import ( + "context" + "encoding/hex" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" +) + +type testCarrier struct { + Values map[string]string +} + +func (c *testCarrier) Get(key string) string { + return c.Values[key] +} + +func (c *testCarrier) Set(key string, value string) { + c.Values[key] = value +} + +func (c *testCarrier) Keys() []string { + res := make([]string, 0, len(c.Values)) + for k := range c.Values { + res = append(res, k) + } + return res +} + +var p = &propagator{} + +func TestPropagator_Inject(t *testing.T) { + t.Run("injects trace_id and span_id if valid", func(t *testing.T) { + traceIDBytes := make([]byte, 16) + rand.Read(traceIDBytes) + traceIDHex := hex.EncodeToString(traceIDBytes) + + spanIDBytes := make([]byte, 8) + rand.Read(spanIDBytes) + spanIDHex := hex.EncodeToString(spanIDBytes) + + spanConfig := trace.SpanContextConfig{} + spanConfig.TraceID, _ = trace.TraceIDFromHex(traceIDHex) + spanConfig.SpanID, _ = trace.SpanIDFromHex(spanIDHex) + spanConfig.TraceFlags = trace.FlagsSampled + + ctx := trace.ContextWithRemoteSpanContext(context.Background(), trace.NewSpanContext(spanConfig)) + c := &testCarrier{ + Values: make(map[string]string), + } + p.Inject(ctx, c) + + require.Equal(t, 3, len(c.Values), "not all headers were saved") + require.Equal(t, traceIDHex, c.Values[traceIDHeader], "unexpected trace id") + require.Equal(t, spanIDHex, c.Values[spanIDHeader], "unexpected span id") + require.Equal(t, "1", c.Values[flagsHeader], "unexpected flags") + }) + t.Run("doesn't injects if trace_id is invalid", func(t *testing.T) { + traceIDBytes := make([]byte, 16) + traceIDHex := 
hex.EncodeToString(traceIDBytes) + + spanIDBytes := make([]byte, 8) + rand.Read(spanIDBytes) + spanIDHex := hex.EncodeToString(spanIDBytes) + + spanConfig := trace.SpanContextConfig{} + spanConfig.TraceID, _ = trace.TraceIDFromHex(traceIDHex) + spanConfig.SpanID, _ = trace.SpanIDFromHex(spanIDHex) + spanConfig.TraceFlags = trace.FlagsSampled + + ctx := trace.ContextWithRemoteSpanContext(context.Background(), trace.NewSpanContext(spanConfig)) + c := &testCarrier{ + Values: make(map[string]string), + } + p.Inject(ctx, c) + + require.Equal(t, 0, len(c.Values), "some headers were saved") + }) + t.Run("doesn't injects if span_id is invalid", func(t *testing.T) { + traceIDBytes := make([]byte, 16) + rand.Read(traceIDBytes) + traceIDHex := hex.EncodeToString(traceIDBytes) + + spanIDBytes := make([]byte, 8) + spanIDHex := hex.EncodeToString(spanIDBytes) + + spanConfig := trace.SpanContextConfig{} + spanConfig.TraceID, _ = trace.TraceIDFromHex(traceIDHex) + spanConfig.SpanID, _ = trace.SpanIDFromHex(spanIDHex) + spanConfig.TraceFlags = trace.FlagsSampled + + ctx := trace.ContextWithRemoteSpanContext(context.Background(), trace.NewSpanContext(spanConfig)) + c := &testCarrier{ + Values: make(map[string]string), + } + p.Inject(ctx, c) + + require.Equal(t, 0, len(c.Values), "some headers were saved") + }) + t.Run("injects flags if no flags specified", func(t *testing.T) { + traceIDBytes := make([]byte, 16) + rand.Read(traceIDBytes) + traceIDHex := hex.EncodeToString(traceIDBytes) + + spanIDBytes := make([]byte, 8) + rand.Read(spanIDBytes) + spanIDHex := hex.EncodeToString(spanIDBytes) + + spanConfig := trace.SpanContextConfig{} + spanConfig.TraceID, _ = trace.TraceIDFromHex(traceIDHex) + spanConfig.SpanID, _ = trace.SpanIDFromHex(spanIDHex) + + ctx := trace.ContextWithRemoteSpanContext(context.Background(), trace.NewSpanContext(spanConfig)) + c := &testCarrier{ + Values: make(map[string]string), + } + p.Inject(ctx, c) + + require.Equal(t, 3, len(c.Values), "not all headers were 
saved") + require.Equal(t, traceIDHex, c.Values[traceIDHeader], "unexpected trace id") + require.Equal(t, spanIDHex, c.Values[spanIDHeader], "unexpected span id") + require.Equal(t, "0", c.Values[flagsHeader], "unexpected flags") + }) + +} + +func TestPropagator_Extract(t *testing.T) { + t.Run("extracts if set", func(t *testing.T) { + c := &testCarrier{ + Values: make(map[string]string), + } + + traceIDBytes := make([]byte, 16) + rand.Read(traceIDBytes) + traceIDHex := hex.EncodeToString(traceIDBytes) + c.Values[traceIDHeader] = traceIDHex + + spanIDBytes := make([]byte, 8) + rand.Read(spanIDBytes) + spanIDHex := hex.EncodeToString(spanIDBytes) + c.Values[spanIDHeader] = spanIDHex + + c.Values[flagsHeader] = "1" + + ctx := p.Extract(context.Background(), c) + + sc := trace.SpanFromContext(ctx).SpanContext() + require.True(t, sc.HasTraceID(), "trace_id was not set") + require.Equal(t, traceIDHex, sc.TraceID().String(), "trace_id doesn't match") + require.True(t, sc.HasSpanID(), "span_id was not set") + require.Equal(t, spanIDHex, sc.SpanID().String(), "span_id doesn't match") + require.True(t, sc.IsSampled(), "sampled was not set") + }) + t.Run("not extracts if only trace_id defined", func(t *testing.T) { + c := &testCarrier{ + Values: make(map[string]string), + } + + traceIDBytes := make([]byte, 16) + rand.Read(traceIDBytes) + traceIDHex := hex.EncodeToString(traceIDBytes) + c.Values[traceIDHeader] = traceIDHex + c.Values[flagsHeader] = "1" + + ctx := p.Extract(context.Background(), c) + + sc := trace.SpanFromContext(ctx).SpanContext() + require.False(t, sc.HasTraceID(), "trace_id was set") + require.False(t, sc.HasSpanID(), "span_id was set") + require.False(t, sc.IsSampled(), "sampled was set") + }) + t.Run("not extracts if only span_id defined", func(t *testing.T) { + c := &testCarrier{ + Values: make(map[string]string), + } + + spanIDBytes := make([]byte, 8) + rand.Read(spanIDBytes) + spanIDHex := hex.EncodeToString(spanIDBytes) + c.Values[spanIDHeader] = 
spanIDHex + c.Values[flagsHeader] = "1" + + ctx := p.Extract(context.Background(), c) + + sc := trace.SpanFromContext(ctx).SpanContext() + require.False(t, sc.HasTraceID(), "trace_id was set") + require.False(t, sc.HasSpanID(), "span_id was set") + require.False(t, sc.IsSampled(), "sampled was set") + }) + t.Run("not extracts if trace_id is in invalid", func(t *testing.T) { + c := &testCarrier{ + Values: make(map[string]string), + } + + c.Values[traceIDHeader] = "loren ipsum" + + spanIDBytes := make([]byte, 8) + rand.Read(spanIDBytes) + spanIDHex := hex.EncodeToString(spanIDBytes) + c.Values[spanIDHeader] = spanIDHex + c.Values[flagsHeader] = "1" + + ctx := p.Extract(context.Background(), c) + + sc := trace.SpanFromContext(ctx).SpanContext() + require.False(t, sc.HasTraceID(), "trace_id was set") + require.False(t, sc.HasSpanID(), "span_id was set") + require.False(t, sc.IsSampled(), "sampled was set") + }) + t.Run("not extracts if span_id is invalid", func(t *testing.T) { + c := &testCarrier{ + Values: make(map[string]string), + } + + c.Values[spanIDHeader] = "loren ipsum" + + traceIDBytes := make([]byte, 16) + rand.Read(traceIDBytes) + traceIDHex := hex.EncodeToString(traceIDBytes) + c.Values[traceIDHeader] = traceIDHex + c.Values[flagsHeader] = "1" + + ctx := p.Extract(context.Background(), c) + + sc := trace.SpanFromContext(ctx).SpanContext() + require.False(t, sc.HasTraceID(), "trace_id was set") + require.False(t, sc.HasSpanID(), "span_id was set") + require.False(t, sc.IsSampled(), "sampled was set") + }) + t.Run("not extracts if flags is invalid", func(t *testing.T) { + c := &testCarrier{ + Values: make(map[string]string), + } + + traceIDBytes := make([]byte, 16) + rand.Read(traceIDBytes) + traceIDHex := hex.EncodeToString(traceIDBytes) + c.Values[traceIDHeader] = traceIDHex + + spanIDBytes := make([]byte, 8) + rand.Read(spanIDBytes) + spanIDHex := hex.EncodeToString(spanIDBytes) + c.Values[spanIDHeader] = spanIDHex + + c.Values[flagsHeader] = "loren ipsum" 
+ + ctx := p.Extract(context.Background(), c) + + sc := trace.SpanFromContext(ctx).SpanContext() + require.False(t, sc.HasTraceID(), "trace_id was set") + require.False(t, sc.HasSpanID(), "span_id was set") + require.False(t, sc.IsSampled(), "sampled was set") + }) +} diff --git a/pkg/tracing/setup.go b/pkg/tracing/setup.go new file mode 100644 index 0000000..3df069e --- /dev/null +++ b/pkg/tracing/setup.go @@ -0,0 +1,156 @@ +package tracing + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "go.opentelemetry.io/otel/trace" +) + +var ( + // tracingLock protects provider, done, config and tracer from concurrent update. + // These fields change when the config is updated or the application is shutdown. + tracingLock = &sync.Mutex{} + + provider *sdktrace.TracerProvider + done bool + + config = Config{} + tracer = getDefaultTracer() +) + +// Setup initializes global tracer. +// Returns true if global tracer was updated. +// Shutdown method must be called for graceful shutdown. 
+func Setup(ctx context.Context, cfg Config) (bool, error) { + if err := cfg.validate(); err != nil { + return false, err + } + + tracingLock.Lock() + defer tracingLock.Unlock() + + if done { + return false, fmt.Errorf("failed to setup tracing: already shutdown") + } + + if !config.hasChange(&cfg) { + return false, nil + } + + if !cfg.Enabled { + config = cfg + tracer.Store(&tracerHolder{Tracer: trace.NewNoopTracerProvider().Tracer("")}) + return true, flushAndShutdown(ctx) + } + + exp, err := getExporter(ctx, &cfg) + if err != nil { + return false, err + } + + prevProvider := provider + + provider = sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + sdktrace.WithResource(newResource(&cfg)), + ) + + config = cfg + tracer.Store(&tracerHolder{Tracer: provider.Tracer(cfg.Service)}) + + var retErr error + if prevProvider != nil { + retErr = prevProvider.ForceFlush(ctx) + if err := prevProvider.Shutdown(ctx); err != nil { + if retErr == nil { + retErr = err + } else { + retErr = fmt.Errorf("%v ; %v", retErr, err) + } + } + } + + return true, retErr +} + +// Shutdown shutdowns tracing. 
+func Shutdown(ctx context.Context) error { + tracingLock.Lock() + defer tracingLock.Unlock() + + if done { + return nil + } + + done = true + + config = Config{} + tracer.Store(&tracerHolder{Tracer: trace.NewNoopTracerProvider().Tracer("")}) + + return flushAndShutdown(ctx) +} + +func getDefaultTracer() *atomic.Value { + v := &atomic.Value{} + v.Store(&tracerHolder{Tracer: trace.NewNoopTracerProvider().Tracer("")}) + return v +} + +func flushAndShutdown(ctx context.Context) error { + if provider == nil { + return nil + } + + tmp := provider + provider = nil + var retErr error + retErr = tmp.ForceFlush(ctx) + if err := tmp.Shutdown(ctx); err != nil { + if retErr == nil { + retErr = err + } else { + retErr = fmt.Errorf("%v ; %v", retErr, err) + } + } + return retErr +} + +func getExporter(ctx context.Context, cfg *Config) (sdktrace.SpanExporter, error) { + switch cfg.Exporter { + default: + return nil, fmt.Errorf("failed to setup tracing: unknown tracing exporter (%s)", cfg.Exporter) + case Stdout: + return stdouttrace.New() + case OTLPgRPC: + return otlptracegrpc.New(ctx, otlptracegrpc.WithEndpoint(cfg.Endpoint), otlptracegrpc.WithInsecure()) + } +} + +func newResource(cfg *Config) *resource.Resource { + attrs := []attribute.KeyValue{ + semconv.ServiceName(cfg.Service), + } + if len(cfg.Version) > 0 { + attrs = append(attrs, semconv.ServiceVersion(cfg.Version)) + } + if len(cfg.InstanceID) > 0 { + attrs = append(attrs, semconv.ServiceInstanceID(cfg.InstanceID)) + } + return resource.NewWithAttributes( + semconv.SchemaURL, + attrs..., + ) +} + +type tracerHolder struct { + Tracer trace.Tracer +} diff --git a/pkg/tracing/span.go b/pkg/tracing/span.go new file mode 100644 index 0000000..5e28c75 --- /dev/null +++ b/pkg/tracing/span.go @@ -0,0 +1,12 @@ +package tracing + +import ( + "context" + + "go.opentelemetry.io/otel/trace" +) + +// StartSpanFromContext creates a span and a context.Context containing the newly-created span. 
+func StartSpanFromContext(ctx context.Context, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + return tracer.Load().(*tracerHolder).Tracer.Start(ctx, operationName, opts...) +}