GET/HEAD: Add tracing support #20
9 changed files with 234 additions and 50 deletions
41
app.go
41
app.go
|
@ -10,7 +10,9 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
|
@ -166,6 +168,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
||||||
a.initAppSettings()
|
a.initAppSettings()
|
||||||
a.initResolver()
|
a.initResolver()
|
||||||
a.initMetrics()
|
a.initMetrics()
|
||||||
alexvanin marked this conversation as resolved
Outdated
|
|||||||
|
a.initTracing(ctx)
|
||||||
|
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
@ -375,7 +378,7 @@ LOOP:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
break LOOP
|
break LOOP
|
||||||
case <-sigs:
|
case <-sigs:
|
||||||
a.configReload()
|
a.configReload(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -383,11 +386,22 @@ LOOP:
|
||||||
|
|
||||||
a.metrics.Shutdown()
|
a.metrics.Shutdown()
|
||||||
a.stopServices()
|
a.stopServices()
|
||||||
|
a.shutdownTracing()
|
||||||
|
|
||||||
close(a.webDone)
|
close(a.webDone)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) configReload() {
|
func (a *app) shutdownTracing() {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Can we move this code (that shutdown tracing) to separate function? Can we move this code (that shutdown tracing) to separate function?
dstepanov-yadro
commented
done done
|
|||||||
|
const tracingShutdownTimeout = 5 * time.Second
|
||||||
|
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := tracing.Shutdown(shdnCtx); err != nil {
|
||||||
|
a.log.Warn("failed to shutdown tracing", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) configReload(ctx context.Context) {
|
||||||
a.log.Info("SIGHUP config reload started")
|
a.log.Info("SIGHUP config reload started")
|
||||||
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
||||||
a.log.Warn("failed to reload config because it's missed")
|
a.log.Warn("failed to reload config because it's missed")
|
||||||
|
@ -418,6 +432,7 @@ func (a *app) configReload() {
|
||||||
a.updateSettings()
|
a.updateSettings()
|
||||||
|
|
||||||
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
||||||
|
a.initTracing(ctx)
|
||||||
a.setHealthStatus()
|
a.setHealthStatus()
|
||||||
|
|
||||||
a.log.Info("SIGHUP config reload completed")
|
a.log.Info("SIGHUP config reload completed")
|
||||||
|
@ -549,3 +564,25 @@ func (a *app) serverIndex(address string) int {
|
||||||
}
|
}
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *app) initTracing(ctx context.Context) {
|
||||||
|
instanceID := ""
|
||||||
|
if len(a.servers) > 0 {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why don't we get context as parameter? Why don't we get context as parameter?
dstepanov-yadro
commented
fixed fixed
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Let's write:
Let's write:
```
if len(a.servers) > 0 {
instanceID = a.servers[0].Address()
}
```
dstepanov-yadro
commented
done done
|
|||||||
|
instanceID = a.servers[0].Address()
|
||||||
|
}
|
||||||
|
cfg := tracing.Config{
|
||||||
|
Enabled: a.cfg.GetBool(cfgTracingEnabled),
|
||||||
|
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
|
||||||
|
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
|
||||||
|
Service: "frostfs-http-gw",
|
||||||
|
InstanceID: instanceID,
|
||||||
|
Version: Version,
|
||||||
|
}
|
||||||
|
updated, err := tracing.Setup(ctx, cfg)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Warn("failed to initialize tracing", zap.Error(err))
|
||||||
|
}
|
||||||
|
if updated {
|
||||||
|
a.log.Info("tracing config updated")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -92,3 +92,7 @@ HTTP_GW_POOL_ERROR_THRESHOLD=100
|
||||||
|
|
||||||
# Enable zip compression to download files by common prefix.
|
# Enable zip compression to download files by common prefix.
|
||||||
HTTP_GW_ZIP_COMPRESSION=false
|
HTTP_GW_ZIP_COMPRESSION=false
|
||||||
|
|
||||||
|
HTTP_GW_TRACING_ENABLED=true
|
||||||
|
HTTP_GW_TRACING_ENDPOINT="localhost:4317"
|
||||||
|
HTTP_GW_TRACING_EXPORTER="otlp_grpc"
|
|
@ -9,6 +9,10 @@ pprof:
|
||||||
prometheus:
|
prometheus:
|
||||||
enabled: false # Enable metrics.
|
enabled: false # Enable metrics.
|
||||||
address: localhost:8084
|
address: localhost:8084
|
||||||
|
tracing:
|
||||||
|
enabled: true
|
||||||
|
exporter: "otlp_grpc"
|
||||||
|
endpoint: "localhost:4317"
|
||||||
|
|
||||||
logger:
|
logger:
|
||||||
level: debug # Log level.
|
level: debug # Log level.
|
||||||
|
|
|
@ -52,6 +52,7 @@ $ cat http.log
|
||||||
| `zip` | [ZIP configuration](#zip-section) |
|
| `zip` | [ZIP configuration](#zip-section) |
|
||||||
| `pprof` | [Pprof configuration](#pprof-section) |
|
| `pprof` | [Pprof configuration](#pprof-section) |
|
||||||
| `prometheus` | [Prometheus configuration](#prometheus-section) |
|
| `prometheus` | [Prometheus configuration](#prometheus-section) |
|
||||||
|
| `tracing` | [Tracing configuration](#tracing-section) |
|
||||||
|
|
||||||
|
|
||||||
# General section
|
# General section
|
||||||
|
@ -238,3 +239,20 @@ prometheus:
|
||||||
|-----------|----------|---------------|------------------|-----------------------------------------|
|
|-----------|----------|---------------|------------------|-----------------------------------------|
|
||||||
| `enabled` | `bool` | yes | `false` | Flag to enable the service. |
|
| `enabled` | `bool` | yes | `false` | Flag to enable the service. |
|
||||||
| `address` | `string` | yes | `localhost:8084` | Address that service listener binds to. |
|
| `address` | `string` | yes | `localhost:8084` | Address that service listener binds to. |
|
||||||
|
|
||||||
|
# `tracing` section
|
||||||
|
|
||||||
|
Contains configuration for the `tracing` service.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
tracing:
|
||||||
|
enabled: true
|
||||||
|
exporter: "otlp_grpc"
|
||||||
|
endpoint: "localhost:4317"
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Is there some default port? If so, let's specify it too Is there some default port? If so, let's specify it too
dstepanov-yadro
commented
done done
|
|||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
|------------|----------|---------------|------------------|---------------------------------------------------------------|
|
||||||
|
| `enabled` | `bool` | yes | `false` | Flag to enable the tracing. |
|
||||||
|
| `exporter` | `string` | yes | | Trace collector type (`stdout` or `otlp_grpc` are supported). |
|
||||||
|
| `endpoint` | `string` | yes | | Address of collector endpoint for OTLP exporters. |
|
||||||
|
|
|
@ -27,13 +27,14 @@ import (
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type request struct {
|
type request struct {
|
||||||
*fasthttp.RequestCtx
|
*fasthttp.RequestCtx
|
||||||
appCtx context.Context
|
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,40 +89,40 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
|
||||||
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
|
func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
var (
|
var (
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
With changing signature we can drop With changing signature we can drop `appCtx` from `request`
dstepanov-yadro
commented
Done Done
|
|||||||
err error
|
err error
|
||||||
dis = "inline"
|
dis = "inline"
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
filename string
|
filename string
|
||||||
)
|
)
|
||||||
if err = tokens.StoreBearerToken(r.RequestCtx); err != nil {
|
if err = tokens.StoreBearerToken(req.RequestCtx); err != nil {
|
||||||
r.log.Error("could not fetch and store bearer token", zap.Error(err))
|
req.log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||||
response.Error(r.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm pool.PrmObjectGet
|
var prm pool.PrmObjectGet
|
||||||
prm.SetAddress(objectAddress)
|
prm.SetAddress(objectAddress)
|
||||||
if btoken := bearerToken(r.RequestCtx); btoken != nil {
|
if btoken := bearerToken(req.RequestCtx); btoken != nil {
|
||||||
prm.UseBearer(*btoken)
|
prm.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
rObj, err := clnt.GetObject(r.appCtx, prm)
|
rObj, err := clnt.GetObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.handleFrostFSErr(err, start)
|
req.handleFrostFSErr(err, start)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// we can't close reader in this function, so how to do it?
|
// we can't close reader in this function, so how to do it?
|
||||||
|
|
||||||
if r.Request.URI().QueryArgs().GetBool("download") {
|
if req.Request.URI().QueryArgs().GetBool("download") {
|
||||||
dis = "attachment"
|
dis = "attachment"
|
||||||
}
|
}
|
||||||
|
|
||||||
payloadSize := rObj.Header.PayloadSize()
|
payloadSize := rObj.Header.PayloadSize()
|
||||||
|
|
||||||
r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||||
var contentType string
|
var contentType string
|
||||||
for _, attr := range rObj.Header.Attributes() {
|
for _, attr := range rObj.Header.Attributes() {
|
||||||
key := attr.Key()
|
key := attr.Key()
|
||||||
|
@ -132,27 +133,27 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
|
|
||||||
key = utils.BackwardTransformIfSystem(key)
|
key = utils.BackwardTransformIfSystem(key)
|
||||||
|
|
||||||
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||||
switch key {
|
switch key {
|
||||||
case object.AttributeFileName:
|
case object.AttributeFileName:
|
||||||
filename = val
|
filename = val
|
||||||
case object.AttributeTimestamp:
|
case object.AttributeTimestamp:
|
||||||
value, err := strconv.ParseInt(val, 10, 64)
|
value, err := strconv.ParseInt(val, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.log.Info("couldn't parse creation date",
|
req.log.Info("couldn't parse creation date",
|
||||||
zap.String("key", key),
|
zap.String("key", key),
|
||||||
zap.String("val", val),
|
zap.String("val", val),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
r.Response.Header.Set(fasthttp.HeaderLastModified,
|
req.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||||
case object.AttributeContentType:
|
case object.AttributeContentType:
|
||||||
contentType = val
|
contentType = val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
idsToResponse(&r.Response, &rObj.Header)
|
idsToResponse(&req.Response, &rObj.Header)
|
||||||
|
|
||||||
if len(contentType) == 0 {
|
if len(contentType) == 0 {
|
||||||
// determine the Content-Type from the payload head
|
// determine the Content-Type from the payload head
|
||||||
|
@ -162,8 +163,8 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
return rObj.Payload, nil
|
return rObj.Payload, nil
|
||||||
})
|
})
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
r.log.Error("could not detect Content-Type from payload", zap.Error(err))
|
req.log.Error("could not detect Content-Type from payload", zap.Error(err))
|
||||||
response.Error(r.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -178,11 +179,11 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
// if it implements io.Closer and that's useful for us.
|
// if it implements io.Closer and that's useful for us.
|
||||||
rObj.Payload = readCloser{headReader, rObj.Payload}
|
rObj.Payload = readCloser{headReader, rObj.Payload}
|
||||||
}
|
}
|
||||||
r.SetContentType(contentType)
|
req.SetContentType(contentType)
|
||||||
|
|
||||||
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||||
|
|
||||||
r.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
func bearerToken(ctx context.Context) *bearer.Token {
|
func bearerToken(ctx context.Context) *bearer.Token {
|
||||||
|
@ -240,26 +241,35 @@ func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Down
|
||||||
func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
|
func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
|
||||||
return &request{
|
return &request{
|
||||||
RequestCtx: ctx,
|
RequestCtx: ctx,
|
||||||
appCtx: d.appCtx,
|
|
||||||
log: log,
|
log: log,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DownloadByAddress handles download requests using simple cid/oid format.
|
// DownloadByAddress handles download requests using simple cid/oid format.
|
||||||
func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
|
func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
|
||||||
d.byAddress(c, request.receiveFile)
|
d.byAddress(c, receiveFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||||
// prepares request and object address to it.
|
// prepares request and object address to it.
|
||||||
func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) {
|
func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
|
||||||
var (
|
var (
|
||||||
idCnr, _ = c.UserValue("cid").(string)
|
idCnr, _ = c.UserValue("cid").(string)
|
||||||
idObj, _ = c.UserValue("oid").(string)
|
idObj, _ = c.UserValue("oid").(string)
|
||||||
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
||||||
)
|
)
|
||||||
|
|
||||||
cnrID, err := utils.GetContainerID(d.appCtx, idCnr, d.containerResolver)
|
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("cid", idCnr),
|
||||||
|
attribute.String("oid", idObj),
|
||||||
|
))
|
||||||
|
defer func() {
|
||||||
|
utils.SetHTTPTraceInfo(ctx, span, c)
|
||||||
|
span.End()
|
||||||
|
}()
|
||||||
|
|
||||||
|
cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("wrong container id", zap.Error(err))
|
log.Error("wrong container id", zap.Error(err))
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
|
@ -277,16 +287,16 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Poo
|
||||||
addr.SetContainer(*cnrID)
|
addr.SetContainer(*cnrID)
|
||||||
addr.SetObject(*objID)
|
addr.SetObject(*objID)
|
||||||
|
|
||||||
f(*d.newRequest(c, log), d.pool, addr)
|
f(ctx, *d.newRequest(c, log), d.pool, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DownloadByAttribute handles attribute-based download requests.
|
// DownloadByAttribute handles attribute-based download requests.
|
||||||
func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
||||||
d.byAttribute(c, request.receiveFile)
|
d.byAttribute(c, receiveFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
// byAttribute is a wrapper similar to byAddress.
|
// byAttribute is a wrapper similar to byAddress.
|
||||||
func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) {
|
func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
|
||||||
var (
|
var (
|
||||||
scid, _ = c.UserValue("cid").(string)
|
scid, _ = c.UserValue("cid").(string)
|
||||||
key, _ = url.QueryUnescape(c.UserValue("attr_key").(string))
|
key, _ = url.QueryUnescape(c.UserValue("attr_key").(string))
|
||||||
|
@ -294,7 +304,18 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P
|
||||||
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
||||||
)
|
)
|
||||||
|
|
||||||
containerID, err := utils.GetContainerID(d.appCtx, scid, d.containerResolver)
|
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("attr_key", key),
|
||||||
|
attribute.String("attr_val", val),
|
||||||
|
attribute.String("cid", scid),
|
||||||
|
))
|
||||||
|
defer func() {
|
||||||
|
utils.SetHTTPTraceInfo(ctx, span, c)
|
||||||
|
span.End()
|
||||||
|
}()
|
||||||
|
|
||||||
|
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("wrong container id", zap.Error(err))
|
log.Error("wrong container id", zap.Error(err))
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
|
@ -329,7 +350,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P
|
||||||
addrObj.SetContainer(*containerID)
|
addrObj.SetContainer(*containerID)
|
||||||
addrObj.SetObject(buf[0])
|
addrObj.SetObject(buf[0])
|
||||||
|
|
||||||
f(*d.newRequest(c, log), d.pool, addrObj)
|
f(ctx, *d.newRequest(c, log), d.pool, addrObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package downloader
|
package downloader
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -13,6 +14,8 @@ import (
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -25,15 +28,25 @@ const (
|
||||||
hdrContainerID = "X-Container-Id"
|
hdrContainerID = "X-Container-Id"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
|
func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
|
ctx, span := utils.StartHTTPServerSpan(ctx, req.RequestCtx, "HEAD Object",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("cid", objectAddress.Container().EncodeToString()),
|
||||||
|
attribute.String("oid", objectAddress.Object().EncodeToString()),
|
||||||
|
))
|
||||||
|
defer func() {
|
||||||
|
utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx)
|
||||||
|
span.End()
|
||||||
|
}()
|
||||||
|
|
||||||
var start = time.Now()
|
var start = time.Now()
|
||||||
if err := tokens.StoreBearerToken(r.RequestCtx); err != nil {
|
if err := tokens.StoreBearerToken(req.RequestCtx); err != nil {
|
||||||
r.log.Error("could not fetch and store bearer token", zap.Error(err))
|
req.log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||||
response.Error(r.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest)
|
response.Error(req.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
btoken := bearerToken(r.RequestCtx)
|
btoken := bearerToken(req.RequestCtx)
|
||||||
|
|
||||||
var prm pool.PrmObjectHead
|
var prm pool.PrmObjectHead
|
||||||
prm.SetAddress(objectAddress)
|
prm.SetAddress(objectAddress)
|
||||||
|
@ -41,13 +54,13 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
prm.UseBearer(*btoken)
|
prm.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := clnt.HeadObject(r.appCtx, prm)
|
obj, err := clnt.HeadObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.handleFrostFSErr(err, start)
|
req.handleFrostFSErr(err, start)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
|
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
|
||||||
var contentType string
|
var contentType string
|
||||||
for _, attr := range obj.Attributes() {
|
for _, attr := range obj.Attributes() {
|
||||||
key := attr.Key()
|
key := attr.Key()
|
||||||
|
@ -58,24 +71,24 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
|
|
||||||
key = utils.BackwardTransformIfSystem(key)
|
key = utils.BackwardTransformIfSystem(key)
|
||||||
|
|
||||||
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||||
switch key {
|
switch key {
|
||||||
case object.AttributeTimestamp:
|
case object.AttributeTimestamp:
|
||||||
value, err := strconv.ParseInt(val, 10, 64)
|
value, err := strconv.ParseInt(val, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.log.Info("couldn't parse creation date",
|
req.log.Info("couldn't parse creation date",
|
||||||
zap.String("key", key),
|
zap.String("key", key),
|
||||||
zap.String("val", val),
|
zap.String("val", val),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
r.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
req.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||||
case object.AttributeContentType:
|
case object.AttributeContentType:
|
||||||
contentType = val
|
contentType = val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
idsToResponse(&r.Response, &obj)
|
idsToResponse(&req.Response, &obj)
|
||||||
|
|
||||||
if len(contentType) == 0 {
|
if len(contentType) == 0 {
|
||||||
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
|
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
|
||||||
|
@ -86,18 +99,18 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
prmRange.UseBearer(*btoken)
|
prmRange.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
resObj, err := clnt.ObjectRange(r.appCtx, prmRange)
|
resObj, err := clnt.ObjectRange(ctx, prmRange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &resObj, nil
|
return &resObj, nil
|
||||||
})
|
})
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
r.handleFrostFSErr(err, start)
|
req.handleFrostFSErr(err, start)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
r.SetContentType(contentType)
|
req.SetContentType(contentType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
|
func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
|
||||||
|
@ -110,10 +123,10 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
|
||||||
|
|
||||||
// HeadByAddress handles head requests using simple cid/oid format.
|
// HeadByAddress handles head requests using simple cid/oid format.
|
||||||
func (d *Downloader) HeadByAddress(c *fasthttp.RequestCtx) {
|
func (d *Downloader) HeadByAddress(c *fasthttp.RequestCtx) {
|
||||||
d.byAddress(c, request.headObject)
|
d.byAddress(c, headObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HeadByAttribute handles attribute-based head requests.
|
// HeadByAttribute handles attribute-based head requests.
|
||||||
func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) {
|
func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) {
|
||||||
d.byAttribute(c, request.headObject)
|
d.byAttribute(c, headObject)
|
||||||
}
|
}
|
||||||
|
|
4
go.mod
4
go.mod
|
@ -14,6 +14,8 @@ require (
|
||||||
github.com/stretchr/testify v1.8.2
|
github.com/stretchr/testify v1.8.2
|
||||||
github.com/testcontainers/testcontainers-go v0.13.0
|
github.com/testcontainers/testcontainers-go v0.13.0
|
||||||
github.com/valyala/fasthttp v1.34.0
|
github.com/valyala/fasthttp v1.34.0
|
||||||
|
go.opentelemetry.io/otel v1.14.0
|
||||||
|
go.opentelemetry.io/otel/trace v1.14.0
|
||||||
go.uber.org/atomic v1.10.0
|
go.uber.org/atomic v1.10.0
|
||||||
go.uber.org/zap v1.24.0
|
go.uber.org/zap v1.24.0
|
||||||
)
|
)
|
||||||
|
@ -86,13 +88,11 @@ require (
|
||||||
github.com/urfave/cli v1.22.5 // indirect
|
github.com/urfave/cli v1.22.5 // indirect
|
||||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||||
go.opencensus.io v0.24.0 // indirect
|
go.opencensus.io v0.24.0 // indirect
|
||||||
go.opentelemetry.io/otel v1.14.0 // indirect
|
|
||||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
|
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/trace v1.14.0 // indirect
|
|
||||||
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
|
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
|
||||||
go.uber.org/multierr v1.9.0 // indirect
|
go.uber.org/multierr v1.9.0 // indirect
|
||||||
golang.org/x/crypto v0.4.0 // indirect
|
golang.org/x/crypto v0.4.0 // indirect
|
||||||
|
|
|
@ -47,6 +47,11 @@ const (
|
||||||
cfgPprofEnabled = "pprof.enabled"
|
cfgPprofEnabled = "pprof.enabled"
|
||||||
cfgPprofAddress = "pprof.address"
|
cfgPprofAddress = "pprof.address"
|
||||||
|
|
||||||
|
// Tracing ...
|
||||||
|
cfgTracingEnabled = "tracing.enabled"
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
We should add this new config parameters to config example and documentation We should add this new config parameters to [config example](https://git.frostfs.info/dstepanov-yadro/frostfs-http-gw/src/branch/feat/OBJECT-3311/config) and [documentation](https://git.frostfs.info/dstepanov-yadro/frostfs-http-gw/src/branch/feat/OBJECT-3311/docs/gate-configuration.md)
dstepanov-yadro
commented
done done
|
|||||||
|
cfgTracingExporter = "tracing.exporter"
|
||||||
|
cfgTracingEndpoint = "tracing.endpoint"
|
||||||
|
|
||||||
// Pool config.
|
// Pool config.
|
||||||
cfgConTimeout = "connect_timeout"
|
cfgConTimeout = "connect_timeout"
|
||||||
cfgStreamTimeout = "stream_timeout"
|
cfgStreamTimeout = "stream_timeout"
|
||||||
|
|
82
utils/tracing.go
Normal file
82
utils/tracing.go
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
|
"github.com/valyala/fasthttp"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
type httpCarrier struct {
|
||||||
|
r *fasthttp.RequestCtx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpCarrier) Get(key string) string {
|
||||||
|
bytes := c.r.Request.Header.Peek(key)
|
||||||
|
if len(bytes) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return string(bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpCarrier) Set(key string, value string) {
|
||||||
|
c.r.Response.Header.Set(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
alexvanin marked this conversation as resolved
Outdated
alexvanin
commented
No empty line between methods? No empty line between methods?
dstepanov-yadro
commented
fixed fixed
|
|||||||
|
func (c *httpCarrier) Keys() []string {
|
||||||
|
dict := make(map[string]interface{})
|
||||||
|
c.r.Request.Header.VisitAll(
|
||||||
|
func(key, value []byte) {
|
||||||
|
dict[string(key)] = true
|
||||||
|
},
|
||||||
|
)
|
||||||
|
c.r.Response.Header.VisitAll(
|
||||||
|
func(key, value []byte) {
|
||||||
|
dict[string(key)] = true
|
||||||
|
},
|
||||||
|
)
|
||||||
|
result := make([]string, 0, len(dict))
|
||||||
|
for key := range dict {
|
||||||
|
result = append(result, key)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractHTTPTraceInfo(ctx context.Context, req *fasthttp.RequestCtx) context.Context {
|
||||||
|
if req == nil {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
carrier := &httpCarrier{r: req}
|
||||||
|
return tracing.Propagator.Extract(ctx, carrier)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetHTTPTraceInfo saves trace headers to response.
|
||||||
|
func SetHTTPTraceInfo(ctx context.Context, span trace.Span, req *fasthttp.RequestCtx) {
|
||||||
|
if req == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := req.Err(); err != nil {
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
}
|
||||||
|
span.SetAttributes(
|
||||||
|
semconv.HTTPStatusCode(req.Response.StatusCode()),
|
||||||
|
)
|
||||||
|
carrier := &httpCarrier{r: req}
|
||||||
|
tracing.Propagator.Inject(ctx, carrier)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartHTTPServerSpan starts root HTTP server span.
|
||||||
|
func StartHTTPServerSpan(ctx context.Context, req *fasthttp.RequestCtx, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
|
||||||
|
ctx = extractHTTPTraceInfo(ctx, req)
|
||||||
|
opts = append(opts, trace.WithAttributes(
|
||||||
|
attribute.String("http.client_address", req.RemoteAddr().String()),
|
||||||
|
attribute.String("http.path", string(req.Path())),
|
||||||
|
semconv.HTTPMethod(string(req.Method())),
|
||||||
|
semconv.RPCService("frostfs-http-gw"),
|
||||||
|
), trace.WithSpanKind(trace.SpanKindServer))
|
||||||
|
return tracing.StartSpanFromContext(ctx, operationName, opts...)
|
||||||
|
}
|
Loading…
Reference in a new issue
Should we call
tracing.Shutdown()
neara.stopServices()
call?Sure, fixed