GET/HEAD: Add tracing support #20

Merged
alexvanin merged 1 commit from dstepanov-yadro/frostfs-http-gw:feat/OBJECT-3311 into master 2023-05-04 14:01:57 +00:00
9 changed files with 234 additions and 50 deletions

41
app.go
View file

@ -10,7 +10,9 @@ import (
"strconv" "strconv"
"sync" "sync"
"syscall" "syscall"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
@ -166,6 +168,7 @@ func newApp(ctx context.Context, opt ...Option) App {
a.initAppSettings() a.initAppSettings()
a.initResolver() a.initResolver()
a.initMetrics() a.initMetrics()
alexvanin marked this conversation as resolved Outdated

Should we call tracing.Shutdown() near a.stopServices() call?

Should we call `tracing.Shutdown()` near `a.stopServices()` call?

Sure, fixed

Sure, fixed
a.initTracing(ctx)
return a return a
} }
@ -375,7 +378,7 @@ LOOP:
case <-ctx.Done(): case <-ctx.Done():
break LOOP break LOOP
case <-sigs: case <-sigs:
a.configReload() a.configReload(ctx)
} }
} }
@ -383,11 +386,22 @@ LOOP:
a.metrics.Shutdown() a.metrics.Shutdown()
a.stopServices() a.stopServices()
a.shutdownTracing()
close(a.webDone) close(a.webDone)
} }
func (a *app) configReload() { func (a *app) shutdownTracing() {
dkirillov marked this conversation as resolved Outdated

Can we move this code (that shutdown tracing) to separate function?

Can we move this code (that shutdown tracing) to separate function?

done

done
const tracingShutdownTimeout = 5 * time.Second
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
defer cancel()
if err := tracing.Shutdown(shdnCtx); err != nil {
a.log.Warn("failed to shutdown tracing", zap.Error(err))
}
}
func (a *app) configReload(ctx context.Context) {
a.log.Info("SIGHUP config reload started") a.log.Info("SIGHUP config reload started")
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) { if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
a.log.Warn("failed to reload config because it's missed") a.log.Warn("failed to reload config because it's missed")
@ -418,6 +432,7 @@ func (a *app) configReload() {
a.updateSettings() a.updateSettings()
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled)) a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
a.initTracing(ctx)
a.setHealthStatus() a.setHealthStatus()
a.log.Info("SIGHUP config reload completed") a.log.Info("SIGHUP config reload completed")
@ -549,3 +564,25 @@ func (a *app) serverIndex(address string) int {
} }
return -1 return -1
} }
func (a *app) initTracing(ctx context.Context) {
instanceID := ""
if len(a.servers) > 0 {
dkirillov marked this conversation as resolved Outdated

Why don't we get context as parameter?

Why don't we get context as parameter?

fixed

fixed
dkirillov marked this conversation as resolved Outdated

Let's write:

if len(a.servers) > 0 {
	instanceID = a.servers[0].Address()
}
Let's write: ``` if len(a.servers) > 0 { instanceID = a.servers[0].Address() } ```

done

done
instanceID = a.servers[0].Address()
}
cfg := tracing.Config{
Enabled: a.cfg.GetBool(cfgTracingEnabled),
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
Service: "frostfs-http-gw",
InstanceID: instanceID,
Version: Version,
}
updated, err := tracing.Setup(ctx, cfg)
if err != nil {
a.log.Warn("failed to initialize tracing", zap.Error(err))
}
if updated {
a.log.Info("tracing config updated")
}
}

View file

@ -92,3 +92,7 @@ HTTP_GW_POOL_ERROR_THRESHOLD=100
# Enable zip compression to download files by common prefix. # Enable zip compression to download files by common prefix.
HTTP_GW_ZIP_COMPRESSION=false HTTP_GW_ZIP_COMPRESSION=false
HTTP_GW_TRACING_ENABLED=true
HTTP_GW_TRACING_ENDPOINT="localhost:4317"
HTTP_GW_TRACING_EXPORTER="otlp_grpc"

View file

@ -9,6 +9,10 @@ pprof:
prometheus: prometheus:
enabled: false # Enable metrics. enabled: false # Enable metrics.
address: localhost:8084 address: localhost:8084
tracing:
enabled: true
exporter: "otlp_grpc"
endpoint: "localhost:4317"
logger: logger:
level: debug # Log level. level: debug # Log level.

