From a945cdd42cbdbf4d29d6ab95a181dd038fd4d650 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 15 Mar 2023 11:07:44 +0300 Subject: [PATCH] [#20] get/head: Add tracing support Signed-off-by: Dmitrii Stepanov --- app.go | 41 ++++++++++++++++++- config/config.env | 4 ++ config/config.yaml | 4 ++ docs/gate-configuration.md | 18 +++++++++ downloader/download.go | 79 ++++++++++++++++++++++-------------- downloader/head.go | 47 ++++++++++++++-------- go.mod | 4 +- settings.go | 5 +++ utils/tracing.go | 82 ++++++++++++++++++++++++++++++++++++++ 9 files changed, 234 insertions(+), 50 deletions(-) create mode 100644 utils/tracing.go diff --git a/app.go b/app.go index c2dabc0..487fdae 100644 --- a/app.go +++ b/app.go @@ -10,7 +10,9 @@ import ( "strconv" "sync" "syscall" + "time" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" @@ -166,6 +168,7 @@ func newApp(ctx context.Context, opt ...Option) App { a.initAppSettings() a.initResolver() a.initMetrics() + a.initTracing(ctx) return a } @@ -375,7 +378,7 @@ LOOP: case <-ctx.Done(): break LOOP case <-sigs: - a.configReload() + a.configReload(ctx) } } @@ -383,11 +386,22 @@ LOOP: a.metrics.Shutdown() a.stopServices() + a.shutdownTracing() close(a.webDone) } -func (a *app) configReload() { +func (a *app) shutdownTracing() { + const tracingShutdownTimeout = 5 * time.Second + shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout) + defer cancel() + + if err := tracing.Shutdown(shdnCtx); err != nil { + a.log.Warn("failed to shutdown tracing", zap.Error(err)) + } +} + +func (a *app) configReload(ctx context.Context) { a.log.Info("SIGHUP config reload started") if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) { a.log.Warn("failed to reload config because it's missed") @@ -418,6 +432,7 @@ func (a *app) configReload() { a.updateSettings() a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled)) + a.initTracing(ctx) a.setHealthStatus() a.log.Info("SIGHUP config reload completed") @@ -549,3 +564,25 @@ func (a *app) serverIndex(address string) int { } return -1 } + +func (a *app) initTracing(ctx context.Context) { + instanceID := "" + if len(a.servers) > 0 { + instanceID = a.servers[0].Address() + } + cfg := tracing.Config{ + Enabled: a.cfg.GetBool(cfgTracingEnabled), + Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)), + Endpoint: a.cfg.GetString(cfgTracingEndpoint), + Service: "frostfs-http-gw", + InstanceID: instanceID, + Version: Version, + } + updated, err := tracing.Setup(ctx, cfg) + if err != nil { + a.log.Warn("failed to initialize tracing", zap.Error(err)) + } + if updated { + a.log.Info("tracing config updated") + } +} diff --git a/config/config.env b/config/config.env index 4f13bb0..2d5ea94 100644 --- a/config/config.env +++ b/config/config.env @@ -92,3 +92,7 @@ HTTP_GW_POOL_ERROR_THRESHOLD=100 # Enable zip compression to download files by common prefix. HTTP_GW_ZIP_COMPRESSION=false + +HTTP_GW_TRACING_ENABLED=true +HTTP_GW_TRACING_ENDPOINT="localhost:4317" +HTTP_GW_TRACING_EXPORTER="otlp_grpc" \ No newline at end of file diff --git a/config/config.yaml b/config/config.yaml index 0a117df..510cb43 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -9,6 +9,10 @@ pprof: prometheus: enabled: false # Enable metrics. address: localhost:8084 +tracing: + enabled: true + exporter: "otlp_grpc" + endpoint: "localhost:4317" logger: level: debug # Log level. diff --git a/docs/gate-configuration.md b/docs/gate-configuration.md index 8323bdc..0d0504f 100644 --- a/docs/gate-configuration.md +++ b/docs/gate-configuration.md @@ -52,6 +52,7 @@ $ cat http.log | `zip` | [ZIP configuration](#zip-section) | | `pprof` | [Pprof configuration](#pprof-section) | | `prometheus` | [Prometheus configuration](#prometheus-section) | +| `tracing` | [Tracing configuration](#tracing-section) | # General section @@ -238,3 +239,20 @@ prometheus: |-----------|----------|---------------|------------------|-----------------------------------------| | `enabled` | `bool` | yes | `false` | Flag to enable the service. | | `address` | `string` | yes | `localhost:8084` | Address that service listener binds to. | + +# `tracing` section + +Contains configuration for the `tracing` service. + +```yaml +tracing: + enabled: true + exporter: "otlp_grpc" + endpoint: "localhost:4317" +``` + +| Parameter | Type | SIGHUP reload | Default value | Description | +|------------|----------|---------------|------------------|---------------------------------------------------------------| +| `enabled` | `bool` | yes | `false` | Flag to enable the tracing. | +| `exporter` | `string` | yes | | Trace collector type (`stdout` or `otlp_grpc` are supported). | +| `endpoint` | `string` | yes | | Address of collector endpoint for OTLP exporters. | diff --git a/downloader/download.go b/downloader/download.go index 6d24daa..7d29a26 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -27,14 +27,15 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "github.com/valyala/fasthttp" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" "go.uber.org/zap" ) type request struct { *fasthttp.RequestCtx - appCtx context.Context - log *zap.Logger + log *zap.Logger } func isValidToken(s string) bool { @@ -88,40 +89,40 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str return http.DetectContentType(buf), buf, err // to not lose io.EOF } -func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) { +func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) { var ( err error dis = "inline" start = time.Now() filename string ) - if err = tokens.StoreBearerToken(r.RequestCtx); err != nil { - r.log.Error("could not fetch and store bearer token", zap.Error(err)) - response.Error(r.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) + if err = tokens.StoreBearerToken(req.RequestCtx); err != nil { + req.log.Error("could not fetch and store bearer token", zap.Error(err)) + response.Error(req.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) return } var prm pool.PrmObjectGet prm.SetAddress(objectAddress) - if btoken := bearerToken(r.RequestCtx); btoken != nil { + if btoken := bearerToken(req.RequestCtx); btoken != nil { prm.UseBearer(*btoken) } - rObj, err := clnt.GetObject(r.appCtx, prm) + rObj, err := clnt.GetObject(ctx, prm) if err != nil { - r.handleFrostFSErr(err, start) + req.handleFrostFSErr(err, start) return } // we can't close reader in this function, so how to do it? - if r.Request.URI().QueryArgs().GetBool("download") { + if req.Request.URI().QueryArgs().GetBool("download") { dis = "attachment" } payloadSize := rObj.Header.PayloadSize() - r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10)) + req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10)) var contentType string for _, attr := range rObj.Header.Attributes() { key := attr.Key() @@ -132,27 +133,27 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) { key = utils.BackwardTransformIfSystem(key) - r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) + req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) switch key { case object.AttributeFileName: filename = val case object.AttributeTimestamp: value, err := strconv.ParseInt(val, 10, 64) if err != nil { - r.log.Info("couldn't parse creation date", + req.log.Info("couldn't parse creation date", zap.String("key", key), zap.String("val", val), zap.Error(err)) continue } - r.Response.Header.Set(fasthttp.HeaderLastModified, + req.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat)) case object.AttributeContentType: contentType = val } } - idsToResponse(&r.Response, &rObj.Header) + idsToResponse(&req.Response, &rObj.Header) if len(contentType) == 0 { // determine the Content-Type from the payload head @@ -162,8 +163,8 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) { return rObj.Payload, nil }) if err != nil && err != io.EOF { - r.log.Error("could not detect Content-Type from payload", zap.Error(err)) - response.Error(r.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest) + req.log.Error("could not detect Content-Type from payload", zap.Error(err)) + response.Error(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest) return } @@ -178,11 +179,11 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) { // if it implements io.Closer and that's useful for us. rObj.Payload = readCloser{headReader, rObj.Payload} } - r.SetContentType(contentType) + req.SetContentType(contentType) - r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) + req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) - r.Response.SetBodyStream(rObj.Payload, int(payloadSize)) + req.Response.SetBodyStream(rObj.Payload, int(payloadSize)) } func bearerToken(ctx context.Context) *bearer.Token { @@ -240,26 +241,35 @@ func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Down func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { return &request{ RequestCtx: ctx, - appCtx: d.appCtx, log: log, } } // DownloadByAddress handles download requests using simple cid/oid format. func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) { - d.byAddress(c, request.receiveFile) + d.byAddress(c, receiveFile) } // byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that // prepares request and object address to it. -func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) { +func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) { var ( idCnr, _ = c.UserValue("cid").(string) idObj, _ = c.UserValue("oid").(string) log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj)) ) - cnrID, err := utils.GetContainerID(d.appCtx, idCnr, d.containerResolver) + ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object", + trace.WithAttributes( + attribute.String("cid", idCnr), + attribute.String("oid", idObj), + )) + defer func() { + utils.SetHTTPTraceInfo(ctx, span, c) + span.End() + }() + + cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver) if err != nil { log.Error("wrong container id", zap.Error(err)) response.Error(c, "wrong container id", fasthttp.StatusBadRequest) @@ -277,16 +287,16 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Poo addr.SetContainer(*cnrID) addr.SetObject(*objID) - f(*d.newRequest(c, log), d.pool, addr) + f(ctx, *d.newRequest(c, log), d.pool, addr) } // DownloadByAttribute handles attribute-based download requests. func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { - d.byAttribute(c, request.receiveFile) + d.byAttribute(c, receiveFile) } // byAttribute is a wrapper similar to byAddress. -func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) { +func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) { var ( scid, _ = c.UserValue("cid").(string) key, _ = url.QueryUnescape(c.UserValue("attr_key").(string)) @@ -294,7 +304,18 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val)) ) - containerID, err := utils.GetContainerID(d.appCtx, scid, d.containerResolver) + ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object", + trace.WithAttributes( + attribute.String("attr_key", key), + attribute.String("attr_val", val), + attribute.String("cid", scid), + )) + defer func() { + utils.SetHTTPTraceInfo(ctx, span, c) + span.End() + }() + + containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver) if err != nil { log.Error("wrong container id", zap.Error(err)) response.Error(c, "wrong container id", fasthttp.StatusBadRequest) @@ -329,7 +350,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P addrObj.SetContainer(*containerID) addrObj.SetObject(buf[0]) - f(*d.newRequest(c, log), d.pool, addrObj) + f(ctx, *d.newRequest(c, log), d.pool, addrObj) } func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) { diff --git a/downloader/head.go b/downloader/head.go index 4615103..9745019 100644 --- a/downloader/head.go +++ b/downloader/head.go @@ -1,6 +1,7 @@ package downloader import ( + "context" "io" "net/http" "strconv" @@ -13,6 +14,8 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "github.com/valyala/fasthttp" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -25,15 +28,25 @@ const ( hdrContainerID = "X-Container-Id" ) -func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) { +func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) { + ctx, span := utils.StartHTTPServerSpan(ctx, req.RequestCtx, "HEAD Object", + trace.WithAttributes( + attribute.String("cid", objectAddress.Container().EncodeToString()), + attribute.String("oid", objectAddress.Object().EncodeToString()), + )) + defer func() { + utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx) + span.End() + }() + var start = time.Now() - if err := tokens.StoreBearerToken(r.RequestCtx); err != nil { - r.log.Error("could not fetch and store bearer token", zap.Error(err)) - response.Error(r.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest) + if err := tokens.StoreBearerToken(req.RequestCtx); err != nil { + req.log.Error("could not fetch and store bearer token", zap.Error(err)) + response.Error(req.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest) return } - btoken := bearerToken(r.RequestCtx) + btoken := bearerToken(req.RequestCtx) var prm pool.PrmObjectHead prm.SetAddress(objectAddress) @@ -41,13 +54,13 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) { prm.UseBearer(*btoken) } - obj, err := clnt.HeadObject(r.appCtx, prm) + obj, err := clnt.HeadObject(ctx, prm) if err != nil { - r.handleFrostFSErr(err, start) + req.handleFrostFSErr(err, start) return } - r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10)) + req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10)) var contentType string for _, attr := range obj.Attributes() { key := attr.Key() @@ -58,24 +71,24 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) { key = utils.BackwardTransformIfSystem(key) - r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) + req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) switch key { case object.AttributeTimestamp: value, err := strconv.ParseInt(val, 10, 64) if err != nil { - r.log.Info("couldn't parse creation date", + req.log.Info("couldn't parse creation date", zap.String("key", key), zap.String("val", val), zap.Error(err)) continue } - r.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat)) + req.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat)) case object.AttributeContentType: contentType = val } } - idsToResponse(&r.Response, &obj) + idsToResponse(&req.Response, &obj) if len(contentType) == 0 { contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) { @@ -86,18 +99,18 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) { prmRange.UseBearer(*btoken) } - resObj, err := clnt.ObjectRange(r.appCtx, prmRange) + resObj, err := clnt.ObjectRange(ctx, prmRange) if err != nil { return nil, err } return &resObj, nil }) if err != nil && err != io.EOF { - r.handleFrostFSErr(err, start) + req.handleFrostFSErr(err, start) return } } - r.SetContentType(contentType) + req.SetContentType(contentType) } func idsToResponse(resp *fasthttp.Response, obj *object.Object) { @@ -110,10 +123,10 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) { // HeadByAddress handles head requests using simple cid/oid format. func (d *Downloader) HeadByAddress(c *fasthttp.RequestCtx) { - d.byAddress(c, request.headObject) + d.byAddress(c, headObject) } // HeadByAttribute handles attribute-based head requests. func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) { - d.byAttribute(c, request.headObject) + d.byAttribute(c, headObject) } diff --git a/go.mod b/go.mod index 53cb35e..906e032 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,8 @@ require ( github.com/stretchr/testify v1.8.2 github.com/testcontainers/testcontainers-go v0.13.0 github.com/valyala/fasthttp v1.34.0 + go.opentelemetry.io/otel v1.14.0 + go.opentelemetry.io/otel/trace v1.14.0 go.uber.org/atomic v1.10.0 go.uber.org/zap v1.24.0 ) @@ -86,13 +88,11 @@ require ( github.com/urfave/cli v1.22.5 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/otel v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect go.opentelemetry.io/otel/sdk v1.14.0 // indirect - go.opentelemetry.io/otel/trace v1.14.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.4.0 // indirect diff --git a/settings.go b/settings.go index 8d569c0..19188fd 100644 --- a/settings.go +++ b/settings.go @@ -47,6 +47,11 @@ const ( cfgPprofEnabled = "pprof.enabled" cfgPprofAddress = "pprof.address" + // Tracing ... + cfgTracingEnabled = "tracing.enabled" + cfgTracingExporter = "tracing.exporter" + cfgTracingEndpoint = "tracing.endpoint" + // Pool config. cfgConTimeout = "connect_timeout" cfgStreamTimeout = "stream_timeout" diff --git a/utils/tracing.go b/utils/tracing.go new file mode 100644 index 0000000..4bb74df --- /dev/null +++ b/utils/tracing.go @@ -0,0 +1,82 @@ +package utils + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" + "github.com/valyala/fasthttp" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "go.opentelemetry.io/otel/trace" +) + +type httpCarrier struct { + r *fasthttp.RequestCtx +} + +func (c *httpCarrier) Get(key string) string { + bytes := c.r.Request.Header.Peek(key) + if len(bytes) == 0 { + return "" + } + return string(bytes) +} + +func (c *httpCarrier) Set(key string, value string) { + c.r.Response.Header.Set(key, value) +} + +func (c *httpCarrier) Keys() []string { + dict := make(map[string]interface{}) + c.r.Request.Header.VisitAll( + func(key, value []byte) { + dict[string(key)] = true + }, + ) + c.r.Response.Header.VisitAll( + func(key, value []byte) { + dict[string(key)] = true + }, + ) + result := make([]string, 0, len(dict)) + for key := range dict { + result = append(result, key) + } + return result +} + +func extractHTTPTraceInfo(ctx context.Context, req *fasthttp.RequestCtx) context.Context { + if req == nil { + return ctx + } + carrier := &httpCarrier{r: req} + return tracing.Propagator.Extract(ctx, carrier) +} + +// SetHTTPTraceInfo saves trace headers to response. +func SetHTTPTraceInfo(ctx context.Context, span trace.Span, req *fasthttp.RequestCtx) { + if req == nil { + return + } + if err := req.Err(); err != nil { + span.SetStatus(codes.Error, err.Error()) + } + span.SetAttributes( + semconv.HTTPStatusCode(req.Response.StatusCode()), + ) + carrier := &httpCarrier{r: req} + tracing.Propagator.Inject(ctx, carrier) +} + +// StartHTTPServerSpan starts root HTTP server span. +func StartHTTPServerSpan(ctx context.Context, req *fasthttp.RequestCtx, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + ctx = extractHTTPTraceInfo(ctx, req) + opts = append(opts, trace.WithAttributes( + attribute.String("http.client_address", req.RemoteAddr().String()), + attribute.String("http.path", string(req.Path())), + semconv.HTTPMethod(string(req.Method())), + semconv.RPCService("frostfs-http-gw"), + ), trace.WithSpanKind(trace.SpanKindServer)) + return tracing.StartSpanFromContext(ctx, operationName, opts...) +} -- 2.45.2