frostfs-observability/tracing/setup.go
Aleksey Savaitan db6cf1ea16
All checks were successful
DCO action / DCO (pull_request) Successful in 45s
Tests and linters / Tests (1.21) (pull_request) Successful in 1m38s
Tests and linters / Tests (1.22) (pull_request) Successful in 1m38s
Tests and linters / Tests with -race (pull_request) Successful in 1m42s
Tests and linters / Staticcheck (pull_request) Successful in 1m50s
Tests and linters / Lint (pull_request) Successful in 2m0s
[#13] support tls over grpc for otlp_grpc exporter type
Signed-off-by: Aleksey Savaitan <a.savaitan@yadro.com>
2024-09-04 14:42:35 +03:00

177 lines
4.3 KiB
Go

package tracing
import (
"context"
"crypto/x509"
"fmt"
"os"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/grpc/credentials"
)
// ErrInvalidServerRootCaCertificate is the sentinel error wrapped by
// getExporter when the server CA certificate file cannot be read or when
// no certificate can be appended from its PEM content.
var ErrInvalidServerRootCaCertificate = fmt.Errorf("invalid server root ca certificate")
// Package-level tracing state shared by Setup and Shutdown.
var (
	// tracingLock guards provider, done, config and tracer against
	// concurrent modification. Those values are rewritten whenever a new
	// configuration is applied or the application shuts tracing down.
	tracingLock = new(sync.Mutex)

	// provider is the currently active SDK tracer provider; it is nil while
	// no provider has been created or after it has been shut down.
	provider *sdktrace.TracerProvider

	// done is set by Shutdown; once true, Setup refuses to run.
	done bool

	// config is the configuration most recently applied by Setup.
	config Config

	// tracer holds the tracer currently in use; it starts as a no-op tracer.
	tracer = getDefaultTracer()
)
// Setup initializes or reconfigures the global tracer according to cfg.
//
// The boolean result reports whether the global tracer was updated. It is
// false when cfg fails validation, when tracing has already been shut down,
// when cfg does not differ from the current configuration, or when the
// exporter cannot be created. It may be true together with a non-nil error:
// in that case the new tracer is in place, but flushing or shutting down the
// previous provider failed.
//
// Shutdown must be called for graceful shutdown.
func Setup(ctx context.Context, cfg Config) (bool, error) {
	if err := cfg.validate(); err != nil {
		return false, err
	}

	tracingLock.Lock()
	defer tracingLock.Unlock()

	if done {
		return false, fmt.Errorf("failed to setup tracing: already shutdown")
	}
	if !config.hasChange(&cfg) {
		return false, nil
	}

	if !cfg.Enabled {
		// Tracing is being turned off: switch to a no-op tracer first, then
		// flush and release the provider that was active so far.
		config = cfg
		tracer.Store(&tracerHolder{Tracer: noop.NewTracerProvider().Tracer("")})
		return true, flushAndShutdown(ctx)
	}

	// Build the exporter before touching any global state so that a failure
	// leaves the current configuration untouched.
	exp, err := getExporter(ctx, &cfg)
	if err != nil {
		return false, err
	}

	prevProvider := provider
	provider = sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exp),
		sdktrace.WithResource(newResource(&cfg)),
	)
	config = cfg
	tracer.Store(&tracerHolder{Tracer: provider.Tracer(cfg.Service)})

	if prevProvider == nil {
		return true, nil
	}

	// The new tracer is already published; now drain and stop the old
	// provider. Shutdown is attempted even if the flush failed.
	flushErr := prevProvider.ForceFlush(ctx)
	shutdownErr := prevProvider.Shutdown(ctx)
	switch {
	case flushErr == nil:
		return true, shutdownErr
	case shutdownErr == nil:
		return true, flushErr
	default:
		// Wrap both errors so callers can still match either of them with
		// errors.Is / errors.As; the message text is the same as before.
		return true, fmt.Errorf("%w ; %w", flushErr, shutdownErr)
	}
}
// Shutdown stops tracing.
//
// The first call marks tracing as done, resets the stored configuration,
// replaces the global tracer with a no-op tracer, and flushes and shuts down
// the active provider, returning any error from that step. Subsequent calls
// do nothing and return nil. After Shutdown, Setup returns an error.
func Shutdown(ctx context.Context) error {
	tracingLock.Lock()
	defer tracingLock.Unlock()

	if done {
		return nil
	}

	noopTracer := noop.NewTracerProvider().Tracer("")
	tracer.Store(&tracerHolder{Tracer: noopTracer})
	config = Config{}
	done = true

	return flushAndShutdown(ctx)
}
// getDefaultTracer returns a fresh atomic pointer that already holds a
// tracerHolder wrapping a no-op tracer, so the pointer never loads as nil.
func getDefaultTracer() *atomic.Pointer[tracerHolder] {
	var holder atomic.Pointer[tracerHolder]
	holder.Store(&tracerHolder{Tracer: noop.NewTracerProvider().Tracer("")})
	return &holder
}
// flushAndShutdown detaches the current global provider (setting it to nil),
// force-flushes it and then shuts it down. It returns nil when there is no
// provider. Shutdown is attempted even when the flush fails; if both steps
// fail, the returned error wraps both so errors.Is / errors.As can match
// either one.
//
// The function does no locking itself; Setup and Shutdown call it while
// holding tracingLock.
func flushAndShutdown(ctx context.Context) error {
	if provider == nil {
		return nil
	}

	prev := provider
	provider = nil

	flushErr := prev.ForceFlush(ctx)
	shutdownErr := prev.Shutdown(ctx)
	switch {
	case flushErr == nil:
		return shutdownErr
	case shutdownErr == nil:
		return flushErr
	default:
		// %w (instead of %v) keeps both causes in the error chain while
		// producing the same message text.
		return fmt.Errorf("%w ; %w", flushErr, shutdownErr)
	}
}
// getExporter builds the span exporter selected by cfg.Exporter.
//
// StdoutExporter and NoOpExporter map to the stdout and no-op exporters.
// OTLPgRPCExporter creates an OTLP gRPC exporter for cfg.Endpoint: the
// connection is insecure when cfg.ServerCaCertPath is empty, otherwise TLS is
// used with the root CAs loaded from that PEM file. A file that cannot be
// read or yields no certificates produces an error wrapping
// ErrInvalidServerRootCaCertificate. Any other exporter value is an error.
func getExporter(ctx context.Context, cfg *Config) (sdktrace.SpanExporter, error) {
	switch cfg.Exporter {
	case StdoutExporter:
		return stdouttrace.New()
	case NoOpExporter:
		return tracetest.NewNoopExporter(), nil
	case OTLPgRPCExporter:
		endpoint := otlptracegrpc.WithEndpoint(cfg.Endpoint)

		// No CA certificate configured: connect without transport security.
		if cfg.ServerCaCertPath == "" {
			return otlptracegrpc.New(ctx, endpoint, otlptracegrpc.WithInsecure())
		}

		pemData, err := os.ReadFile(cfg.ServerCaCertPath)
		if err != nil {
			return nil, fmt.Errorf("%w: cannot read server CA cert by path %s, %w", ErrInvalidServerRootCaCertificate, cfg.ServerCaCertPath, err)
		}

		pool := x509.NewCertPool()
		if ok := pool.AppendCertsFromPEM(pemData); !ok {
			return nil, fmt.Errorf("%w: failed to append certificates from server CA pem %s", ErrInvalidServerRootCaCertificate, cfg.ServerCaCertPath)
		}

		tlsCreds := credentials.NewClientTLSFromCert(pool, "")
		return otlptracegrpc.New(ctx, endpoint, otlptracegrpc.WithTLSCredentials(tlsCreds))
	default:
		return nil, fmt.Errorf("failed to setup tracing: unknown tracing exporter (%s)", cfg.Exporter)
	}
}
// newResource builds the resource attached to spans. It always carries the
// service name from cfg.Service and adds the service version and instance ID
// only when the corresponding config fields are non-empty.
func newResource(cfg *Config) *resource.Resource {
	attrs := make([]attribute.KeyValue, 0, 3)
	attrs = append(attrs, semconv.ServiceName(cfg.Service))

	if cfg.Version != "" {
		attrs = append(attrs, semconv.ServiceVersion(cfg.Version))
	}
	if cfg.InstanceID != "" {
		attrs = append(attrs, semconv.ServiceInstanceID(cfg.InstanceID))
	}

	return resource.NewWithAttributes(semconv.SchemaURL, attrs...)
}
// tracerHolder wraps a trace.Tracer in a concrete struct. Pointers to it are
// what the package-level atomic pointer stores (see getDefaultTracer).
type tracerHolder struct {
	// Tracer is the wrapped tracer: a no-op tracer or one obtained from the
	// active provider.
	Tracer trace.Tracer
}