[#84] add tracing support

Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
This commit is contained in:
Pavel Pogodaev 2023-05-31 19:35:20 +03:00
parent bd898ad59e
commit 4e1fd9589b
8 changed files with 194 additions and 4 deletions

View file

@ -258,6 +258,8 @@ func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center aut
// Attach user authentication for all S3 routes. // Attach user authentication for all S3 routes.
AuthMiddleware(log, center), AuthMiddleware(log, center),
TracingMiddleware(),
metricsMiddleware(log, h.ResolveBucket, appMetrics), metricsMiddleware(log, h.ResolveBucket, appMetrics),
// -- logging error requests // -- logging error requests

119
api/tracing.go Normal file
View file

@ -0,0 +1,119 @@
package api
import (
"context"
"net/http"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"github.com/gorilla/mux"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
"go.opentelemetry.io/otel/trace"
)
// TracingMiddleware adds tracing support for requests.
func TracingMiddleware() mux.MiddlewareFunc {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
appCtx, span := StartHTTPServerSpan(r, "REQUEST S3")
lw := &traceResponseWriter{ResponseWriter: w, ctx: appCtx, span: span}
h.ServeHTTP(lw, r.WithContext(appCtx))
})
}
}
type traceResponseWriter struct {
sync.Once
http.ResponseWriter
ctx context.Context
span trace.Span
}
func (lrw *traceResponseWriter) WriteHeader(code int) {
lrw.Do(func() {
lrw.span.SetAttributes(
semconv.HTTPStatusCode(code),
)
carrier := &httpResponseCarrier{resp: lrw.ResponseWriter}
tracing.Propagator.Inject(lrw.ctx, carrier)
lrw.ResponseWriter.WriteHeader(code)
lrw.span.End()
})
}
func (lrw *traceResponseWriter) Flush() {
if f, ok := lrw.ResponseWriter.(http.Flusher); ok {
f.Flush()
}
}
type httpResponseCarrier struct {
resp http.ResponseWriter
}
func (h httpResponseCarrier) Get(key string) string {
return h.resp.Header().Get(key)
}
func (h httpResponseCarrier) Set(key string, value string) {
h.resp.Header().Set(key, value)
}
func (h httpResponseCarrier) Keys() []string {
result := make([]string, 0, len(h.resp.Header()))
for key := range h.resp.Header() {
result = append(result, key)
}
return result
}
type httpRequestCarrier struct {
req *http.Request
}
func (c *httpRequestCarrier) Get(key string) string {
bytes := c.req.Header.Get(key)
if len(bytes) == 0 {
return ""
}
return bytes
}
func (c *httpRequestCarrier) Set(key string, value string) {
c.req.Response.Header.Set(key, value)
}
func (c *httpRequestCarrier) Keys() []string {
result := make([]string, 0, len(c.req.Header))
for key := range c.req.Header {
result = append(result, key)
}
return result
}
func extractHTTPTraceInfo(ctx context.Context, req *http.Request) context.Context {
if req == nil {
return ctx
}
carrier := &httpRequestCarrier{req: req}
return tracing.Propagator.Extract(ctx, carrier)
}
// StartHTTPServerSpan starts root HTTP server span.
func StartHTTPServerSpan(r *http.Request, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
ctx := extractHTTPTraceInfo(r.Context(), r)
opts = append(opts, trace.WithAttributes(
attribute.String("s3.client_address", r.RemoteAddr),
attribute.String("s3.path", r.Host),
semconv.HTTPMethod(r.Method),
semconv.RPCService("frostfs-s3-gw"),
attribute.String("s3.query", r.RequestURI),
), trace.WithSpanKind(trace.SpanKindServer))
return tracing.StartSpanFromContext(ctx, operationName, opts...)
}

View file

