GET/HEAD: Add tracing support #20

Merged
alexvanin merged 1 commit from dstepanov-yadro/frostfs-http-gw:feat/OBJECT-3311 into master 2023-05-04 14:01:57 +00:00
9 changed files with 234 additions and 50 deletions

41
app.go
View file

@ -10,7 +10,9 @@ import (
"strconv" "strconv"
"sync" "sync"
"syscall" "syscall"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
@ -166,6 +168,7 @@ func newApp(ctx context.Context, opt ...Option) App {
a.initAppSettings() a.initAppSettings()
a.initResolver() a.initResolver()
a.initMetrics() a.initMetrics()
a.initTracing(ctx)
return a return a
} }
@ -375,7 +378,7 @@ LOOP:
case <-ctx.Done(): case <-ctx.Done():
break LOOP break LOOP
case <-sigs: case <-sigs:
a.configReload() a.configReload(ctx)
} }
} }
@ -383,11 +386,22 @@ LOOP:
a.metrics.Shutdown() a.metrics.Shutdown()
a.stopServices() a.stopServices()
a.shutdownTracing()
close(a.webDone) close(a.webDone)
} }
func (a *app) configReload() { func (a *app) shutdownTracing() {
const tracingShutdownTimeout = 5 * time.Second
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
defer cancel()
if err := tracing.Shutdown(shdnCtx); err != nil {
a.log.Warn("failed to shutdown tracing", zap.Error(err))
}
}
func (a *app) configReload(ctx context.Context) {
a.log.Info("SIGHUP config reload started") a.log.Info("SIGHUP config reload started")
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) { if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
a.log.Warn("failed to reload config because it's missed") a.log.Warn("failed to reload config because it's missed")
@ -418,6 +432,7 @@ func (a *app) configReload() {
a.updateSettings() a.updateSettings()
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled)) a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
a.initTracing(ctx)
a.setHealthStatus() a.setHealthStatus()
a.log.Info("SIGHUP config reload completed") a.log.Info("SIGHUP config reload completed")
@ -549,3 +564,25 @@ func (a *app) serverIndex(address string) int {
} }
return -1 return -1
} }
func (a *app) initTracing(ctx context.Context) {
instanceID := ""
if len(a.servers) > 0 {
instanceID = a.servers[0].Address()
}
cfg := tracing.Config{
Enabled: a.cfg.GetBool(cfgTracingEnabled),
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
Service: "frostfs-http-gw",
InstanceID: instanceID,
Version: Version,
}
updated, err := tracing.Setup(ctx, cfg)
if err != nil {
a.log.Warn("failed to initialize tracing", zap.Error(err))
}
if updated {
a.log.Info("tracing config updated")
}
}

View file

@ -92,3 +92,7 @@ HTTP_GW_POOL_ERROR_THRESHOLD=100
# Enable zip compression to download files by common prefix. # Enable zip compression to download files by common prefix.
HTTP_GW_ZIP_COMPRESSION=false HTTP_GW_ZIP_COMPRESSION=false
HTTP_GW_TRACING_ENABLED=true
HTTP_GW_TRACING_ENDPOINT="localhost:4317"
HTTP_GW_TRACING_EXPORTER="otlp_grpc"

View file

@ -9,6 +9,10 @@ pprof:
prometheus: prometheus:
enabled: false # Enable metrics. enabled: false # Enable metrics.
address: localhost:8084 address: localhost:8084
tracing:
enabled: true
exporter: "otlp_grpc"
endpoint: "localhost:4317"
logger: logger:
level: debug # Log level. level: debug # Log level.

View file