View file

@ -52,6 +52,7 @@ $ cat http.log
| `zip` | [ZIP configuration](#zip-section) | | `zip` | [ZIP configuration](#zip-section) |
| `pprof` | [Pprof configuration](#pprof-section) | | `pprof` | [Pprof configuration](#pprof-section) |
| `prometheus` | [Prometheus configuration](#prometheus-section) | | `prometheus` | [Prometheus configuration](#prometheus-section) |
| `tracing` | [Tracing configuration](#tracing-section) |
# General section # General section
@ -238,3 +239,20 @@ prometheus:
|-----------|----------|---------------|------------------|-----------------------------------------| |-----------|----------|---------------|------------------|-----------------------------------------|
| `enabled` | `bool` | yes | `false` | Flag to enable the service. | | `enabled` | `bool` | yes | `false` | Flag to enable the service. |
| `address` | `string` | yes | `localhost:8084` | Address that service listener binds to. | | `address` | `string` | yes | `localhost:8084` | Address that service listener binds to. |
# `tracing` section
Contains configuration for the `tracing` service.
```yaml
tracing:
enabled: true
exporter: "otlp_grpc"
endpoint: "localhost:4317"
dkirillov marked this conversation as resolved Outdated

Is there some default port? If so, let's specify it too

Is there some default port? If so, let's specify it too

done

done
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|------------|----------|---------------|------------------|---------------------------------------------------------------|
| `enabled` | `bool` | yes | `false` | Flag to enable the tracing. |
| `exporter` | `string` | yes | | Trace collector type (`stdout` or `otlp_grpc` are supported). |
| `endpoint` | `string` | yes | | Address of collector endpoint for OTLP exporters. |

View file

@ -27,13 +27,14 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
type request struct { type request struct {
*fasthttp.RequestCtx *fasthttp.RequestCtx
appCtx context.Context
log *zap.Logger log *zap.Logger
} }
@ -88,40 +89,40 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
return http.DetectContentType(buf), buf, err // to not lose io.EOF return http.DetectContentType(buf), buf, err // to not lose io.EOF
} }
func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) { func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
var ( var (
dkirillov marked this conversation as resolved Outdated

With changing signature we can drop appCtx from request

With changing signature we can drop `appCtx` from `request`

Done

Done
err error err error
dis = "inline" dis = "inline"
start = time.Now() start = time.Now()
filename string filename string
) )
if err = tokens.StoreBearerToken(r.RequestCtx); err != nil { if err = tokens.StoreBearerToken(req.RequestCtx); err != nil {
r.log.Error("could not fetch and store bearer token", zap.Error(err)) req.log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(r.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) response.Error(req.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
var prm pool.PrmObjectGet var prm pool.PrmObjectGet
prm.SetAddress(objectAddress) prm.SetAddress(objectAddress)
if btoken := bearerToken(r.RequestCtx); btoken != nil { if btoken := bearerToken(req.RequestCtx); btoken != nil {
prm.UseBearer(*btoken) prm.UseBearer(*btoken)
} }
rObj, err := clnt.GetObject(r.appCtx, prm) rObj, err := clnt.GetObject(ctx, prm)
if err != nil { if err != nil {
r.handleFrostFSErr(err, start) req.handleFrostFSErr(err, start)
return return
} }
// we can't close reader in this function, so how to do it? // we can't close reader in this function, so how to do it?
if r.Request.URI().QueryArgs().GetBool("download") { if req.Request.URI().QueryArgs().GetBool("download") {
dis = "attachment" dis = "attachment"
} }
payloadSize := rObj.Header.PayloadSize() payloadSize := rObj.Header.PayloadSize()
r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10)) req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
var contentType string var contentType string
for _, attr := range rObj.Header.Attributes() { for _, attr := range rObj.Header.Attributes() {
key := attr.Key() key := attr.Key()
@ -132,27 +133,27 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
key = utils.BackwardTransformIfSystem(key) key = utils.BackwardTransformIfSystem(key)
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
switch key { switch key {
case object.AttributeFileName: case object.AttributeFileName:
filename = val filename = val
case object.AttributeTimestamp: case object.AttributeTimestamp:
value, err := strconv.ParseInt(val, 10, 64) value, err := strconv.ParseInt(val, 10, 64)
if err != nil { if err != nil {
r.log.Info("couldn't parse creation date", req.log.Info("couldn't parse creation date",
zap.String("key", key), zap.String("key", key),
zap.String("val", val), zap.String("val", val),
zap.Error(err)) zap.Error(err))
continue continue
} }
r.Response.Header.Set(fasthttp.HeaderLastModified, req.Response.Header.Set(fasthttp.HeaderLastModified,
time.Unix(value, 0).UTC().Format(http.TimeFormat)) time.Unix(value, 0).UTC().Format(http.TimeFormat))
case object.AttributeContentType: case object.AttributeContentType:
contentType = val contentType = val
} }
} }
idsToResponse(&r.Response, &rObj.Header) idsToResponse(&req.Response, &rObj.Header)
if len(contentType) == 0 { if len(contentType) == 0 {
// determine the Content-Type from the payload head // determine the Content-Type from the payload head
@ -162,8 +163,8 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
return rObj.Payload, nil return rObj.Payload, nil
}) })
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
r.log.Error("could not detect Content-Type from payload", zap.Error(err)) req.log.Error("could not detect Content-Type from payload", zap.Error(err))
response.Error(r.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest) response.Error(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
@ -178,11 +179,11 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
// if it implements io.Closer and that's useful for us. // if it implements io.Closer and that's useful for us.
rObj.Payload = readCloser{headReader, rObj.Payload} rObj.Payload = readCloser{headReader, rObj.Payload}
} }
r.SetContentType(contentType) req.SetContentType(contentType)
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
r.Response.SetBodyStream(rObj.Payload, int(payloadSize)) req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
} }
func bearerToken(ctx context.Context) *bearer.Token { func bearerToken(ctx context.Context) *bearer.Token {
@ -240,26 +241,35 @@ func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Down
func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
return &request{ return &request{
RequestCtx: ctx, RequestCtx: ctx,
appCtx: d.appCtx,
log: log, log: log,
} }
} }
// DownloadByAddress handles download requests using simple cid/oid format. // DownloadByAddress handles download requests using simple cid/oid format.
func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) { func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
d.byAddress(c, request.receiveFile) d.byAddress(c, receiveFile)
} }
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that // byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it. // prepares request and object address to it.
func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) { func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
var ( var (
idCnr, _ = c.UserValue("cid").(string) idCnr, _ = c.UserValue("cid").(string)
idObj, _ = c.UserValue("oid").(string) idObj, _ = c.UserValue("oid").(string)
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj)) log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
) )
cnrID, err := utils.GetContainerID(d.appCtx, idCnr, d.containerResolver) ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
trace.WithAttributes(
attribute.String("cid", idCnr),
attribute.String("oid", idObj),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, c)
span.End()
}()
cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver)
if err != nil { if err != nil {
log.Error("wrong container id", zap.Error(err)) log.Error("wrong container id", zap.Error(err))
response.Error(c, "wrong container id", fasthttp.StatusBadRequest) response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
@ -277,16 +287,16 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Poo
addr.SetContainer(*cnrID) addr.SetContainer(*cnrID)
addr.SetObject(*objID) addr.SetObject(*objID)
f(*d.newRequest(c, log), d.pool, addr) f(ctx, *d.newRequest(c, log), d.pool, addr)
} }
// DownloadByAttribute handles attribute-based download requests. // DownloadByAttribute handles attribute-based download requests.
func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
d.byAttribute(c, request.receiveFile) d.byAttribute(c, receiveFile)
} }
// byAttribute is a wrapper similar to byAddress. // byAttribute is a wrapper similar to byAddress.
func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) { func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
var ( var (
scid, _ = c.UserValue("cid").(string) scid, _ = c.UserValue("cid").(string)
key, _ = url.QueryUnescape(c.UserValue("attr_key").(string)) key, _ = url.QueryUnescape(c.UserValue("attr_key").(string))
@ -294,7 +304,18 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val)) log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
) )
containerID, err := utils.GetContainerID(d.appCtx, scid, d.containerResolver) ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
trace.WithAttributes(
attribute.String("attr_key", key),
attribute.String("attr_val", val),
attribute.String("cid", scid),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, c)
span.End()
}()
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
if err != nil { if err != nil {
log.Error("wrong container id", zap.Error(err)) log.Error("wrong container id", zap.Error(err))
response.Error(c, "wrong container id", fasthttp.StatusBadRequest) response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
@ -329,7 +350,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P
addrObj.SetContainer(*containerID) addrObj.SetContainer(*containerID)
addrObj.SetObject(buf[0]) addrObj.SetObject(buf[0])
f(*d.newRequest(c, log), d.pool, addrObj) f(ctx, *d.newRequest(c, log), d.pool, addrObj)
} }
func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) { func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {

View file

@ -1,6 +1,7 @@
package downloader package downloader
import ( import (
"context"
"io" "io"
"net/http" "net/http"
"strconv" "strconv"
@ -13,6 +14,8 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -25,15 +28,25 @@ const (
hdrContainerID = "X-Container-Id" hdrContainerID = "X-Container-Id"
) )
func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) { func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
ctx, span := utils.StartHTTPServerSpan(ctx, req.RequestCtx, "HEAD Object",
trace.WithAttributes(
attribute.String("cid", objectAddress.Container().EncodeToString()),
attribute.String("oid", objectAddress.Object().EncodeToString()),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx)
span.End()
}()
var start = time.Now() var start = time.Now()
if err := tokens.StoreBearerToken(r.RequestCtx); err != nil { if err := tokens.StoreBearerToken(req.RequestCtx); err != nil {
r.log.Error("could not fetch and store bearer token", zap.Error(err)) req.log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(r.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest) response.Error(req.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest)
return return
} }
btoken := bearerToken(r.RequestCtx) btoken := bearerToken(req.RequestCtx)
var prm pool.PrmObjectHead var prm pool.PrmObjectHead
prm.SetAddress(objectAddress) prm.SetAddress(objectAddress)
@ -41,13 +54,13 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
prm.UseBearer(*btoken) prm.UseBearer(*btoken)
} }
obj, err := clnt.HeadObject(r.appCtx, prm) obj, err := clnt.HeadObject(ctx, prm)
if err != nil { if err != nil {
r.handleFrostFSErr(err, start) req.handleFrostFSErr(err, start)
return return
} }
r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10)) req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
var contentType string var contentType string
for _, attr := range obj.Attributes() { for _, attr := range obj.Attributes() {
key := attr.Key() key := attr.Key()
@ -58,24 +71,24 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
key = utils.BackwardTransformIfSystem(key) key = utils.BackwardTransformIfSystem(key)
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
switch key { switch key {
case object.AttributeTimestamp: case object.AttributeTimestamp:
value, err := strconv.ParseInt(val, 10, 64) value, err := strconv.ParseInt(val, 10, 64)
if err != nil { if err != nil {
r.log.Info("couldn't parse creation date", req.log.Info("couldn't parse creation date",
zap.String("key", key), zap.String("key", key),
zap.String("val", val), zap.String("val", val),
zap.Error(err)) zap.Error(err))
continue continue
} }
r.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat)) req.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat))
case object.AttributeContentType: case object.AttributeContentType:
contentType = val contentType = val
} }
} }
idsToResponse(&r.Response, &obj) idsToResponse(&req.Response, &obj)
if len(contentType) == 0 { if len(contentType) == 0 {
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) { contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
@ -86,18 +99,18 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
prmRange.UseBearer(*btoken) prmRange.UseBearer(*btoken)
} }
resObj, err := clnt.ObjectRange(r.appCtx, prmRange) resObj, err := clnt.ObjectRange(ctx, prmRange)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &resObj, nil return &resObj, nil
}) })
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
r.handleFrostFSErr(err, start) req.handleFrostFSErr(err, start)
return return
} }
} }
r.SetContentType(contentType) req.SetContentType(contentType)
} }
func idsToResponse(resp *fasthttp.Response, obj *object.Object) { func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
@ -110,10 +123,10 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
// HeadByAddress handles head requests using simple cid/oid format. // HeadByAddress handles head requests using simple cid/oid format.
func (d *Downloader) HeadByAddress(c *fasthttp.RequestCtx) { func (d *Downloader) HeadByAddress(c *fasthttp.RequestCtx) {
d.byAddress(c, request.headObject) d.byAddress(c, headObject)
} }
// HeadByAttribute handles attribute-based head requests. // HeadByAttribute handles attribute-based head requests.
func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) { func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) {
d.byAttribute(c, request.headObject) d.byAttribute(c, headObject)
} }

4
go.mod
View file

@ -14,6 +14,8 @@ require (
github.com/stretchr/testify v1.8.2 github.com/stretchr/testify v1.8.2
github.com/testcontainers/testcontainers-go v0.13.0 github.com/testcontainers/testcontainers-go v0.13.0
github.com/valyala/fasthttp v1.34.0 github.com/valyala/fasthttp v1.34.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
go.uber.org/atomic v1.10.0 go.uber.org/atomic v1.10.0
go.uber.org/zap v1.24.0 go.uber.org/zap v1.24.0
) )
@ -86,13 +88,11 @@ require (
github.com/urfave/cli v1.22.5 // indirect github.com/urfave/cli v1.22.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
go.opencensus.io v0.24.0 // indirect go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
go.opentelemetry.io/otel/sdk v1.14.0 // indirect go.opentelemetry.io/otel/sdk v1.14.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/multierr v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.4.0 // indirect golang.org/x/crypto v0.4.0 // indirect

View file

@ -47,6 +47,11 @@ const (
cfgPprofEnabled = "pprof.enabled" cfgPprofEnabled = "pprof.enabled"
cfgPprofAddress = "pprof.address" cfgPprofAddress = "pprof.address"
// Tracing ...
cfgTracingEnabled = "tracing.enabled"
dkirillov marked this conversation as resolved Outdated

We should add this new config parameters to config example and documentation

We should add this new config parameters to [config example](https://git.frostfs.info/dstepanov-yadro/frostfs-http-gw/src/branch/feat/OBJECT-3311/config) and [documentation](https://git.frostfs.info/dstepanov-yadro/frostfs-http-gw/src/branch/feat/OBJECT-3311/docs/gate-configuration.md)

done

done
cfgTracingExporter = "tracing.exporter"
cfgTracingEndpoint = "tracing.endpoint"
// Pool config. // Pool config.
cfgConTimeout = "connect_timeout" cfgConTimeout = "connect_timeout"
cfgStreamTimeout = "stream_timeout" cfgStreamTimeout = "stream_timeout"

82
utils/tracing.go Normal file
View file

@ -0,0 +1,82 @@
package utils
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
type httpCarrier struct {
r *fasthttp.RequestCtx
}
func (c *httpCarrier) Get(key string) string {
bytes := c.r.Request.Header.Peek(key)
if len(bytes) == 0 {
return ""
}
return string(bytes)
}
func (c *httpCarrier) Set(key string, value string) {
c.r.Response.Header.Set(key, value)
}
alexvanin marked this conversation as resolved Outdated

No empty line between methods?

No empty line between methods?

fixed

fixed
func (c *httpCarrier) Keys() []string {
dict := make(map[string]interface{})
c.r.Request.Header.VisitAll(
func(key, value []byte) {
dict[string(key)] = true
},
)
c.r.Response.Header.VisitAll(
func(key, value []byte) {
dict[string(key)] = true
},
)
result := make([]string, 0, len(dict))
for key := range dict {
result = append(result, key)
}
return result
}
func extractHTTPTraceInfo(ctx context.Context, req *fasthttp.RequestCtx) context.Context {
if req == nil {
return ctx
}
carrier := &httpCarrier{r: req}
return tracing.Propagator.Extract(ctx, carrier)
}
// SetHTTPTraceInfo saves trace headers to response.
func SetHTTPTraceInfo(ctx context.Context, span trace.Span, req *fasthttp.RequestCtx) {
if req == nil {
return
}
if err := req.Err(); err != nil {
span.SetStatus(codes.Error, err.Error())
}
span.SetAttributes(
semconv.HTTPStatusCode(req.Response.StatusCode()),
)
carrier := &httpCarrier{r: req}
tracing.Propagator.Inject(ctx, carrier)
}
// StartHTTPServerSpan starts root HTTP server span.
func StartHTTPServerSpan(ctx context.Context, req *fasthttp.RequestCtx, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
ctx = extractHTTPTraceInfo(ctx, req)
opts = append(opts, trace.WithAttributes(
attribute.String("http.client_address", req.RemoteAddr().String()),
attribute.String("http.path", string(req.Path())),
semconv.HTTPMethod(string(req.Method())),
semconv.RPCService("frostfs-http-gw"),
), trace.WithSpanKind(trace.SpanKindServer))
return tracing.StartSpanFromContext(ctx, operationName, opts...)
}