@ -13,6 +13,7 @@ import (
"syscall" "syscall"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
@ -110,6 +111,7 @@ func (a *App) init(ctx context.Context) {
a.initAPI(ctx) a.initAPI(ctx)
a.initMetrics() a.initMetrics()
a.initServers(ctx) a.initServers(ctx)
a.initTracing(ctx)
} }
func (a *App) initLayer(ctx context.Context) { func (a *App) initLayer(ctx context.Context) {
@ -214,6 +216,38 @@ func (a *App) getResolverConfig() ([]string, *resolver.Config) {
return order, resolveCfg return order, resolveCfg
} }
func (a *App) initTracing(ctx context.Context) {
instanceID := ""
if len(a.servers) > 0 {
instanceID = a.servers[0].Address()
}
cfg := tracing.Config{
Enabled: a.cfg.GetBool(cfgTracingEnabled),
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
Service: "frostfs-s3-gw",
InstanceID: instanceID,
Version: version.Version,
}
updated, err := tracing.Setup(ctx, cfg)
if err != nil {
a.log.Warn("failed to initialize tracing", zap.Error(err))
}
if updated {
a.log.Info("tracing config updated")
}
}
func (a *App) shutdownTracing() {
const tracingShutdownTimeout = 5 * time.Second
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
defer cancel()
if err := tracing.Shutdown(shdnCtx); err != nil {
a.log.Warn("failed to shutdown tracing", zap.Error(err))
}
}
func newMaxClients(cfg *viper.Viper) api.MaxClients { func newMaxClients(cfg *viper.Viper) api.MaxClients {
maxClientsCount := cfg.GetInt(cfgMaxClientsCount) maxClientsCount := cfg.GetInt(cfgMaxClientsCount)
if maxClientsCount <= 0 { if maxClientsCount <= 0 {
@ -462,7 +496,7 @@ LOOP:
case <-ctx.Done(): case <-ctx.Done():
break LOOP break LOOP
case <-sigs: case <-sigs:
a.configReload() a.configReload(ctx)
} }
} }
@ -473,6 +507,7 @@ LOOP:
a.metrics.Shutdown() a.metrics.Shutdown()
a.stopServices() a.stopServices()
a.shutdownTracing()
close(a.webDone) close(a.webDone)
} }
@ -481,7 +516,7 @@ func shutdownContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultShutdownTimeout) return context.WithTimeout(context.Background(), defaultShutdownTimeout)
} }
func (a *App) configReload() { func (a *App) configReload(ctx context.Context) {
a.log.Info("SIGHUP config reload started") a.log.Info("SIGHUP config reload started")
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) { if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
@ -507,6 +542,7 @@ func (a *App) configReload() {
a.updateSettings() a.updateSettings()
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled)) a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
a.initTracing(ctx)
a.setHealthStatus() a.setHealthStatus()
a.log.Info("SIGHUP config reload completed") a.log.Info("SIGHUP config reload completed")

View file

@ -98,6 +98,11 @@ const ( // Settings.
cfgPProfEnabled = "pprof.enabled" cfgPProfEnabled = "pprof.enabled"
cfgPProfAddress = "pprof.address" cfgPProfAddress = "pprof.address"
// Tracing.
cfgTracingEnabled = "tracing.enabled"
cfgTracingExporter = "tracing.exporter"
cfgTracingEndpoint = "tracing.endpoint"
cfgListenDomains = "listen_domains" cfgListenDomains = "listen_domains"
// Peers. // Peers.

View file

@ -141,3 +141,7 @@ S3_GW_RESOLVE_BUCKET_ALLOW=container
S3_GW_KLUDGE_USE_DEFAULT_XMLNS_FOR_COMPLETE_MULTIPART=false S3_GW_KLUDGE_USE_DEFAULT_XMLNS_FOR_COMPLETE_MULTIPART=false
# Set timeout between whitespace transmissions during CompleteMultipartUpload processing. # Set timeout between whitespace transmissions during CompleteMultipartUpload processing.
S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s
S3_GW_TRACING_ENABLED=false
S3_GW_TRACING_ENDPOINT="localhost:4318"
S3_GW_TRACING_EXPORTER="otlp_grpc"

View file

@ -64,6 +64,11 @@ prometheus:
enabled: false enabled: false
address: localhost:8086 address: localhost:8086
tracing:
enabled: false
exporter: "otlp_grpc"
endpoint: "localhost:4318"
# Timeout to connect to a node # Timeout to connect to a node
connect_timeout: 10s connect_timeout: 10s
# Timeout for individual operations in streaming RPC. # Timeout for individual operations in streaming RPC.

View file

@ -182,6 +182,7 @@ There are some custom types used for brevity:
| `cors` | [CORS configuration](#cors-section) | | `cors` | [CORS configuration](#cors-section) |
| `pprof` | [Pprof configuration](#pprof-section) | | `pprof` | [Pprof configuration](#pprof-section) |
| `prometheus` | [Prometheus configuration](#prometheus-section) | | `prometheus` | [Prometheus configuration](#prometheus-section) |
| `tracing` | [Tracing configuration](#tracing-section) |
| `frostfs` | [Parameters of requests to FrostFS](#frostfs-section) | | `frostfs` | [Parameters of requests to FrostFS](#frostfs-section) |
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) | | `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
| `kludge` | [Different kludge configuration](#kludge-section) | | `kludge` | [Different kludge configuration](#kludge-section) |
@ -499,6 +500,24 @@ prometheus:
| `enabled` | `bool` | yes | `false` | Flag to enable the service. | | `enabled` | `bool` | yes | `false` | Flag to enable the service. |
| `address` | `string` | yes | `localhost:8086` | Address that service listener binds to. | | `address` | `string` | yes | `localhost:8086` | Address that service listener binds to. |
# `tracing` section
Contains configuration for the `tracing` service.
```yaml
tracing:
enabled: false
exporter: "otlp_grpc"
endpoint: "localhost:4318"
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|-------------|----------|---------------|---------------|-----------------------------------------|
| `enabled` | `bool` | yes | `false` | Flag to enable the service. |
| `exporter` | `string` | yes | `` | Type of tracing exporter. |
| `endpoint` | `string` | yes | `` | Address that service listener binds to. |
# `frostfs` section # `frostfs` section
Contains parameters of requests to FrostFS. Contains parameters of requests to FrostFS.

4
go.mod
View file

@ -20,6 +20,8 @@ require (
github.com/spf13/viper v1.15.0 github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.2 github.com/stretchr/testify v1.8.2
github.com/urfave/cli/v2 v2.3.0 github.com/urfave/cli/v2 v2.3.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
go.uber.org/zap v1.24.0 go.uber.org/zap v1.24.0
golang.org/x/crypto v0.4.0 golang.org/x/crypto v0.4.0
google.golang.org/grpc v1.53.0 google.golang.org/grpc v1.53.0
@ -71,13 +73,11 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect github.com/subosito/gotenv v1.4.2 // indirect
github.com/urfave/cli v1.22.5 // indirect github.com/urfave/cli v1.22.5 // indirect
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
go.opentelemetry.io/otel/sdk v1.14.0 // indirect go.opentelemetry.io/otel/sdk v1.14.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect