176 lines
4.2 KiB
Go
176 lines
4.2 KiB
Go
package tracing
|
|
|
|
import (
|
|
"context"
|
|
"crypto/x509"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
|
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
|
"go.opentelemetry.io/otel/sdk/trace/tracetest"
|
|
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.opentelemetry.io/otel/trace/noop"
|
|
"google.golang.org/grpc/credentials"
|
|
)
|
|
|
|
// ErrEmptyServerRootCaPool indicates that the server root CA cert pool is empty, i.e. it contains no certificates.
|
|
var ErrEmptyServerRootCaPool = errors.New("empty server root ca cert pool")
|
|
|
|
var (
|
|
// tracingLock protects provider, done, config and tracer from concurrent update.
|
|
// These fields change when the config is updated or the application is shutdown.
|
|
tracingLock = &sync.Mutex{}
|
|
|
|
provider *sdktrace.TracerProvider
|
|
done bool
|
|
|
|
config = Config{}
|
|
tracer = getDefaultTracer()
|
|
)
|
|
|
|
// Setup initializes global tracer.
|
|
// Returns true if global tracer was updated.
|
|
// Shutdown method must be called for graceful shutdown.
|
|
func Setup(ctx context.Context, cfg Config) (bool, error) {
|
|
if err := cfg.validate(); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
tracingLock.Lock()
|
|
defer tracingLock.Unlock()
|
|
|
|
if done {
|
|
return false, fmt.Errorf("failed to setup tracing: already shutdown")
|
|
}
|
|
|
|
if !config.hasChange(&cfg) {
|
|
return false, nil
|
|
}
|
|
|
|
if !cfg.Enabled {
|
|
config = cfg
|
|
tracer.Store(&tracerHolder{Tracer: noop.NewTracerProvider().Tracer("")})
|
|
return true, flushAndShutdown(ctx)
|
|
}
|
|
|
|
exp, err := getExporter(ctx, &cfg)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
prevProvider := provider
|
|
|
|
provider = sdktrace.NewTracerProvider(
|
|
sdktrace.WithBatcher(exp),
|
|
sdktrace.WithResource(newResource(&cfg)),
|
|
)
|
|
|
|
config = cfg
|
|
tracer.Store(&tracerHolder{Tracer: provider.Tracer(cfg.Service)})
|
|
|
|
var retErr error
|
|
if prevProvider != nil {
|
|
retErr = prevProvider.ForceFlush(ctx)
|
|
if err := prevProvider.Shutdown(ctx); err != nil {
|
|
if retErr == nil {
|
|
retErr = err
|
|
} else {
|
|
retErr = fmt.Errorf("%v ; %v", retErr, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return true, retErr
|
|
}
|
|
|
|
// Shutdown shutdowns tracing.
|
|
func Shutdown(ctx context.Context) error {
|
|
tracingLock.Lock()
|
|
defer tracingLock.Unlock()
|
|
|
|
if done {
|
|
return nil
|
|
}
|
|
|
|
done = true
|
|
|
|
config = Config{}
|
|
tracer.Store(&tracerHolder{Tracer: noop.NewTracerProvider().Tracer("")})
|
|
|
|
return flushAndShutdown(ctx)
|
|
}
|
|
|
|
func getDefaultTracer() *atomic.Pointer[tracerHolder] {
|
|
v := new(atomic.Pointer[tracerHolder])
|
|
v.Store(&tracerHolder{Tracer: noop.NewTracerProvider().Tracer("")})
|
|
return v
|
|
}
|
|
|
|
func flushAndShutdown(ctx context.Context) error {
|
|
if provider == nil {
|
|
return nil
|
|
}
|
|
|
|
tmp := provider
|
|
provider = nil
|
|
var retErr error
|
|
retErr = tmp.ForceFlush(ctx)
|
|
if err := tmp.Shutdown(ctx); err != nil {
|
|
if retErr == nil {
|
|
retErr = err
|
|
} else {
|
|
retErr = fmt.Errorf("%v ; %v", retErr, err)
|
|
}
|
|
}
|
|
return retErr
|
|
}
|
|
|
|
func getExporter(ctx context.Context, cfg *Config) (sdktrace.SpanExporter, error) {
|
|
switch cfg.Exporter {
|
|
default:
|
|
return nil, fmt.Errorf("failed to setup tracing: unknown tracing exporter (%s)", cfg.Exporter)
|
|
case StdoutExporter:
|
|
return stdouttrace.New()
|
|
case NoOpExporter:
|
|
return tracetest.NewNoopExporter(), nil
|
|
case OTLPgRPCExporter:
|
|
securityOption := otlptracegrpc.WithInsecure()
|
|
if cfg.ServerCaCertPool != nil {
|
|
if cfg.ServerCaCertPool.Equal(x509.NewCertPool()) {
|
|
return nil, fmt.Errorf("failed to setup tracing: %w", ErrEmptyServerRootCaPool)
|
|
}
|
|
securityOption = otlptracegrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(cfg.ServerCaCertPool, ""))
|
|
}
|
|
return otlptracegrpc.New(ctx, otlptracegrpc.WithEndpoint(cfg.Endpoint), securityOption)
|
|
}
|
|
}
|
|
|
|
func newResource(cfg *Config) *resource.Resource {
|
|
attrs := []attribute.KeyValue{
|
|
semconv.ServiceName(cfg.Service),
|
|
}
|
|
if len(cfg.Version) > 0 {
|
|
attrs = append(attrs, semconv.ServiceVersion(cfg.Version))
|
|
}
|
|
if len(cfg.InstanceID) > 0 {
|
|
attrs = append(attrs, semconv.ServiceInstanceID(cfg.InstanceID))
|
|
}
|
|
for k, v := range cfg.Attributes {
|
|
attrs = append(attrs, attribute.String(k, v))
|
|
}
|
|
return resource.NewWithAttributes(
|
|
semconv.SchemaURL,
|
|
attrs...,
|
|
)
|
|
}
|
|
|
|
type tracerHolder struct {
|
|
Tracer trace.Tracer
|
|
}
|