From f690b3ebe26f14c5d83998b2fec73f7491ec14d2 Mon Sep 17 00:00:00 2001 From: gotgelf Date: Mon, 18 Dec 2023 22:04:22 +0100 Subject: [PATCH] Added Open Telemetry Tracing to Filesystem package Signed-off-by: gotgelf --- go.mod | 10 +- registry/storage/driver/base/base.go | 131 +++++++++++++++++++++++---- tracing/exporter.go | 42 +++++++++ tracing/exporter_test.go | 82 +++++++++++++++++ tracing/tracing.go | 34 ++++++- 5 files changed, 274 insertions(+), 25 deletions(-) create mode 100644 tracing/exporter.go create mode 100644 tracing/exporter_test.go diff --git a/go.mod b/go.mod index 00cf41974..b0b02f951 100644 --- a/go.mod +++ b/go.mod @@ -29,8 +29,13 @@ require ( github.com/spf13/cobra v1.8.0 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/contrib/exporters/autoexport v0.46.1 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 + go.opentelemetry.io/otel v1.21.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 go.opentelemetry.io/otel/sdk v1.21.0 + go.opentelemetry.io/otel/trace v1.21.0 golang.org/x/crypto v0.18.0 + golang.org/x/net v0.20.0 golang.org/x/oauth2 v0.11.0 google.golang.org/api v0.126.0 gopkg.in/yaml.v2 v2.4.0 @@ -73,8 +78,6 @@ require ( github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 - go.opentelemetry.io/otel v1.21.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.44.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect @@ -82,12 +85,9 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.44.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.44.0 // indirect - go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 // indirect go.opentelemetry.io/otel/metric v1.21.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect - go.opentelemetry.io/otel/trace v1.21.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect - golang.org/x/net v0.20.0 golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/registry/storage/driver/base/base.go b/registry/storage/driver/base/base.go index 32c9037ba..9dbb609ea 100644 --- a/registry/storage/driver/base/base.go +++ b/registry/storage/driver/base/base.go @@ -46,12 +46,20 @@ import ( "github.com/distribution/distribution/v3/internal/dcontext" prometheus "github.com/distribution/distribution/v3/metrics" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" + "github.com/distribution/distribution/v3/tracing" "github.com/docker/go-metrics" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // storageAction is the metrics of blob related operations var storageAction = prometheus.StorageNamespace.NewLabeledTimer("action", "The number of seconds that the storage action takes", "driver", "action") +// tracer is the OpenTelemetry tracer utilized for tracing operations within +// this package's code. +var tracer = otel.Tracer("github.com/distribution/distribution/v3/registry/storage/driver/base") + func init() { metrics.Register(prometheus.StorageNamespace) } @@ -89,8 +97,16 @@ func (base *Base) setDriverName(e error) error { // GetContent wraps GetContent of underlying storage driver. func (base *Base) GetContent(ctx context.Context, path string) ([]byte, error) { - ctx, done := dcontext.WithTrace(ctx) - defer done("%s.GetContent(%q)", base.Name(), path) + attrs := []attribute.KeyValue{ + attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()), + attribute.String(tracing.AttributePrefix+"storage.path", path), + } + ctx, span := tracer.Start( + ctx, + "GetContent", + trace.WithAttributes(attrs...)) + + defer span.End() if !storagedriver.PathRegexp.MatchString(path) { return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()} @@ -104,8 +120,17 @@ func (base *Base) GetContent(ctx context.Context, path string) ([]byte, error) { // PutContent wraps PutContent of underlying storage driver. func (base *Base) PutContent(ctx context.Context, path string, content []byte) error { - ctx, done := dcontext.WithTrace(ctx) - defer done("%s.PutContent(%q)", base.Name(), path) + attrs := []attribute.KeyValue{ + attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()), + attribute.String(tracing.AttributePrefix+"storage.path", path), + attribute.Int(tracing.AttributePrefix+"storage.content.length", len(content)), + } + ctx, span := tracer.Start( + ctx, + "PutContent", + trace.WithAttributes(attrs...)) + + defer span.End() if !storagedriver.PathRegexp.MatchString(path) { return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()} @@ -119,8 +144,17 @@ func (base *Base) PutContent(ctx context.Context, path string, content []byte) e // Reader wraps Reader of underlying storage driver. func (base *Base) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { - ctx, done := dcontext.WithTrace(ctx) - defer done("%s.Reader(%q, %d)", base.Name(), path, offset) + attrs := []attribute.KeyValue{ + attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()), + attribute.String(tracing.AttributePrefix+"storage.path", path), + attribute.Int64(tracing.AttributePrefix+"storage.offset", offset), + } + ctx, span := tracer.Start( + ctx, + "Reader", + trace.WithAttributes(attrs...)) + + defer span.End() if offset < 0 { return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset, DriverName: base.StorageDriver.Name()} @@ -136,8 +170,17 @@ func (base *Base) Reader(ctx context.Context, path string, offset int64) (io.Rea // Writer wraps Writer of underlying storage driver. func (base *Base) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { - ctx, done := dcontext.WithTrace(ctx) - defer done("%s.Writer(%q, %v)", base.Name(), path, append) + attrs := []attribute.KeyValue{ + attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()), + attribute.String(tracing.AttributePrefix+"storage.path", path), + attribute.Bool(tracing.AttributePrefix+"storage.append", append), + } + ctx, span := tracer.Start( + ctx, + "Writer", + trace.WithAttributes(attrs...)) + + defer span.End() if !storagedriver.PathRegexp.MatchString(path) { return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()} @@ -149,8 +192,16 @@ func (base *Base) Writer(ctx context.Context, path string, append bool) (storage // Stat wraps Stat of underlying storage driver. func (base *Base) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { - ctx, done := dcontext.WithTrace(ctx) - defer done("%s.Stat(%q)", base.Name(), path) + attrs := []attribute.KeyValue{ + attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()), + attribute.String(tracing.AttributePrefix+"storage.path", path), + } + ctx, span := tracer.Start( + ctx, + "Stat", + trace.WithAttributes(attrs...)) + + defer span.End() if !storagedriver.PathRegexp.MatchString(path) && path != "/" { return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()} @@ -164,8 +215,16 @@ func (base *Base) Stat(ctx context.Context, path string) (storagedriver.FileInfo // List wraps List of underlying storage driver. func (base *Base) List(ctx context.Context, path string) ([]string, error) { - ctx, done := dcontext.WithTrace(ctx) - defer done("%s.List(%q)", base.Name(), path) + attrs := []attribute.KeyValue{ + attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()), + attribute.String(tracing.AttributePrefix+"storage.path", path), + } + ctx, span := tracer.Start( + ctx, + "List", + trace.WithAttributes(attrs...)) + + defer span.End() if !storagedriver.PathRegexp.MatchString(path) && path != "/" { return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()} @@ -179,6 +238,18 @@ func (base *Base) List(ctx context.Context, path string) ([]string, error) { // Move wraps Move of underlying storage driver. func (base *Base) Move(ctx context.Context, sourcePath string, destPath string) error { + attrs := []attribute.KeyValue{ + attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()), + attribute.String(tracing.AttributePrefix+"storage.source.path", sourcePath), + attribute.String(tracing.AttributePrefix+"storage.dest.path", destPath), + } + ctx, span := tracer.Start( + ctx, + "Move", + trace.WithAttributes(attrs...)) + + defer span.End() + ctx, done := dcontext.WithTrace(ctx) defer done("%s.Move(%q, %q", base.Name(), sourcePath, destPath) @@ -196,8 +267,16 @@ func (base *Base) Move(ctx context.Context, sourcePath string, destPath string) // Delete wraps Delete of underlying storage driver. func (base *Base) Delete(ctx context.Context, path string) error { - ctx, done := dcontext.WithTrace(ctx) - defer done("%s.Delete(%q)", base.Name(), path) + attrs := []attribute.KeyValue{ + attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()), + attribute.String(tracing.AttributePrefix+"storage.path", path), + } + ctx, span := tracer.Start( + ctx, + "Delete", + trace.WithAttributes(attrs...)) + + defer span.End() if !storagedriver.PathRegexp.MatchString(path) { return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()} @@ -211,8 +290,16 @@ func (base *Base) Delete(ctx context.Context, path string) error { // RedirectURL wraps RedirectURL of the underlying storage driver. func (base *Base) RedirectURL(r *http.Request, path string) (string, error) { - ctx, done := dcontext.WithTrace(r.Context()) - defer done("%s.RedirectURL(%q)", base.Name(), path) + attrs := []attribute.KeyValue{ + attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()), + attribute.String(tracing.AttributePrefix+"storage.path", path), + } + ctx, span := tracer.Start( + r.Context(), + "RedirectURL", + trace.WithAttributes(attrs...)) + + defer span.End() if !storagedriver.PathRegexp.MatchString(path) { return "", storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()} @@ -226,8 +313,16 @@ func (base *Base) RedirectURL(r *http.Request, path string) (string, error) { // Walk wraps Walk of underlying storage driver. func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error { - ctx, done := dcontext.WithTrace(ctx) - defer done("%s.Walk(%q)", base.Name(), path) + attrs := []attribute.KeyValue{ + attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()), + attribute.String(tracing.AttributePrefix+"storage.path", path), + } + ctx, span := tracer.Start( + ctx, + "Walk", + trace.WithAttributes(attrs...)) + + defer span.End() if !storagedriver.PathRegexp.MatchString(path) && path != "/" { return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()} diff --git a/tracing/exporter.go b/tracing/exporter.go new file mode 100644 index 000000000..483c9b124 --- /dev/null +++ b/tracing/exporter.go @@ -0,0 +1,42 @@ +package tracing + +import ( + "context" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +// compositeExporter is a custom exporter that wraps multiple SpanExporters. +// It allows you to export spans to multiple destinations, e.g., different telemetry backends. +type compositeExporter struct { + exporters []sdktrace.SpanExporter +} + +func newCompositeExporter(exporters ...sdktrace.SpanExporter) *compositeExporter { + return &compositeExporter{exporters: exporters} +} + +// ExportSpans iterates over each SpanExporter in the compositeExporter and +// exports the spans. If any exporter returns an error, the process is stopped +// and the error is returned. This ensures that span exporting behaves correctly +// and reports errors as expected. +func (ce *compositeExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { + for _, exporter := range ce.exporters { + if err := exporter.ExportSpans(ctx, spans); err != nil { + return err + } + } + return nil +} + +// Shutdown iterates over each SpanExporter in the compositeExporter and +// shuts them down. If any exporter returns an error during shutdown, the process +// is stopped and the error is returned. This ensures proper shutdown of all exporters. +func (ce *compositeExporter) Shutdown(ctx context.Context) error { + for _, exporter := range ce.exporters { + if err := exporter.Shutdown(ctx); err != nil { + return err + } + } + return nil +} diff --git a/tracing/exporter_test.go b/tracing/exporter_test.go new file mode 100644 index 000000000..ec5517355 --- /dev/null +++ b/tracing/exporter_test.go @@ -0,0 +1,82 @@ +package tracing + +import ( + "context" + "errors" + "testing" + + "go.opentelemetry.io/otel/sdk/trace" +) + +type mockSpanExporter struct { + exportSpansCalled bool + shutdownCalled bool + returnError bool +} + +func (m *mockSpanExporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error { + m.exportSpansCalled = true + if m.returnError { + return errors.New("export error") + } + return nil +} + +func (m *mockSpanExporter) Shutdown(ctx context.Context) error { + m.shutdownCalled = true + if m.returnError { + return errors.New("shutdown error") + } + return nil +} +func TestCompositeExporterExportSpans(t *testing.T) { + mockExporter1 := &mockSpanExporter{} + mockExporter2 := &mockSpanExporter{} + composite := newCompositeExporter(mockExporter1, mockExporter2) + + err := composite.ExportSpans(context.Background(), nil) + if err != nil { + t.Errorf("ExportSpans() error = %v", err) + } + + if !mockExporter1.exportSpansCalled || !mockExporter2.exportSpansCalled { + t.Error("ExportSpans was not called on all exporters") + } +} + +func TestCompositeExporterExportSpans_Error(t *testing.T) { + mockExporter1 := &mockSpanExporter{returnError: true} + mockExporter2 := &mockSpanExporter{} + composite := newCompositeExporter(mockExporter1, mockExporter2) + + err := composite.ExportSpans(context.Background(), nil) + if err == nil { + t.Error("Expected error from ExportSpans, but got none") + } +} + +func TestCompositeExporterShutdown(t *testing.T) { + mockExporter1 := &mockSpanExporter{} + mockExporter2 := &mockSpanExporter{} + composite := newCompositeExporter(mockExporter1, mockExporter2) + + err := composite.Shutdown(context.Background()) + if err != nil { + t.Errorf("Shutdown() error = %v", err) + } + + if !mockExporter1.shutdownCalled || !mockExporter2.shutdownCalled { + t.Error("Shutdown was not called on all exporters") + } +} + +func TestCompositeExporterShutdown_Error(t *testing.T) { + mockExporter1 := &mockSpanExporter{returnError: true} + mockExporter2 := &mockSpanExporter{} + composite := newCompositeExporter(mockExporter1, mockExporter2) + + err := composite.Shutdown(context.Background()) + if err == nil { + t.Error("Expected error from Shutdown, but got none") + } +} diff --git a/tracing/tracing.go b/tracing/tracing.go index c75acf81e..899477ca7 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -3,9 +3,11 @@ package tracing import ( "context" + "github.com/distribution/distribution/v3/internal/dcontext" "github.com/distribution/distribution/v3/version" "go.opentelemetry.io/contrib/exporters/autoexport" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -18,8 +20,25 @@ const ( // DefaultSamplingRatio default sample ratio defaultSamplingRatio = 1 + + // AttributePrefix defines a standardized prefix for custom telemetry attributes + // associated with the CNCF Distribution project. + AttributePrefix = "io.cncf.distribution." ) +// loggerWriter is a custom writer that implements the io.Writer interface. +// It is designed to redirect log messages to the Logger interface, specifically +// for use with OpenTelemetry's stdouttrace exporter. +type loggerWriter struct { + logger dcontext.Logger // Use the Logger interface +} + +// Write logs the data using the Debug level of the provided logger. +func (lw *loggerWriter) Write(p []byte) (n int, err error) { + lw.logger.Debug(string(p)) + return len(p), nil +} + // InitOpenTelemetry initializes OpenTelemetry for the application. This function sets up the // necessary components for collecting telemetry data, such as traces. func InitOpenTelemetry(ctx context.Context) error { @@ -29,12 +48,23 @@ func InitOpenTelemetry(ctx context.Context) error { semconv.ServiceVersionKey.String(version.Version()), ) - exp, err := autoexport.NewSpanExporter(ctx) + autoExp, err := autoexport.NewSpanExporter(ctx) if err != nil { return err } - sp := sdktrace.NewBatchSpanProcessor(exp) + lw := &loggerWriter{ + logger: dcontext.GetLogger(ctx), + } + + loggerExp, err := stdouttrace.New(stdouttrace.WithWriter(lw)) + if err != nil { + return err + } + + compositeExp := newCompositeExporter(autoExp, loggerExp) + + sp := sdktrace.NewBatchSpanProcessor(compositeExp) provider := sdktrace.NewTracerProvider( sdktrace.WithSampler(sdktrace.TraceIDRatioBased(defaultSamplingRatio)), sdktrace.WithResource(res),