forked from TrueCloudLab/distribution
[otel-tracing] Added Tracing to Base package (driver) (#4196)
This commit is contained in:
commit
51a72c2aef
5 changed files with 274 additions and 25 deletions
10
go.mod
10
go.mod
|
@ -29,8 +29,13 @@ require (
|
|||
github.com/spf13/cobra v1.8.0
|
||||
github.com/stretchr/testify v1.8.4
|
||||
go.opentelemetry.io/contrib/exporters/autoexport v0.46.1
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1
|
||||
go.opentelemetry.io/otel v1.21.0
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0
|
||||
go.opentelemetry.io/otel/sdk v1.21.0
|
||||
go.opentelemetry.io/otel/trace v1.21.0
|
||||
golang.org/x/crypto v0.18.0
|
||||
golang.org/x/net v0.20.0
|
||||
golang.org/x/oauth2 v0.11.0
|
||||
google.golang.org/api v0.126.0
|
||||
gopkg.in/yaml.v2 v2.4.0
|
||||
|
@ -73,8 +78,6 @@ require (
|
|||
github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
go.opencensus.io v0.24.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1
|
||||
go.opentelemetry.io/otel v1.21.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.44.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
|
||||
|
@ -82,12 +85,9 @@ require (
|
|||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.44.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.44.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.21.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
|
||||
golang.org/x/net v0.20.0
|
||||
golang.org/x/sync v0.3.0 // indirect
|
||||
golang.org/x/sys v0.16.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
|
|
|
@ -46,12 +46,20 @@ import (
|
|||
"github.com/distribution/distribution/v3/internal/dcontext"
|
||||
prometheus "github.com/distribution/distribution/v3/metrics"
|
||||
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
|
||||
"github.com/distribution/distribution/v3/tracing"
|
||||
"github.com/docker/go-metrics"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// storageAction is the metrics of blob related operations
|
||||
var storageAction = prometheus.StorageNamespace.NewLabeledTimer("action", "The number of seconds that the storage action takes", "driver", "action")
|
||||
|
||||
// tracer is the OpenTelemetry tracer utilized for tracing operations within
|
||||
// this package's code.
|
||||
var tracer = otel.Tracer("github.com/distribution/distribution/v3/registry/storage/driver/base")
|
||||
|
||||
func init() {
|
||||
metrics.Register(prometheus.StorageNamespace)
|
||||
}
|
||||
|
@ -89,8 +97,16 @@ func (base *Base) setDriverName(e error) error {
|
|||
|
||||
// GetContent wraps GetContent of underlying storage driver.
|
||||
func (base *Base) GetContent(ctx context.Context, path string) ([]byte, error) {
|
||||
ctx, done := dcontext.WithTrace(ctx)
|
||||
defer done("%s.GetContent(%q)", base.Name(), path)
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
|
||||
attribute.String(tracing.AttributePrefix+"storage.path", path),
|
||||
}
|
||||
ctx, span := tracer.Start(
|
||||
ctx,
|
||||
"GetContent",
|
||||
trace.WithAttributes(attrs...))
|
||||
|
||||
defer span.End()
|
||||
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
|
||||
|
@ -104,8 +120,17 @@ func (base *Base) GetContent(ctx context.Context, path string) ([]byte, error) {
|
|||
|
||||
// PutContent wraps PutContent of underlying storage driver.
|
||||
func (base *Base) PutContent(ctx context.Context, path string, content []byte) error {
|
||||
ctx, done := dcontext.WithTrace(ctx)
|
||||
defer done("%s.PutContent(%q)", base.Name(), path)
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
|
||||
attribute.String(tracing.AttributePrefix+"storage.path", path),
|
||||
attribute.Int(tracing.AttributePrefix+"storage.content.length", len(content)),
|
||||
}
|
||||
ctx, span := tracer.Start(
|
||||
ctx,
|
||||
"PutContent",
|
||||
trace.WithAttributes(attrs...))
|
||||
|
||||
defer span.End()
|
||||
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
|
||||
|
@ -119,8 +144,17 @@ func (base *Base) PutContent(ctx context.Context, path string, content []byte) e
|
|||
|
||||
// Reader wraps Reader of underlying storage driver.
|
||||
func (base *Base) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||
ctx, done := dcontext.WithTrace(ctx)
|
||||
defer done("%s.Reader(%q, %d)", base.Name(), path, offset)
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
|
||||
attribute.String(tracing.AttributePrefix+"storage.path", path),
|
||||
attribute.Int64(tracing.AttributePrefix+"storage.offset", offset),
|
||||
}
|
||||
ctx, span := tracer.Start(
|
||||
ctx,
|
||||
"Reader",
|
||||
trace.WithAttributes(attrs...))
|
||||
|
||||
defer span.End()
|
||||
|
||||
if offset < 0 {
|
||||
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset, DriverName: base.StorageDriver.Name()}
|
||||
|
@ -136,8 +170,17 @@ func (base *Base) Reader(ctx context.Context, path string, offset int64) (io.Rea
|
|||
|
||||
// Writer wraps Writer of underlying storage driver.
|
||||
func (base *Base) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
|
||||
ctx, done := dcontext.WithTrace(ctx)
|
||||
defer done("%s.Writer(%q, %v)", base.Name(), path, append)
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
|
||||
attribute.String(tracing.AttributePrefix+"storage.path", path),
|
||||
attribute.Bool(tracing.AttributePrefix+"storage.append", append),
|
||||
}
|
||||
ctx, span := tracer.Start(
|
||||
ctx,
|
||||
"Writer",
|
||||
trace.WithAttributes(attrs...))
|
||||
|
||||
defer span.End()
|
||||
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
|
||||
|
@ -149,8 +192,16 @@ func (base *Base) Writer(ctx context.Context, path string, append bool) (storage
|
|||
|
||||
// Stat wraps Stat of underlying storage driver.
|
||||
func (base *Base) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
|
||||
ctx, done := dcontext.WithTrace(ctx)
|
||||
defer done("%s.Stat(%q)", base.Name(), path)
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
|
||||
attribute.String(tracing.AttributePrefix+"storage.path", path),
|
||||
}
|
||||
ctx, span := tracer.Start(
|
||||
ctx,
|
||||
"Stat",
|
||||
trace.WithAttributes(attrs...))
|
||||
|
||||
defer span.End()
|
||||
|
||||
if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
|
||||
return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
|
||||
|
@ -164,8 +215,16 @@ func (base *Base) Stat(ctx context.Context, path string) (storagedriver.FileInfo
|
|||
|
||||
// List wraps List of underlying storage driver.
|
||||
func (base *Base) List(ctx context.Context, path string) ([]string, error) {
|
||||
ctx, done := dcontext.WithTrace(ctx)
|
||||
defer done("%s.List(%q)", base.Name(), path)
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
|
||||
attribute.String(tracing.AttributePrefix+"storage.path", path),
|
||||
}
|
||||
ctx, span := tracer.Start(
|
||||
ctx,
|
||||
"List",
|
||||
trace.WithAttributes(attrs...))
|
||||
|
||||
defer span.End()
|
||||
|
||||
if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
|
||||
return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
|
||||
|
@ -179,6 +238,18 @@ func (base *Base) List(ctx context.Context, path string) ([]string, error) {
|
|||
|
||||
// Move wraps Move of underlying storage driver.
|
||||
func (base *Base) Move(ctx context.Context, sourcePath string, destPath string) error {
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
|
||||
attribute.String(tracing.AttributePrefix+"storage.source.path", sourcePath),
|
||||
attribute.String(tracing.AttributePrefix+"storage.dest.path", destPath),
|
||||
}
|
||||
ctx, span := tracer.Start(
|
||||
ctx,
|
||||
"Move",
|
||||
trace.WithAttributes(attrs...))
|
||||
|
||||
defer span.End()
|
||||
|
||||
ctx, done := dcontext.WithTrace(ctx)
|
||||
defer done("%s.Move(%q, %q", base.Name(), sourcePath, destPath)
|
||||
|
||||
|
@ -196,8 +267,16 @@ func (base *Base) Move(ctx context.Context, sourcePath string, destPath string)
|
|||
|
||||
// Delete wraps Delete of underlying storage driver.
|
||||
func (base *Base) Delete(ctx context.Context, path string) error {
|
||||
ctx, done := dcontext.WithTrace(ctx)
|
||||
defer done("%s.Delete(%q)", base.Name(), path)
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
|
||||
attribute.String(tracing.AttributePrefix+"storage.path", path),
|
||||
}
|
||||
ctx, span := tracer.Start(
|
||||
ctx,
|
||||
"Delete",
|
||||
trace.WithAttributes(attrs...))
|
||||
|
||||
defer span.End()
|
||||
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
|
||||
|
@ -211,8 +290,16 @@ func (base *Base) Delete(ctx context.Context, path string) error {
|
|||
|
||||
// RedirectURL wraps RedirectURL of the underlying storage driver.
|
||||
func (base *Base) RedirectURL(r *http.Request, path string) (string, error) {
|
||||
ctx, done := dcontext.WithTrace(r.Context())
|
||||
defer done("%s.RedirectURL(%q)", base.Name(), path)
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
|
||||
attribute.String(tracing.AttributePrefix+"storage.path", path),
|
||||
}
|
||||
ctx, span := tracer.Start(
|
||||
r.Context(),
|
||||
"RedirectURL",
|
||||
trace.WithAttributes(attrs...))
|
||||
|
||||
defer span.End()
|
||||
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
return "", storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
|
||||
|
@ -226,8 +313,16 @@ func (base *Base) RedirectURL(r *http.Request, path string) (string, error) {
|
|||
|
||||
// Walk wraps Walk of underlying storage driver.
|
||||
func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
|
||||
ctx, done := dcontext.WithTrace(ctx)
|
||||
defer done("%s.Walk(%q)", base.Name(), path)
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
|
||||
attribute.String(tracing.AttributePrefix+"storage.path", path),
|
||||
}
|
||||
ctx, span := tracer.Start(
|
||||
ctx,
|
||||
"Walk",
|
||||
trace.WithAttributes(attrs...))
|
||||
|
||||
defer span.End()
|
||||
|
||||
if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
|
||||
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
|
||||
|
|
42
tracing/exporter.go
Normal file
42
tracing/exporter.go
Normal file
|
@ -0,0 +1,42 @@
|
|||
package tracing
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
)
|
||||
|
||||
// compositeExporter is a custom exporter that wraps multiple SpanExporters.
|
||||
// It allows you to export spans to multiple destinations, e.g., different telemetry backends.
|
||||
type compositeExporter struct {
|
||||
exporters []sdktrace.SpanExporter
|
||||
}
|
||||
|
||||
func newCompositeExporter(exporters ...sdktrace.SpanExporter) *compositeExporter {
|
||||
return &compositeExporter{exporters: exporters}
|
||||
}
|
||||
|
||||
// ExportSpans iterates over each SpanExporter in the compositeExporter and
|
||||
// exports the spans. If any exporter returns an error, the process is stopped
|
||||
// and the error is returned. This ensures that span exporting behaves correctly
|
||||
// and reports errors as expected.
|
||||
func (ce *compositeExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
|
||||
for _, exporter := range ce.exporters {
|
||||
if err := exporter.ExportSpans(ctx, spans); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown iterates over each SpanExporter in the compositeExporter and
|
||||
// shuts them down. If any exporter returns an error during shutdown, the process
|
||||
// is stopped and the error is returned. This ensures proper shutdown of all exporters.
|
||||
func (ce *compositeExporter) Shutdown(ctx context.Context) error {
|
||||
for _, exporter := range ce.exporters {
|
||||
if err := exporter.Shutdown(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
82
tracing/exporter_test.go
Normal file
82
tracing/exporter_test.go
Normal file
|
@ -0,0 +1,82 @@
|
|||
package tracing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/trace"
|
||||
)
|
||||
|
||||
type mockSpanExporter struct {
|
||||
exportSpansCalled bool
|
||||
shutdownCalled bool
|
||||
returnError bool
|
||||
}
|
||||
|
||||
func (m *mockSpanExporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
|
||||
m.exportSpansCalled = true
|
||||
if m.returnError {
|
||||
return errors.New("export error")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockSpanExporter) Shutdown(ctx context.Context) error {
|
||||
m.shutdownCalled = true
|
||||
if m.returnError {
|
||||
return errors.New("shutdown error")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func TestCompositeExporterExportSpans(t *testing.T) {
|
||||
mockExporter1 := &mockSpanExporter{}
|
||||
mockExporter2 := &mockSpanExporter{}
|
||||
composite := newCompositeExporter(mockExporter1, mockExporter2)
|
||||
|
||||
err := composite.ExportSpans(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Errorf("ExportSpans() error = %v", err)
|
||||
}
|
||||
|
||||
if !mockExporter1.exportSpansCalled || !mockExporter2.exportSpansCalled {
|
||||
t.Error("ExportSpans was not called on all exporters")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompositeExporterExportSpans_Error(t *testing.T) {
|
||||
mockExporter1 := &mockSpanExporter{returnError: true}
|
||||
mockExporter2 := &mockSpanExporter{}
|
||||
composite := newCompositeExporter(mockExporter1, mockExporter2)
|
||||
|
||||
err := composite.ExportSpans(context.Background(), nil)
|
||||
if err == nil {
|
||||
t.Error("Expected error from ExportSpans, but got none")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompositeExporterShutdown(t *testing.T) {
|
||||
mockExporter1 := &mockSpanExporter{}
|
||||
mockExporter2 := &mockSpanExporter{}
|
||||
composite := newCompositeExporter(mockExporter1, mockExporter2)
|
||||
|
||||
err := composite.Shutdown(context.Background())
|
||||
if err != nil {
|
||||
t.Errorf("Shutdown() error = %v", err)
|
||||
}
|
||||
|
||||
if !mockExporter1.shutdownCalled || !mockExporter2.shutdownCalled {
|
||||
t.Error("Shutdown was not called on all exporters")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompositeExporterShutdown_Error(t *testing.T) {
|
||||
mockExporter1 := &mockSpanExporter{returnError: true}
|
||||
mockExporter2 := &mockSpanExporter{}
|
||||
composite := newCompositeExporter(mockExporter1, mockExporter2)
|
||||
|
||||
err := composite.Shutdown(context.Background())
|
||||
if err == nil {
|
||||
t.Error("Expected error from Shutdown, but got none")
|
||||
}
|
||||
}
|
|
@ -3,9 +3,11 @@ package tracing
|
|||
import (
|
||||
"context"
|
||||
|
||||
"github.com/distribution/distribution/v3/internal/dcontext"
|
||||
"github.com/distribution/distribution/v3/version"
|
||||
"go.opentelemetry.io/contrib/exporters/autoexport"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
|
@ -18,8 +20,25 @@ const (
|
|||
|
||||
// DefaultSamplingRatio default sample ratio
|
||||
defaultSamplingRatio = 1
|
||||
|
||||
// AttributePrefix defines a standardized prefix for custom telemetry attributes
|
||||
// associated with the CNCF Distribution project.
|
||||
AttributePrefix = "io.cncf.distribution."
|
||||
)
|
||||
|
||||
// loggerWriter is a custom writer that implements the io.Writer interface.
|
||||
// It is designed to redirect log messages to the Logger interface, specifically
|
||||
// for use with OpenTelemetry's stdouttrace exporter.
|
||||
type loggerWriter struct {
|
||||
logger dcontext.Logger // Use the Logger interface
|
||||
}
|
||||
|
||||
// Write logs the data using the Debug level of the provided logger.
|
||||
func (lw *loggerWriter) Write(p []byte) (n int, err error) {
|
||||
lw.logger.Debug(string(p))
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// InitOpenTelemetry initializes OpenTelemetry for the application. This function sets up the
|
||||
// necessary components for collecting telemetry data, such as traces.
|
||||
func InitOpenTelemetry(ctx context.Context) error {
|
||||
|
@ -29,12 +48,23 @@ func InitOpenTelemetry(ctx context.Context) error {
|
|||
semconv.ServiceVersionKey.String(version.Version()),
|
||||
)
|
||||
|
||||
exp, err := autoexport.NewSpanExporter(ctx)
|
||||
autoExp, err := autoexport.NewSpanExporter(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sp := sdktrace.NewBatchSpanProcessor(exp)
|
||||
lw := &loggerWriter{
|
||||
logger: dcontext.GetLogger(ctx),
|
||||
}
|
||||
|
||||
loggerExp, err := stdouttrace.New(stdouttrace.WithWriter(lw))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
compositeExp := newCompositeExporter(autoExp, loggerExp)
|
||||
|
||||
sp := sdktrace.NewBatchSpanProcessor(compositeExp)
|
||||
provider := sdktrace.NewTracerProvider(
|
||||
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(defaultSamplingRatio)),
|
||||
sdktrace.WithResource(res),
|
||||
|
|
Loading…
Reference in a new issue