[#84] add tracing support #123
8 changed files with 194 additions and 4 deletions
|
@ -258,6 +258,8 @@ func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center aut
|
||||||
// Attach user authentication for all S3 routes.
|
// Attach user authentication for all S3 routes.
|
||||||
AuthMiddleware(log, center),
|
AuthMiddleware(log, center),
|
||||||
|
|
||||||
|
TracingMiddleware(),
|
||||||
|
|
||||||
metricsMiddleware(log, h.ResolveBucket, appMetrics),
|
metricsMiddleware(log, h.ResolveBucket, appMetrics),
|
||||||
|
|
||||||
// -- logging error requests
|
// -- logging error requests
|
||||||
|
|
119
api/tracing.go
Normal file
119
api/tracing.go
Normal file
|
@ -0,0 +1,119 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TracingMiddleware adds tracing support for requests.
|
||||||
|
func TracingMiddleware() mux.MiddlewareFunc {
|
||||||
|
return func(h http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
appCtx, span := StartHTTPServerSpan(r, "REQUEST S3")
|
||||||
dkirillov marked this conversation as resolved
Outdated
|
|||||||
|
lw := &traceResponseWriter{ResponseWriter: w, ctx: appCtx, span: span}
|
||||||
|
h.ServeHTTP(lw, r.WithContext(appCtx))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type traceResponseWriter struct {
|
||||||
|
sync.Once
|
||||||
|
http.ResponseWriter
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
span trace.Span
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lrw *traceResponseWriter) WriteHeader(code int) {
|
||||||
|
lrw.Do(func() {
|
||||||
|
lrw.span.SetAttributes(
|
||||||
|
semconv.HTTPStatusCode(code),
|
||||||
|
)
|
||||||
|
|
||||||
|
carrier := &httpResponseCarrier{resp: lrw.ResponseWriter}
|
||||||
|
tracing.Propagator.Inject(lrw.ctx, carrier)
|
||||||
|
|
||||||
|
lrw.ResponseWriter.WriteHeader(code)
|
||||||
|
lrw.span.End()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lrw *traceResponseWriter) Flush() {
|
||||||
|
if f, ok := lrw.ResponseWriter.(http.Flusher); ok {
|
||||||
|
f.Flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type httpResponseCarrier struct {
|
||||||
|
resp http.ResponseWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h httpResponseCarrier) Get(key string) string {
|
||||||
|
return h.resp.Header().Get(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h httpResponseCarrier) Set(key string, value string) {
|
||||||
|
h.resp.Header().Set(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h httpResponseCarrier) Keys() []string {
|
||||||
|
result := make([]string, 0, len(h.resp.Header()))
|
||||||
|
for key := range h.resp.Header() {
|
||||||
|
result = append(result, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
type httpRequestCarrier struct {
|
||||||
|
req *http.Request
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
It seems we can omit explicit It seems we can omit explicit `ctx contextContext` param and use `r.Context()`
|
|||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpRequestCarrier) Get(key string) string {
|
||||||
|
bytes := c.req.Header.Get(key)
|
||||||
alexvanin marked this conversation as resolved
Outdated
alexvanin
commented
I suggest to use I suggest to use `s3` prefix instead of `http`. S3 protocol works on top of http anyway, but it would be easier to distinguish from http gateway.
|
|||||||
|
if len(bytes) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return bytes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpRequestCarrier) Set(key string, value string) {
|
||||||
|
c.req.Response.Header.Set(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpRequestCarrier) Keys() []string {
|
||||||
|
result := make([]string, 0, len(c.req.Header))
|
||||||
|
for key := range c.req.Header {
|
||||||
|
result = append(result, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractHTTPTraceInfo(ctx context.Context, req *http.Request) context.Context {
|
||||||
|
if req == nil {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
carrier := &httpRequestCarrier{req: req}
|
||||||
|
return tracing.Propagator.Extract(ctx, carrier)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartHTTPServerSpan starts root HTTP server span.
|
||||||
|
func StartHTTPServerSpan(r *http.Request, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
|
||||||
|
ctx := extractHTTPTraceInfo(r.Context(), r)
|
||||||
|
opts = append(opts, trace.WithAttributes(
|
||||||
|
attribute.String("s3.client_address", r.RemoteAddr),
|
||||||
|
attribute.String("s3.path", r.Host),
|
||||||
|
semconv.HTTPMethod(r.Method),
|
||||||
|
semconv.RPCService("frostfs-s3-gw"),
|
||||||
|
attribute.String("s3.query", r.RequestURI),
|
||||||
|
), trace.WithSpanKind(trace.SpanKindServer))
|
||||||
|
return tracing.StartSpanFromContext(ctx, operationName, opts...)
|
||||||
|
}
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
|
||||||
|
@ -110,6 +111,7 @@ func (a *App) init(ctx context.Context) {
|
||||||
a.initAPI(ctx)
|
a.initAPI(ctx)
|
||||||
a.initMetrics()
|
a.initMetrics()
|
||||||
a.initServers(ctx)
|
a.initServers(ctx)
|
||||||
|
a.initTracing(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) initLayer(ctx context.Context) {
|
func (a *App) initLayer(ctx context.Context) {
|
||||||
|
@ -214,6 +216,38 @@ func (a *App) getResolverConfig() ([]string, *resolver.Config) {
|
||||||
return order, resolveCfg
|
return order, resolveCfg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *App) initTracing(ctx context.Context) {
|
||||||
|
instanceID := ""
|
||||||
|
if len(a.servers) > 0 {
|
||||||
|
instanceID = a.servers[0].Address()
|
||||||
|
}
|
||||||
|
cfg := tracing.Config{
|
||||||
|
Enabled: a.cfg.GetBool(cfgTracingEnabled),
|
||||||
|
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
|
||||||
|
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
|
||||||
|
Service: "frostfs-s3-gw",
|
||||||
|
InstanceID: instanceID,
|
||||||
|
Version: version.Version,
|
||||||
|
}
|
||||||
|
updated, err := tracing.Setup(ctx, cfg)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Warn("failed to initialize tracing", zap.Error(err))
|
||||||
|
}
|
||||||
|
if updated {
|
||||||
|
a.log.Info("tracing config updated")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *App) shutdownTracing() {
|
||||||
|
const tracingShutdownTimeout = 5 * time.Second
|
||||||
|
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := tracing.Shutdown(shdnCtx); err != nil {
|
||||||
|
a.log.Warn("failed to shutdown tracing", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newMaxClients(cfg *viper.Viper) api.MaxClients {
|
func newMaxClients(cfg *viper.Viper) api.MaxClients {
|
||||||
maxClientsCount := cfg.GetInt(cfgMaxClientsCount)
|
maxClientsCount := cfg.GetInt(cfgMaxClientsCount)
|
||||||
if maxClientsCount <= 0 {
|
if maxClientsCount <= 0 {
|
||||||
|
@ -462,7 +496,7 @@ LOOP:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
break LOOP
|
break LOOP
|
||||||
case <-sigs:
|
case <-sigs:
|
||||||
a.configReload()
|
a.configReload(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -473,6 +507,7 @@ LOOP:
|
||||||
|
|
||||||
a.metrics.Shutdown()
|
a.metrics.Shutdown()
|
||||||
a.stopServices()
|
a.stopServices()
|
||||||
|
a.shutdownTracing()
|
||||||
|
|
||||||
close(a.webDone)
|
close(a.webDone)
|
||||||
}
|
}
|
||||||
|
@ -481,7 +516,7 @@ func shutdownContext() (context.Context, context.CancelFunc) {
|
||||||
return context.WithTimeout(context.Background(), defaultShutdownTimeout)
|
return context.WithTimeout(context.Background(), defaultShutdownTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) configReload() {
|
func (a *App) configReload(ctx context.Context) {
|
||||||
a.log.Info("SIGHUP config reload started")
|
a.log.Info("SIGHUP config reload started")
|
||||||
|
|
||||||
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
||||||
|
@ -507,6 +542,7 @@ func (a *App) configReload() {
|
||||||
a.updateSettings()
|
a.updateSettings()
|
||||||
|
|
||||||
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
||||||
|
a.initTracing(ctx)
|
||||||
a.setHealthStatus()
|
a.setHealthStatus()
|
||||||
|
|
||||||
a.log.Info("SIGHUP config reload completed")
|
a.log.Info("SIGHUP config reload completed")
|
||||||
|
|
|
@ -98,6 +98,11 @@ const ( // Settings.
|
||||||
cfgPProfEnabled = "pprof.enabled"
|
cfgPProfEnabled = "pprof.enabled"
|
||||||
cfgPProfAddress = "pprof.address"
|
cfgPProfAddress = "pprof.address"
|
||||||
|
|
||||||
|
// Tracing.
|
||||||
alexvanin marked this conversation as resolved
Outdated
|
|||||||
|
cfgTracingEnabled = "tracing.enabled"
|
||||||
|
cfgTracingExporter = "tracing.exporter"
|
||||||
|
cfgTracingEndpoint = "tracing.endpoint"
|
||||||
|
|
||||||
cfgListenDomains = "listen_domains"
|
cfgListenDomains = "listen_domains"
|
||||||
|
|
||||||
// Peers.
|
// Peers.
|
||||||
|
|
|
@ -141,3 +141,7 @@ S3_GW_RESOLVE_BUCKET_ALLOW=container
|
||||||
S3_GW_KLUDGE_USE_DEFAULT_XMLNS_FOR_COMPLETE_MULTIPART=false
|
S3_GW_KLUDGE_USE_DEFAULT_XMLNS_FOR_COMPLETE_MULTIPART=false
|
||||||
# Set timeout between whitespace transmissions during CompleteMultipartUpload processing.
|
# Set timeout between whitespace transmissions during CompleteMultipartUpload processing.
|
||||||
S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s
|
S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s
|
||||||
|
|
||||||
|
S3_GW_TRACING_ENABLED=false
|
||||||
|
S3_GW_TRACING_ENDPOINT="localhost:4318"
|
||||||
|
S3_GW_TRACING_EXPORTER="otlp_grpc"
|
||||||
|
|
|
@ -64,6 +64,11 @@ prometheus:
|
||||||
enabled: false
|
enabled: false
|
||||||
address: localhost:8086
|
address: localhost:8086
|
||||||
|
|
||||||
|
tracing:
|
||||||
|
enabled: false
|
||||||
|
exporter: "otlp_grpc"
|
||||||
|
endpoint: "localhost:4318"
|
||||||
|
|
||||||
alexvanin marked this conversation as resolved
Outdated
alexvanin
commented
Update Update `docs/configuration.md`
|
|||||||
# Timeout to connect to a node
|
# Timeout to connect to a node
|
||||||
connect_timeout: 10s
|
connect_timeout: 10s
|
||||||
# Timeout for individual operations in streaming RPC.
|
# Timeout for individual operations in streaming RPC.
|
||||||
|
|
|
@ -182,6 +182,7 @@ There are some custom types used for brevity:
|
||||||
| `cors` | [CORS configuration](#cors-section) |
|
| `cors` | [CORS configuration](#cors-section) |
|
||||||
| `pprof` | [Pprof configuration](#pprof-section) |
|
| `pprof` | [Pprof configuration](#pprof-section) |
|
||||||
| `prometheus` | [Prometheus configuration](#prometheus-section) |
|
| `prometheus` | [Prometheus configuration](#prometheus-section) |
|
||||||
|
| `tracing` | [Tracing configuration](#tracing-section) |
|
||||||
| `frostfs` | [Parameters of requests to FrostFS](#frostfs-section) |
|
| `frostfs` | [Parameters of requests to FrostFS](#frostfs-section) |
|
||||||
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
|
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
|
||||||
| `kludge` | [Different kludge configuration](#kludge-section) |
|
| `kludge` | [Different kludge configuration](#kludge-section) |
|
||||||
|
@ -499,6 +500,24 @@ prometheus:
|
||||||
| `enabled` | `bool` | yes | `false` | Flag to enable the service. |
|
| `enabled` | `bool` | yes | `false` | Flag to enable the service. |
|
||||||
| `address` | `string` | yes | `localhost:8086` | Address that service listener binds to. |
|
| `address` | `string` | yes | `localhost:8086` | Address that service listener binds to. |
|
||||||
|
|
||||||
|
# `tracing` section
|
||||||
|
|
||||||
|
Contains configuration for the `tracing` service.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
tracing:
|
||||||
|
enabled: false
|
||||||
|
exporter: "otlp_grpc"
|
||||||
|
endpoint: "localhost:4318"
|
||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
|-------------|----------|---------------|---------------|-----------------------------------------|
|
||||||
|
| `enabled` | `bool` | yes | `false` | Flag to enable the service. |
|
||||||
|
| `exporter` | `string` | yes | `` | Type of tracing exporter. |
|
||||||
|
| `endpoint` | `string` | yes | `` | Address that service listener binds to. |
|
||||||
dkirillov
commented
Actually, currently there are not default values for tracing. To be more precise current default values: Actually, currently there are not default values for tracing.
To be more precise current default values: `false`, `""`, `""`
dkirillov
commented
@pogpp It seems your last force push didn't change anything @pogpp It seems your last force push didn't change anything
|
|||||||
|
|
||||||
|
|
||||||
# `frostfs` section
|
# `frostfs` section
|
||||||
|
|
||||||
Contains parameters of requests to FrostFS.
|
Contains parameters of requests to FrostFS.
|
||||||
|
|
4
go.mod
4
go.mod
|
@ -20,6 +20,8 @@ require (
|
||||||
github.com/spf13/viper v1.15.0
|
github.com/spf13/viper v1.15.0
|
||||||
github.com/stretchr/testify v1.8.2
|
github.com/stretchr/testify v1.8.2
|
||||||
github.com/urfave/cli/v2 v2.3.0
|
github.com/urfave/cli/v2 v2.3.0
|
||||||
|
go.opentelemetry.io/otel v1.14.0
|
||||||
|
go.opentelemetry.io/otel/trace v1.14.0
|
||||||
go.uber.org/zap v1.24.0
|
go.uber.org/zap v1.24.0
|
||||||
golang.org/x/crypto v0.4.0
|
golang.org/x/crypto v0.4.0
|
||||||
google.golang.org/grpc v1.53.0
|
google.golang.org/grpc v1.53.0
|
||||||
|
@ -71,13 +73,11 @@ require (
|
||||||
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
||||||
github.com/subosito/gotenv v1.4.2 // indirect
|
github.com/subosito/gotenv v1.4.2 // indirect
|
||||||
github.com/urfave/cli v1.22.5 // indirect
|
github.com/urfave/cli v1.22.5 // indirect
|
||||||
go.opentelemetry.io/otel v1.14.0 // indirect
|
|
||||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
|
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/trace v1.14.0 // indirect
|
|
||||||
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
|
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
|
||||||
go.uber.org/atomic v1.10.0 // indirect
|
go.uber.org/atomic v1.10.0 // indirect
|
||||||
go.uber.org/multierr v1.9.0 // indirect
|
go.uber.org/multierr v1.9.0 // indirect
|
||||||
|
|
Loading…
Reference in a new issue
Let's write
REQUEST S3
instead of justREQUEST