[#12] tracing: Add tracing package

Add tracing config, implementation and setup

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-03-21 11:47:38 +03:00
parent 3a7280968b
commit 816628d37d
6 changed files with 807 additions and 0 deletions

74
pkg/tracing/config.go Normal file
View file

@ -0,0 +1,74 @@
package tracing
import "fmt"
// Exporter is type of tracing target.
type Exporter string
const (
Stdout Exporter = "stdout"
OTLPgRPC Exporter = "otlp_grpc"
)
type Config struct {
// Enabled is true, if tracing enabled.
Enabled bool
// Exporter is collector type.
Exporter Exporter
// Endpoint is collector endpoint for OTLP exporters.
Endpoint string
// Service is service name that will be used in tracing.
// Mandatory.
Service string
// InstanceID is identity of service instance.
// Optional.
InstanceID string
// Version is version of service instance.
// Optional.
Version string
}
func (c *Config) validate() error {
if !c.Enabled {
return nil
}
if c.Exporter != Stdout && c.Exporter != OTLPgRPC {
return fmt.Errorf("tracing config error: unknown exporter '%s', valid values are %v",
c.Exporter, []string{string(Stdout), string(OTLPgRPC)})
}
if len(c.Service) == 0 {
return fmt.Errorf("tracing config error: service name must be specified")
}
if c.Exporter == OTLPgRPC && len(c.Endpoint) == 0 {
return fmt.Errorf("tracing config error: exporter '%s' requires endpoint", c.Exporter)
}
return nil
}
func (c *Config) hasChange(other *Config) bool {
if !c.Enabled && !other.Enabled {
return false
}
if c.Enabled != other.Enabled {
return true
}
if c.Exporter == Stdout && other.Exporter == Stdout {
return !c.serviceInfoEqual(other)
}
return c.Exporter != other.Exporter ||
c.Endpoint != other.Endpoint ||
!c.serviceInfoEqual(other)
}
func (c *Config) serviceInfoEqual(other *Config) bool {
return c.Service == other.Service &&
c.InstanceID == other.InstanceID &&
c.Version == other.Version
}

212
pkg/tracing/config_test.go Normal file
View file

@ -0,0 +1,212 @@
package tracing
import (
"testing"
)
func TestConfig_validate(t *testing.T) {
tests := []struct {
name string
config Config
wantErr bool
}{
{
name: "disabled",
wantErr: false,
config: Config{
Enabled: false,
},
},
{
name: "stdout",
wantErr: false,
config: Config{
Enabled: true,
Exporter: Stdout,
Service: "test",
InstanceID: "s01",
Version: "v0.0.1",
},
},
{
name: "OTLP gRPC",
wantErr: false,
config: Config{
Enabled: true,
Exporter: OTLPgRPC,
Service: "test",
Endpoint: "localhost:4717",
InstanceID: "s01",
Version: "v0.0.1",
},
},
{
name: "unknown exporter",
wantErr: true,
config: Config{
Enabled: true,
Exporter: "unknown",
Service: "test",
Endpoint: "localhost:4717",
InstanceID: "s01",
Version: "v0.0.1",
},
},
{
name: "no exporter",
wantErr: true,
config: Config{
Enabled: true,
Service: "test",
Endpoint: "localhost:4717",
InstanceID: "s01",
Version: "v0.0.1",
},
},
{
name: "no service",
wantErr: true,
config: Config{
Enabled: true,
Exporter: OTLPgRPC,
Endpoint: "localhost:4717",
InstanceID: "s01",
Version: "v0.0.1",
},
},
{
name: "no endpoint for grpc",
wantErr: true,
config: Config{
Enabled: true,
Exporter: OTLPgRPC,
Service: "test",
InstanceID: "s01",
Version: "v0.0.1",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := tt.config.validate(); (err != nil) != tt.wantErr {
t.Errorf("Config.validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestConfig_hasChange(t *testing.T) {
tests := []struct {
name string
config Config
other Config
want bool
}{
{
name: "disabled configs always equal",
want: false,
config: Config{
Enabled: false,
Exporter: Stdout,
Service: "test",
InstanceID: "s01",
Version: "v1.0.0",
},
other: Config{
Enabled: false,
Exporter: OTLPgRPC,
Endpoint: "localhost:4717",
Service: "test",
InstanceID: "s01",
Version: "v1.0.0",
},
},
{
name: "enabled",
want: true,
config: Config{
Enabled: false,
Exporter: OTLPgRPC,
Endpoint: "localhost:4717",
Service: "test",
InstanceID: "s01",
Version: "v1.0.0",
},
other: Config{
Enabled: true,
Exporter: OTLPgRPC,
Endpoint: "localhost:4717",
Service: "test",
InstanceID: "s01",
Version: "v1.0.0",
},
},
{
name: "disabled",
want: true,
config: Config{
Enabled: true,
Exporter: OTLPgRPC,
Endpoint: "localhost:4717",
Service: "test",
InstanceID: "s01",
Version: "v1.0.0",
},
other: Config{
Enabled: false,
Exporter: OTLPgRPC,
Endpoint: "localhost:4717",
Service: "test",
InstanceID: "s01",
Version: "v1.0.0",
},
},
{
name: "do not use endpoint for stdout",
want: false,
config: Config{
Enabled: true,
Exporter: Stdout,
Endpoint: "localhost:4717",
Service: "test",
InstanceID: "s01",
Version: "v1.0.0",
},
other: Config{
Enabled: true,
Exporter: Stdout,
Endpoint: "otherhost:4717",
Service: "test",
InstanceID: "s01",
Version: "v1.0.0",
},
},
{
name: "use endpoint for grpc",
want: true,
config: Config{
Enabled: true,
Exporter: OTLPgRPC,
Endpoint: "localhost:4717",
Service: "test",
InstanceID: "s01",
Version: "v1.0.0",
},
other: Config{
Enabled: true,
Exporter: OTLPgRPC,
Endpoint: "otherhost:4717",
Service: "test",
InstanceID: "s01",
Version: "v1.0.0",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.config.hasChange(&tt.other); got != tt.want {
t.Errorf("Config.equal() = %v, want %v", got, tt.want)
}
})
}
}

96
pkg/tracing/propagator.go Normal file
View file

@ -0,0 +1,96 @@
package tracing
import (
"context"
"fmt"
"strconv"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
const (
traceIDHeader = "x-frostfs-trace-id"
spanIDHeader = "x-frostfs-span-id"
flagsHeader = "x-frostfs-trace-flags"
)
const (
flagsSampled = 1 << iota
)
// propagator serializes SpanContext to/from headers.
// x-frostfs-trace-id - TraceID, 16 bytes, hex-string (32 bytes).
// x-frostfs-span-id - SpanID, 8 bytes, hexstring (16 bytes).
// x-frostfs-trace-flags - trace flags (now sampled only).
type propagator struct{}
// Propagator is propagation.TextMapPropagator instance, used to extract/inject trace info from/to remote context.
var Propagator propagation.TextMapPropagator = &propagator{}
// Inject injects tracing info to carrier.
func (p *propagator) Inject(ctx context.Context, carrier propagation.TextMapCarrier) {
sc := trace.SpanFromContext(ctx).SpanContext()
if !sc.TraceID().IsValid() || !sc.SpanID().IsValid() {
return
}
var flags int
if sc.IsSampled() {
flags = flags | flagsSampled
}
carrier.Set(traceIDHeader, sc.TraceID().String())
carrier.Set(spanIDHeader, sc.SpanID().String())
carrier.Set(flagsHeader, fmt.Sprintf("%x", flags))
}
// Extract extracts tracing info from carrier and returns context with tracing info.
// In case of error returns ctx.
func (p *propagator) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context {
spanConfig := trace.SpanContextConfig{}
var err error
traceIDStr := carrier.Get(traceIDHeader)
traceIDDefined := false
if traceIDStr != "" {
traceIDDefined = true
spanConfig.TraceID, err = trace.TraceIDFromHex(traceIDStr)
if err != nil {
return ctx
}
}
spanIDstr := carrier.Get(spanIDHeader)
spanIDDefined := false
if spanIDstr != "" {
spanIDDefined = true
spanConfig.SpanID, err = trace.SpanIDFromHex(spanIDstr)
if err != nil {
return ctx
}
}
if traceIDDefined != spanIDDefined {
return ctx //traceID + spanID must be defined OR no traceID and no spanID
}
flagsStr := carrier.Get(flagsHeader)
if flagsStr != "" {
var v int64
v, err = strconv.ParseInt(flagsStr, 16, 32)
if err != nil {
return ctx
}
if v&flagsSampled == flagsSampled {
spanConfig.TraceFlags = trace.FlagsSampled
}
}
return trace.ContextWithRemoteSpanContext(ctx, trace.NewSpanContext(spanConfig))
}
// Fields returns the keys whose values are set with Inject.
func (p *propagator) Fields() []string {
return []string{traceIDHeader, spanIDHeader, flagsHeader}
}

View file

@ -0,0 +1,257 @@
package tracing
import (
"context"
"encoding/hex"
"math/rand"
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
)
type testCarrier struct {
Values map[string]string
}
func (c *testCarrier) Get(key string) string {
return c.Values[key]
}
func (c *testCarrier) Set(key string, value string) {
c.Values[key] = value
}
func (c *testCarrier) Keys() []string {
res := make([]string, 0, len(c.Values))
for k := range c.Values {
res = append(res, k)
}
return res
}
var p = &propagator{}
func TestPropagator_Inject(t *testing.T) {
t.Run("injects trace_id and span_id if valid", func(t *testing.T) {
traceIDBytes := make([]byte, 16)
rand.Read(traceIDBytes)
traceIDHex := hex.EncodeToString(traceIDBytes)
spanIDBytes := make([]byte, 8)
rand.Read(spanIDBytes)
spanIDHex := hex.EncodeToString(spanIDBytes)
spanConfig := trace.SpanContextConfig{}
spanConfig.TraceID, _ = trace.TraceIDFromHex(traceIDHex)
spanConfig.SpanID, _ = trace.SpanIDFromHex(spanIDHex)
spanConfig.TraceFlags = trace.FlagsSampled
ctx := trace.ContextWithRemoteSpanContext(context.Background(), trace.NewSpanContext(spanConfig))
c := &testCarrier{
Values: make(map[string]string),
}
p.Inject(ctx, c)
require.Equal(t, 3, len(c.Values), "not all headers were saved")
require.Equal(t, traceIDHex, c.Values[traceIDHeader], "unexpected trace id")
require.Equal(t, spanIDHex, c.Values[spanIDHeader], "unexpected span id")
require.Equal(t, "1", c.Values[flagsHeader], "unexpected flags")
})
t.Run("doesn't injects if trace_id is invalid", func(t *testing.T) {
traceIDBytes := make([]byte, 16)
traceIDHex := hex.EncodeToString(traceIDBytes)
spanIDBytes := make([]byte, 8)
rand.Read(spanIDBytes)
spanIDHex := hex.EncodeToString(spanIDBytes)
spanConfig := trace.SpanContextConfig{}
spanConfig.TraceID, _ = trace.TraceIDFromHex(traceIDHex)
spanConfig.SpanID, _ = trace.SpanIDFromHex(spanIDHex)
spanConfig.TraceFlags = trace.FlagsSampled
ctx := trace.ContextWithRemoteSpanContext(context.Background(), trace.NewSpanContext(spanConfig))
c := &testCarrier{
Values: make(map[string]string),
}
p.Inject(ctx, c)
require.Equal(t, 0, len(c.Values), "some headers were saved")
})
t.Run("doesn't injects if span_id is invalid", func(t *testing.T) {
traceIDBytes := make([]byte, 16)
rand.Read(traceIDBytes)
traceIDHex := hex.EncodeToString(traceIDBytes)
spanIDBytes := make([]byte, 8)
spanIDHex := hex.EncodeToString(spanIDBytes)
spanConfig := trace.SpanContextConfig{}
spanConfig.TraceID, _ = trace.TraceIDFromHex(traceIDHex)
spanConfig.SpanID, _ = trace.SpanIDFromHex(spanIDHex)
spanConfig.TraceFlags = trace.FlagsSampled
ctx := trace.ContextWithRemoteSpanContext(context.Background(), trace.NewSpanContext(spanConfig))
c := &testCarrier{
Values: make(map[string]string),
}
p.Inject(ctx, c)
require.Equal(t, 0, len(c.Values), "some headers were saved")
})
t.Run("injects flags if no flags specified", func(t *testing.T) {
traceIDBytes := make([]byte, 16)
rand.Read(traceIDBytes)
traceIDHex := hex.EncodeToString(traceIDBytes)
spanIDBytes := make([]byte, 8)
rand.Read(spanIDBytes)
spanIDHex := hex.EncodeToString(spanIDBytes)
spanConfig := trace.SpanContextConfig{}
spanConfig.TraceID, _ = trace.TraceIDFromHex(traceIDHex)
spanConfig.SpanID, _ = trace.SpanIDFromHex(spanIDHex)
ctx := trace.ContextWithRemoteSpanContext(context.Background(), trace.NewSpanContext(spanConfig))
c := &testCarrier{
Values: make(map[string]string),
}
p.Inject(ctx, c)
require.Equal(t, 3, len(c.Values), "not all headers were saved")
require.Equal(t, traceIDHex, c.Values[traceIDHeader], "unexpected trace id")
require.Equal(t, spanIDHex, c.Values[spanIDHeader], "unexpected span id")
require.Equal(t, "0", c.Values[flagsHeader], "unexpected flags")
})
}
func TestPropagator_Extract(t *testing.T) {
t.Run("extracts if set", func(t *testing.T) {
c := &testCarrier{
Values: make(map[string]string),
}
traceIDBytes := make([]byte, 16)
rand.Read(traceIDBytes)
traceIDHex := hex.EncodeToString(traceIDBytes)
c.Values[traceIDHeader] = traceIDHex
spanIDBytes := make([]byte, 8)
rand.Read(spanIDBytes)
spanIDHex := hex.EncodeToString(spanIDBytes)
c.Values[spanIDHeader] = spanIDHex
c.Values[flagsHeader] = "1"
ctx := p.Extract(context.Background(), c)
sc := trace.SpanFromContext(ctx).SpanContext()
require.True(t, sc.HasTraceID(), "trace_id was not set")
require.Equal(t, traceIDHex, sc.TraceID().String(), "trace_id doesn't match")
require.True(t, sc.HasSpanID(), "span_id was not set")
require.Equal(t, spanIDHex, sc.SpanID().String(), "span_id doesn't match")
require.True(t, sc.IsSampled(), "sampled was not set")
})
t.Run("not extracts if only trace_id defined", func(t *testing.T) {
c := &testCarrier{
Values: make(map[string]string),
}
traceIDBytes := make([]byte, 16)
rand.Read(traceIDBytes)
traceIDHex := hex.EncodeToString(traceIDBytes)
c.Values[traceIDHeader] = traceIDHex
c.Values[flagsHeader] = "1"
ctx := p.Extract(context.Background(), c)
sc := trace.SpanFromContext(ctx).SpanContext()
require.False(t, sc.HasTraceID(), "trace_id was set")
require.False(t, sc.HasSpanID(), "span_id was set")
require.False(t, sc.IsSampled(), "sampled was set")
})
t.Run("not extracts if only span_id defined", func(t *testing.T) {
c := &testCarrier{
Values: make(map[string]string),
}
spanIDBytes := make([]byte, 8)
rand.Read(spanIDBytes)
spanIDHex := hex.EncodeToString(spanIDBytes)
c.Values[spanIDHeader] = spanIDHex
c.Values[flagsHeader] = "1"
ctx := p.Extract(context.Background(), c)
sc := trace.SpanFromContext(ctx).SpanContext()
require.False(t, sc.HasTraceID(), "trace_id was set")
require.False(t, sc.HasSpanID(), "span_id was set")
require.False(t, sc.IsSampled(), "sampled was set")
})
t.Run("not extracts if trace_id is in invalid", func(t *testing.T) {
c := &testCarrier{
Values: make(map[string]string),
}
c.Values[traceIDHeader] = "loren ipsum"
spanIDBytes := make([]byte, 8)
rand.Read(spanIDBytes)
spanIDHex := hex.EncodeToString(spanIDBytes)
c.Values[spanIDHeader] = spanIDHex
c.Values[flagsHeader] = "1"
ctx := p.Extract(context.Background(), c)
sc := trace.SpanFromContext(ctx).SpanContext()
require.False(t, sc.HasTraceID(), "trace_id was set")
require.False(t, sc.HasSpanID(), "span_id was set")
require.False(t, sc.IsSampled(), "sampled was set")
})
t.Run("not extracts if span_id is invalid", func(t *testing.T) {
c := &testCarrier{
Values: make(map[string]string),
}
c.Values[spanIDHeader] = "loren ipsum"
traceIDBytes := make([]byte, 16)
rand.Read(traceIDBytes)
traceIDHex := hex.EncodeToString(traceIDBytes)
c.Values[traceIDHeader] = traceIDHex
c.Values[flagsHeader] = "1"
ctx := p.Extract(context.Background(), c)
sc := trace.SpanFromContext(ctx).SpanContext()
require.False(t, sc.HasTraceID(), "trace_id was set")
require.False(t, sc.HasSpanID(), "span_id was set")
require.False(t, sc.IsSampled(), "sampled was set")
})
t.Run("not extracts if flags is invalid", func(t *testing.T) {
c := &testCarrier{
Values: make(map[string]string),
}
traceIDBytes := make([]byte, 16)
rand.Read(traceIDBytes)
traceIDHex := hex.EncodeToString(traceIDBytes)
c.Values[traceIDHeader] = traceIDHex
spanIDBytes := make([]byte, 8)
rand.Read(spanIDBytes)
spanIDHex := hex.EncodeToString(spanIDBytes)
c.Values[spanIDHeader] = spanIDHex
c.Values[flagsHeader] = "loren ipsum"
ctx := p.Extract(context.Background(), c)
sc := trace.SpanFromContext(ctx).SpanContext()
require.False(t, sc.HasTraceID(), "trace_id was set")
require.False(t, sc.HasSpanID(), "span_id was set")
require.False(t, sc.IsSampled(), "sampled was set")
})
}

156
pkg/tracing/setup.go Normal file
View file

@ -0,0 +1,156 @@
package tracing
import (
"context"
"fmt"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
var (
// tracingLock protects provider, done, config and tracer from concurrent update.
// These fields change when the config is updated or the application is shutdown.
tracingLock = &sync.Mutex{}
provider *sdktrace.TracerProvider
done bool
config = Config{}
tracer = getDefaultTracer()
)
// Setup initializes global tracer.
// Returns true if global tracer was updated.
// Shutdown method must be called for graceful shutdown.
func Setup(ctx context.Context, cfg Config) (bool, error) {
if err := cfg.validate(); err != nil {
return false, err
}
tracingLock.Lock()
defer tracingLock.Unlock()
if done {
return false, fmt.Errorf("failed to setup tracing: already shutdown")
}
if !config.hasChange(&cfg) {
return false, nil
}
if !cfg.Enabled {
config = cfg
tracer.Store(&tracerHolder{Tracer: trace.NewNoopTracerProvider().Tracer("")})
return true, flushAndShutdown(ctx)
}
exp, err := getExporter(ctx, &cfg)
if err != nil {
return false, err
}
prevProvider := provider
provider = sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(newResource(&cfg)),
)
config = cfg
tracer.Store(&tracerHolder{Tracer: provider.Tracer(cfg.Service)})
var retErr error
if prevProvider != nil {
retErr = prevProvider.ForceFlush(ctx)
if err := prevProvider.Shutdown(ctx); err != nil {
if retErr == nil {
retErr = err
} else {
retErr = fmt.Errorf("%v ; %v", retErr, err)
}
}
}
return true, retErr
}
// Shutdown shutdowns tracing.
func Shutdown(ctx context.Context) error {
tracingLock.Lock()
defer tracingLock.Unlock()
if done {
return nil
}
done = true
config = Config{}
tracer.Store(&tracerHolder{Tracer: trace.NewNoopTracerProvider().Tracer("")})
return flushAndShutdown(ctx)
}
func getDefaultTracer() *atomic.Value {
v := &atomic.Value{}
v.Store(&tracerHolder{Tracer: trace.NewNoopTracerProvider().Tracer("")})
return v
}
func flushAndShutdown(ctx context.Context) error {
if provider == nil {
return nil
}
tmp := provider
provider = nil
var retErr error
retErr = tmp.ForceFlush(ctx)
if err := tmp.Shutdown(ctx); err != nil {
if retErr == nil {
retErr = err
} else {
retErr = fmt.Errorf("%v ; %v", retErr, err)
}
}
return retErr
}
func getExporter(ctx context.Context, cfg *Config) (sdktrace.SpanExporter, error) {
switch cfg.Exporter {
default:
return nil, fmt.Errorf("failed to setup tracing: unknown tracing exporter (%s)", cfg.Exporter)
case Stdout:
return stdouttrace.New()
case OTLPgRPC:
return otlptracegrpc.New(ctx, otlptracegrpc.WithEndpoint(cfg.Endpoint), otlptracegrpc.WithInsecure())
}
}
func newResource(cfg *Config) *resource.Resource {
attrs := []attribute.KeyValue{
semconv.ServiceName(cfg.Service),
}
if len(cfg.Version) > 0 {
attrs = append(attrs, semconv.ServiceVersion(cfg.Version))
}
if len(cfg.InstanceID) > 0 {
attrs = append(attrs, semconv.ServiceInstanceID(cfg.InstanceID))
}
return resource.NewWithAttributes(
semconv.SchemaURL,
attrs...,
)
}
type tracerHolder struct {
Tracer trace.Tracer
}

12
pkg/tracing/span.go Normal file
View file

@ -0,0 +1,12 @@
package tracing
import (
"context"
"go.opentelemetry.io/otel/trace"
)
// StartSpanFromContext creates a span and a context.Context containing the newly-created span.
func StartSpanFromContext(ctx context.Context, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return tracer.Load().(*tracerHolder).Tracer.Start(ctx, operationName, opts...)
}