[#84] add tracing support #123
8 changed files with 194 additions and 4 deletions
|
@ -258,6 +258,8 @@ func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center aut
|
|||
// Attach user authentication for all S3 routes.
|
||||
AuthMiddleware(log, center),
|
||||
|
||||
TracingMiddleware(),
|
||||
|
||||
metricsMiddleware(log, h.ResolveBucket, appMetrics),
|
||||
|
||||
// -- logging error requests
|
||||
|
|
119
api/tracing.go
Normal file
119
api/tracing.go
Normal file
|
@ -0,0 +1,119 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||
"github.com/gorilla/mux"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// TracingMiddleware adds tracing support for requests.
|
||||
func TracingMiddleware() mux.MiddlewareFunc {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
appCtx, span := StartHTTPServerSpan(r, "REQUEST S3")
|
||||
lw := &traceResponseWriter{ResponseWriter: w, ctx: appCtx, span: span}
|
||||
h.ServeHTTP(lw, r.WithContext(appCtx))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type traceResponseWriter struct {
|
||||
sync.Once
|
||||
http.ResponseWriter
|
||||
|
||||
ctx context.Context
|
||||
span trace.Span
|
||||
}
|
||||
|
||||
func (lrw *traceResponseWriter) WriteHeader(code int) {
|
||||
lrw.Do(func() {
|
||||
lrw.span.SetAttributes(
|
||||
semconv.HTTPStatusCode(code),
|
||||
)
|
||||
|
||||
carrier := &httpResponseCarrier{resp: lrw.ResponseWriter}
|
||||
tracing.Propagator.Inject(lrw.ctx, carrier)
|
||||
|
||||
lrw.ResponseWriter.WriteHeader(code)
|
||||
lrw.span.End()
|
||||
})
|
||||
}
|
||||
|
||||
func (lrw *traceResponseWriter) Flush() {
|
||||
if f, ok := lrw.ResponseWriter.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
type httpResponseCarrier struct {
|
||||
resp http.ResponseWriter
|
||||
}
|
||||
|
||||
func (h httpResponseCarrier) Get(key string) string {
|
||||
return h.resp.Header().Get(key)
|
||||
}
|
||||
|
||||
func (h httpResponseCarrier) Set(key string, value string) {
|
||||
h.resp.Header().Set(key, value)
|
||||
}
|
||||
|
||||
func (h httpResponseCarrier) Keys() []string {
|
||||
result := make([]string, 0, len(h.resp.Header()))
|
||||
for key := range h.resp.Header() {
|
||||
result = append(result, key)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
type httpRequestCarrier struct {
|
||||
req *http.Request
|
||||
}
|
||||
|
||||
func (c *httpRequestCarrier) Get(key string) string {
|
||||
bytes := c.req.Header.Get(key)
|
||||
if len(bytes) == 0 {
|
||||
return ""
|
||||
}
|
||||
return bytes
|
||||
}
|
||||
|
||||
func (c *httpRequestCarrier) Set(key string, value string) {
|
||||
c.req.Response.Header.Set(key, value)
|
||||
}
|
||||
|
||||
func (c *httpRequestCarrier) Keys() []string {
|
||||
result := make([]string, 0, len(c.req.Header))
|
||||
for key := range c.req.Header {
|
||||
result = append(result, key)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func extractHTTPTraceInfo(ctx context.Context, req *http.Request) context.Context {
|
||||
if req == nil {
|
||||
return ctx
|
||||
}
|
||||
carrier := &httpRequestCarrier{req: req}
|
||||
return tracing.Propagator.Extract(ctx, carrier)
|
||||
}
|
||||
|
||||
// StartHTTPServerSpan starts root HTTP server span.
|
||||
func StartHTTPServerSpan(r *http.Request, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
|
||||
ctx := extractHTTPTraceInfo(r.Context(), r)
|
||||
opts = append(opts, trace.WithAttributes(
|
||||
attribute.String("s3.client_address", r.RemoteAddr),
|
||||
attribute.String("s3.path", r.Host),
|
||||
semconv.HTTPMethod(r.Method),
|
||||
semconv.RPCService("frostfs-s3-gw"),
|
||||
attribute.String("s3.query", r.RequestURI),
|
||||
), trace.WithSpanKind(trace.SpanKindServer))
|
||||
return tracing.StartSpanFromContext(ctx, operationName, opts...)
|
||||
}
|
|
@ -13,6 +13,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
|
||||
|
@ -110,6 +111,7 @@ func (a *App) init(ctx context.Context) {
|
|||
a.initAPI(ctx)
|
||||
a.initMetrics()
|
||||
a.initServers(ctx)
|
||||
a.initTracing(ctx)
|
||||
}
|
||||
|
||||
func (a *App) initLayer(ctx context.Context) {
|
||||
|
@ -214,6 +216,38 @@ func (a *App) getResolverConfig() ([]string, *resolver.Config) {
|
|||
return order, resolveCfg
|
||||
}
|
||||
|
||||
func (a *App) initTracing(ctx context.Context) {
|
||||
instanceID := ""
|
||||
if len(a.servers) > 0 {
|
||||
instanceID = a.servers[0].Address()
|
||||
}
|
||||
cfg := tracing.Config{
|
||||
Enabled: a.cfg.GetBool(cfgTracingEnabled),
|
||||
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
|
||||
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
|
||||
Service: "frostfs-s3-gw",
|
||||
InstanceID: instanceID,
|
||||
Version: version.Version,
|
||||
}
|
||||
updated, err := tracing.Setup(ctx, cfg)
|
||||
if err != nil {
|
||||
a.log.Warn("failed to initialize tracing", zap.Error(err))
|
||||
}
|
||||
if updated {
|
||||
a.log.Info("tracing config updated")
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) shutdownTracing() {
|
||||
const tracingShutdownTimeout = 5 * time.Second
|
||||
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
|
||||
defer cancel()
|
||||
|
||||
if err := tracing.Shutdown(shdnCtx); err != nil {
|
||||
a.log.Warn("failed to shutdown tracing", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func newMaxClients(cfg *viper.Viper) api.MaxClients {
|
||||
maxClientsCount := cfg.GetInt(cfgMaxClientsCount)
|
||||
if maxClientsCount <= 0 {
|
||||
|
@ -462,7 +496,7 @@ LOOP:
|
|||
case <-ctx.Done():
|
||||
break LOOP
|
||||
case <-sigs:
|
||||
a.configReload()
|
||||
a.configReload(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -473,6 +507,7 @@ LOOP:
|
|||
|
||||
a.metrics.Shutdown()
|
||||
a.stopServices()
|
||||
a.shutdownTracing()
|
||||
|
||||
close(a.webDone)
|
||||
}
|
||||
|
@ -481,7 +516,7 @@ func shutdownContext() (context.Context, context.CancelFunc) {
|
|||
return context.WithTimeout(context.Background(), defaultShutdownTimeout)
|
||||
}
|
||||
|
||||
func (a *App) configReload() {
|
||||
func (a *App) configReload(ctx context.Context) {
|
||||
a.log.Info("SIGHUP config reload started")
|
||||
|
||||
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
||||
|
@ -507,6 +542,7 @@ func (a *App) configReload() {
|
|||
a.updateSettings()
|
||||
|
||||
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
||||
a.initTracing(ctx)
|
||||
a.setHealthStatus()
|
||||
|
||||
a.log.Info("SIGHUP config reload completed")
|
||||
|
|
|
@ -98,6 +98,11 @@ const ( // Settings.
|
|||
cfgPProfEnabled = "pprof.enabled"
|
||||
cfgPProfAddress = "pprof.address"
|
||||
|
||||
// Tracing.
|
||||
cfgTracingEnabled = "tracing.enabled"
|
||||
cfgTracingExporter = "tracing.exporter"
|
||||
cfgTracingEndpoint = "tracing.endpoint"
|
||||
|
||||
cfgListenDomains = "listen_domains"
|
||||
|
||||
// Peers.
|
||||
|
|
|
@ -141,3 +141,7 @@ S3_GW_RESOLVE_BUCKET_ALLOW=container
|
|||
S3_GW_KLUDGE_USE_DEFAULT_XMLNS_FOR_COMPLETE_MULTIPART=false
|
||||
# Set timeout between whitespace transmissions during CompleteMultipartUpload processing.
|
||||
S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s
|
||||
|
||||
S3_GW_TRACING_ENABLED=false
|
||||
S3_GW_TRACING_ENDPOINT="localhost:4318"
|
||||
S3_GW_TRACING_EXPORTER="otlp_grpc"
|
||||
|
|
|
@ -64,6 +64,11 @@ prometheus:
|
|||
enabled: false
|
||||
address: localhost:8086
|
||||
|
||||
tracing:
|
||||
enabled: false
|
||||
exporter: "otlp_grpc"
|
||||
endpoint: "localhost:4318"
|
||||
|
||||
# Timeout to connect to a node
|
||||
connect_timeout: 10s
|
||||
# Timeout for individual operations in streaming RPC.
|
||||
|
|
|
@ -182,6 +182,7 @@ There are some custom types used for brevity:
|
|||
| `cors` | [CORS configuration](#cors-section) |
|
||||
| `pprof` | [Pprof configuration](#pprof-section) |
|
||||
| `prometheus` | [Prometheus configuration](#prometheus-section) |
|
||||
| `tracing` | [Tracing configuration](#tracing-section) |
|
||||
| `frostfs` | [Parameters of requests to FrostFS](#frostfs-section) |
|
||||
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
|
||||
| `kludge` | [Different kludge configuration](#kludge-section) |
|
||||
|
@ -499,6 +500,24 @@ prometheus:
|
|||
| `enabled` | `bool` | yes | `false` | Flag to enable the service. |
|
||||
| `address` | `string` | yes | `localhost:8086` | Address that service listener binds to. |
|
||||
|
||||
# `tracing` section
|
||||
|
||||
Contains configuration for the `tracing` service.
|
||||
|
||||
```yaml
|
||||
tracing:
|
||||
enabled: false
|
||||
exporter: "otlp_grpc"
|
||||
endpoint: "localhost:4318"
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|-------------|----------|---------------|---------------|-----------------------------------------|
|
||||
| `enabled` | `bool` | yes | `false` | Flag to enable the service. |
|
||||
| `exporter` | `string` | yes | `` | Type of tracing exporter. |
|
||||
| `endpoint` | `string` | yes | `` | Address that service listener binds to. |
|
||||
|
||||
|
||||
# `frostfs` section
|
||||
|
||||
Contains parameters of requests to FrostFS.
|
||||
|
|
4
go.mod
4
go.mod
|
@ -20,6 +20,8 @@ require (
|
|||
github.com/spf13/viper v1.15.0
|
||||
github.com/stretchr/testify v1.8.2
|
||||
github.com/urfave/cli/v2 v2.3.0
|
||||
go.opentelemetry.io/otel v1.14.0
|
||||
go.opentelemetry.io/otel/trace v1.14.0
|
||||
go.uber.org/zap v1.24.0
|
||||
golang.org/x/crypto v0.4.0
|
||||
google.golang.org/grpc v1.53.0
|
||||
|
@ -71,13 +73,11 @@ require (
|
|||
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
||||
github.com/subosito/gotenv v1.4.2 // indirect
|
||||
github.com/urfave/cli v1.22.5 // indirect
|
||||
go.opentelemetry.io/otel v1.14.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.14.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
|
||||
go.uber.org/atomic v1.10.0 // indirect
|
||||
go.uber.org/multierr v1.9.0 // indirect
|
||||
|
|
Loading…
Reference in a new issue