@ -52,6 +52,7 @@ $ cat http.log
| `zip` | [ZIP configuration](#zip-section) | | `zip` | [ZIP configuration](#zip-section) |
| `pprof` | [Pprof configuration](#pprof-section) | | `pprof` | [Pprof configuration](#pprof-section) |
| `prometheus` | [Prometheus configuration](#prometheus-section) | | `prometheus` | [Prometheus configuration](#prometheus-section) |
| `tracing` | [Tracing configuration](#tracing-section) |
# General section # General section
@ -238,3 +239,20 @@ prometheus:
|-----------|----------|---------------|------------------|-----------------------------------------| |-----------|----------|---------------|------------------|-----------------------------------------|
| `enabled` | `bool` | yes | `false` | Flag to enable the service. | | `enabled` | `bool` | yes | `false` | Flag to enable the service. |
| `address` | `string` | yes | `localhost:8084` | Address that service listener binds to. | | `address` | `string` | yes | `localhost:8084` | Address that service listener binds to. |
# `tracing` section
Contains configuration for the `tracing` service.
```yaml
tracing:
enabled: true
exporter: "otlp_grpc"
endpoint: "localhost:4317"
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|------------|----------|---------------|------------------|---------------------------------------------------------------|
| `enabled` | `bool` | yes | `false` | Flag to enable the tracing. |
| `exporter` | `string` | yes | | Trace collector type (`stdout` or `otlp_grpc` are supported). |
| `endpoint` | `string` | yes | | Address of collector endpoint for OTLP exporters. |

View file

@ -27,14 +27,15 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
type request struct { type request struct {
*fasthttp.RequestCtx *fasthttp.RequestCtx
appCtx context.Context log *zap.Logger
log *zap.Logger
} }
func isValidToken(s string) bool { func isValidToken(s string) bool {
@ -88,40 +89,40 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
return http.DetectContentType(buf), buf, err // to not lose io.EOF return http.DetectContentType(buf), buf, err // to not lose io.EOF
} }
func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) { func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
var ( var (
err error err error
dis = "inline" dis = "inline"
start = time.Now() start = time.Now()
filename string filename string
) )
if err = tokens.StoreBearerToken(r.RequestCtx); err != nil { if err = tokens.StoreBearerToken(req.RequestCtx); err != nil {
r.log.Error("could not fetch and store bearer token", zap.Error(err)) req.log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(r.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) response.Error(req.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
var prm pool.PrmObjectGet var prm pool.PrmObjectGet
prm.SetAddress(objectAddress) prm.SetAddress(objectAddress)
if btoken := bearerToken(r.RequestCtx); btoken != nil { if btoken := bearerToken(req.RequestCtx); btoken != nil {
prm.UseBearer(*btoken) prm.UseBearer(*btoken)
} }
rObj, err := clnt.GetObject(r.appCtx, prm) rObj, err := clnt.GetObject(ctx, prm)
if err != nil { if err != nil {
r.handleFrostFSErr(err, start) req.handleFrostFSErr(err, start)
return return
} }
// we can't close reader in this function, so how to do it? // we can't close reader in this function, so how to do it?
if r.Request.URI().QueryArgs().GetBool("download") { if req.Request.URI().QueryArgs().GetBool("download") {
dis = "attachment" dis = "attachment"
} }
payloadSize := rObj.Header.PayloadSize() payloadSize := rObj.Header.PayloadSize()
r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10)) req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
var contentType string var contentType string
for _, attr := range rObj.Header.Attributes() { for _, attr := range rObj.Header.Attributes() {
key := attr.Key() key := attr.Key()
@ -132,27 +133,27 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
key = utils.BackwardTransformIfSystem(key) key = utils.BackwardTransformIfSystem(key)
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
switch key { switch key {
case object.AttributeFileName: case object.AttributeFileName:
filename = val filename = val
case object.AttributeTimestamp: case object.AttributeTimestamp:
value, err := strconv.ParseInt(val, 10, 64) value, err := strconv.ParseInt(val, 10, 64)
if err != nil { if err != nil {
r.log.Info("couldn't parse creation date", req.log.Info("couldn't parse creation date",
zap.String("key", key), zap.String("key", key),
zap.String("val", val), zap.String("val", val),
zap.Error(err)) zap.Error(err))
continue continue
} }
r.Response.Header.Set(fasthttp.HeaderLastModified, req.Response.Header.Set(fasthttp.HeaderLastModified,
time.Unix(value, 0).UTC().Format(http.TimeFormat)) time.Unix(value, 0).UTC().Format(http.TimeFormat))
case object.AttributeContentType: case object.AttributeContentType:
contentType = val contentType = val
} }
} }
idsToResponse(&r.Response, &rObj.Header) idsToResponse(&req.Response, &rObj.Header)
if len(contentType) == 0 { if len(contentType) == 0 {
// determine the Content-Type from the payload head // determine the Content-Type from the payload head
@ -162,8 +163,8 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
return rObj.Payload, nil return rObj.Payload, nil
}) })
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
r.log.Error("could not detect Content-Type from payload", zap.Error(err)) req.log.Error("could not detect Content-Type from payload", zap.Error(err))
response.Error(r.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest) response.Error(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
@ -178,11 +179,11 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
// if it implements io.Closer and that's useful for us. // if it implements io.Closer and that's useful for us.
rObj.Payload = readCloser{headReader, rObj.Payload} rObj.Payload = readCloser{headReader, rObj.Payload}
} }
r.SetContentType(contentType) req.SetContentType(contentType)
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
r.Response.SetBodyStream(rObj.Payload, int(payloadSize)) req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
} }
func bearerToken(ctx context.Context) *bearer.Token { func bearerToken(ctx context.Context) *bearer.Token {
@ -240,26 +241,35 @@ func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Down
func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
return &request{ return &request{
RequestCtx: ctx, RequestCtx: ctx,
appCtx: d.appCtx,
log: log, log: log,
} }
} }
// DownloadByAddress handles download requests using simple cid/oid format. // DownloadByAddress handles download requests using simple cid/oid format.
func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) { func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
d.byAddress(c, request.receiveFile) d.byAddress(c, receiveFile)
} }
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that // byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it. // prepares request and object address to it.
func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) { func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
var ( var (
idCnr, _ = c.UserValue("cid").(string) idCnr, _ = c.UserValue("cid").(string)
idObj, _ = c.UserValue("oid").(string) idObj, _ = c.UserValue("oid").(string)
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj)) log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
) )
cnrID, err := utils.GetContainerID(d.appCtx, idCnr, d.containerResolver) ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
trace.WithAttributes(
attribute.String("cid", idCnr),
attribute.String("oid", idObj),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, c)
span.End()
}()
cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver)
if err != nil { if err != nil {
log.Error("wrong container id", zap.Error(err)) log.Error("wrong container id", zap.Error(err))
response.Error(c, "wrong container id", fasthttp.StatusBadRequest) response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
@ -277,16 +287,16 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Poo
addr.SetContainer(*cnrID) addr.SetContainer(*cnrID)
addr.SetObject(*objID) addr.SetObject(*objID)
f(*d.newRequest(c, log), d.pool, addr) f(ctx, *d.newRequest(c, log), d.pool, addr)
} }
// DownloadByAttribute handles attribute-based download requests. // DownloadByAttribute handles attribute-based download requests.
func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
d.byAttribute(c, request.receiveFile) d.byAttribute(c, receiveFile)
} }
// byAttribute is a wrapper similar to byAddress. // byAttribute is a wrapper similar to byAddress.
func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) { func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
var ( var (
scid, _ = c.UserValue("cid").(string) scid, _ = c.UserValue("cid").(string)
key, _ = url.QueryUnescape(c.UserValue("attr_key").(string)) key, _ = url.QueryUnescape(c.UserValue("attr_key").(string))
@ -294,7 +304,18 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val)) log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
) )
containerID, err := utils.GetContainerID(d.appCtx, scid, d.containerResolver) ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
trace.WithAttributes(
attribute.String("attr_key", key),
attribute.String("attr_val", val),
attribute.String("cid", scid),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, c)
span.End()
}()
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
if err != nil { if err != nil {
log.Error("wrong container id", zap.Error(err)) log.Error("wrong container id", zap.Error(err))
response.Error(c, "wrong container id", fasthttp.StatusBadRequest) response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
@ -329,7 +350,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P
addrObj.SetContainer(*containerID) addrObj.SetContainer(*containerID)
addrObj.SetObject(buf[0]) addrObj.SetObject(buf[0])
f(*d.newRequest(c, log), d.pool, addrObj) f(ctx, *d.newRequest(c, log), d.pool, addrObj)
} }
func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) { func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {

View file

@ -1,6 +1,7 @@
package downloader package downloader
import ( import (
"context"
"io" "io"
"net/http" "net/http"
"strconv" "strconv"
@ -13,6 +14,8 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -25,15 +28,25 @@ const (
hdrContainerID = "X-Container-Id" hdrContainerID = "X-Container-Id"
) )
func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) { func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
ctx, span := utils.StartHTTPServerSpan(ctx, req.RequestCtx, "HEAD Object",
trace.WithAttributes(
attribute.String("cid", objectAddress.Container().EncodeToString()),
attribute.String("oid", objectAddress.Object().EncodeToString()),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx)
span.End()
}()
var start = time.Now() var start = time.Now()
if err := tokens.StoreBearerToken(r.RequestCtx); err != nil { if err := tokens.StoreBearerToken(req.RequestCtx); err != nil {
r.log.Error("could not fetch and store bearer token", zap.Error(err)) req.log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(r.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest) response.Error(req.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest)
return return
} }
btoken := bearerToken(r.RequestCtx) btoken := bearerToken(req.RequestCtx)
var prm pool.PrmObjectHead var prm pool.PrmObjectHead
prm.SetAddress(objectAddress) prm.SetAddress(objectAddress)
@ -41,13 +54,13 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
prm.UseBearer(*btoken) prm.UseBearer(*btoken)
} }
obj, err := clnt.HeadObject(r.appCtx, prm) obj, err := clnt.HeadObject(ctx, prm)
if err != nil { if err != nil {
r.handleFrostFSErr(err, start) req.handleFrostFSErr(err, start)
return return
} }
r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10)) req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
var contentType string var contentType string
for _, attr := range obj.Attributes() { for _, attr := range obj.Attributes() {
key := attr.Key() key := attr.Key()
@ -58,24 +71,24 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
key = utils.BackwardTransformIfSystem(key) key = utils.BackwardTransformIfSystem(key)
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
switch key { switch key {
case object.AttributeTimestamp: case object.AttributeTimestamp:
value, err := strconv.ParseInt(val, 10, 64) value, err := strconv.ParseInt(val, 10, 64)
if err != nil { if err != nil {
r.log.Info("couldn't parse creation date", req.log.Info("couldn't parse creation date",
zap.String("key", key), zap.String("key", key),
zap.String("val", val), zap.String("val", val),
zap.Error(err)) zap.Error(err))
continue continue
} }
r.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat)) req.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat))
case object.AttributeContentType: case object.AttributeContentType:
contentType = val contentType = val
} }
} }
idsToResponse(&r.Response, &obj) idsToResponse(&req.Response, &obj)
if len(contentType) == 0 { if len(contentType) == 0 {
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) { contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
@ -86,18 +99,18 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
prmRange.UseBearer(*btoken) prmRange.UseBearer(*btoken)
} }
resObj, err := clnt.ObjectRange(r.appCtx, prmRange) resObj, err := clnt.ObjectRange(ctx, prmRange)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &resObj, nil return &resObj, nil
}) })
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
r.handleFrostFSErr(err, start) req.handleFrostFSErr(err, start)
return return
} }
} }
r.SetContentType(contentType) req.SetContentType(contentType)
} }
func idsToResponse(resp *fasthttp.Response, obj *object.Object) { func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
@ -110,10 +123,10 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
// HeadByAddress handles head requests using simple cid/oid format. // HeadByAddress handles head requests using simple cid/oid format.
func (d *Downloader) HeadByAddress(c *fasthttp.RequestCtx) { func (d *Downloader) HeadByAddress(c *fasthttp.RequestCtx) {
d.byAddress(c, request.headObject) d.byAddress(c, headObject)
} }
// HeadByAttribute handles attribute-based head requests. // HeadByAttribute handles attribute-based head requests.
func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) { func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) {
d.byAttribute(c, request.headObject) d.byAttribute(c, headObject)
} }

4
go.mod
View file

@ -14,6 +14,8 @@ require (
github.com/stretchr/testify v1.8.2 github.com/stretchr/testify v1.8.2
github.com/testcontainers/testcontainers-go v0.13.0 github.com/testcontainers/testcontainers-go v0.13.0
github.com/valyala/fasthttp v1.34.0 github.com/valyala/fasthttp v1.34.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
go.uber.org/atomic v1.10.0 go.uber.org/atomic v1.10.0
go.uber.org/zap v1.24.0 go.uber.org/zap v1.24.0
) )
@ -86,13 +88,11 @@ require (
github.com/urfave/cli v1.22.5 // indirect github.com/urfave/cli v1.22.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
go.opencensus.io v0.24.0 // indirect go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
go.opentelemetry.io/otel/sdk v1.14.0 // indirect go.opentelemetry.io/otel/sdk v1.14.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/multierr v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.4.0 // indirect golang.org/x/crypto v0.4.0 // indirect

View file

@ -47,6 +47,11 @@ const (
cfgPprofEnabled = "pprof.enabled" cfgPprofEnabled = "pprof.enabled"
cfgPprofAddress = "pprof.address" cfgPprofAddress = "pprof.address"
// Tracing ...
cfgTracingEnabled = "tracing.enabled"
cfgTracingExporter = "tracing.exporter"
cfgTracingEndpoint = "tracing.endpoint"
// Pool config. // Pool config.
cfgConTimeout = "connect_timeout" cfgConTimeout = "connect_timeout"
cfgStreamTimeout = "stream_timeout" cfgStreamTimeout = "stream_timeout"

82
utils/tracing.go Normal file
View file

@ -0,0 +1,82 @@
package utils
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
type httpCarrier struct {
r *fasthttp.RequestCtx
}
func (c *httpCarrier) Get(key string) string {
bytes := c.r.Request.Header.Peek(key)
if len(bytes) == 0 {
return ""
}
return string(bytes)
}
func (c *httpCarrier) Set(key string, value string) {
c.r.Response.Header.Set(key, value)
}
func (c *httpCarrier) Keys() []string {
dict := make(map[string]interface{})
c.r.Request.Header.VisitAll(
func(key, value []byte) {
dict[string(key)] = true
},
)
c.r.Response.Header.VisitAll(
func(key, value []byte) {
dict[string(key)] = true
},
)
result := make([]string, 0, len(dict))
for key := range dict {
result = append(result, key)
}
return result
}
func extractHTTPTraceInfo(ctx context.Context, req *fasthttp.RequestCtx) context.Context {
if req == nil {
return ctx
}
carrier := &httpCarrier{r: req}
return tracing.Propagator.Extract(ctx, carrier)
}
// SetHTTPTraceInfo saves trace headers to response.
func SetHTTPTraceInfo(ctx context.Context, span trace.Span, req *fasthttp.RequestCtx) {
if req == nil {
return
}
if err := req.Err(); err != nil {
span.SetStatus(codes.Error, err.Error())
}
span.SetAttributes(
semconv.HTTPStatusCode(req.Response.StatusCode()),
)
carrier := &httpCarrier{r: req}
tracing.Propagator.Inject(ctx, carrier)
}
// StartHTTPServerSpan starts root HTTP server span.
func StartHTTPServerSpan(ctx context.Context, req *fasthttp.RequestCtx, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
ctx = extractHTTPTraceInfo(ctx, req)
opts = append(opts, trace.WithAttributes(
attribute.String("http.client_address", req.RemoteAddr().String()),
attribute.String("http.path", string(req.Path())),
semconv.HTTPMethod(string(req.Method())),
semconv.RPCService("frostfs-http-gw"),
), trace.WithSpanKind(trace.SpanKindServer))
return tracing.StartSpanFromContext(ctx, operationName, opts...)
}