forked from TrueCloudLab/frostfs-http-gw
[#20] get/head: Add tracing support
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
ad05f1eb82
commit
a945cdd42c
9 changed files with 234 additions and 50 deletions
41
app.go
41
app.go
|
@ -10,7 +10,9 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
|
@ -166,6 +168,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
||||||
a.initAppSettings()
|
a.initAppSettings()
|
||||||
a.initResolver()
|
a.initResolver()
|
||||||
a.initMetrics()
|
a.initMetrics()
|
||||||
|
a.initTracing(ctx)
|
||||||
|
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
@ -375,7 +378,7 @@ LOOP:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
break LOOP
|
break LOOP
|
||||||
case <-sigs:
|
case <-sigs:
|
||||||
a.configReload()
|
a.configReload(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -383,11 +386,22 @@ LOOP:
|
||||||
|
|
||||||
a.metrics.Shutdown()
|
a.metrics.Shutdown()
|
||||||
a.stopServices()
|
a.stopServices()
|
||||||
|
a.shutdownTracing()
|
||||||
|
|
||||||
close(a.webDone)
|
close(a.webDone)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) configReload() {
|
func (a *app) shutdownTracing() {
|
||||||
|
const tracingShutdownTimeout = 5 * time.Second
|
||||||
|
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := tracing.Shutdown(shdnCtx); err != nil {
|
||||||
|
a.log.Warn("failed to shutdown tracing", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) configReload(ctx context.Context) {
|
||||||
a.log.Info("SIGHUP config reload started")
|
a.log.Info("SIGHUP config reload started")
|
||||||
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
||||||
a.log.Warn("failed to reload config because it's missed")
|
a.log.Warn("failed to reload config because it's missed")
|
||||||
|
@ -418,6 +432,7 @@ func (a *app) configReload() {
|
||||||
a.updateSettings()
|
a.updateSettings()
|
||||||
|
|
||||||
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
||||||
|
a.initTracing(ctx)
|
||||||
a.setHealthStatus()
|
a.setHealthStatus()
|
||||||
|
|
||||||
a.log.Info("SIGHUP config reload completed")
|
a.log.Info("SIGHUP config reload completed")
|
||||||
|
@ -549,3 +564,25 @@ func (a *app) serverIndex(address string) int {
|
||||||
}
|
}
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *app) initTracing(ctx context.Context) {
|
||||||
|
instanceID := ""
|
||||||
|
if len(a.servers) > 0 {
|
||||||
|
instanceID = a.servers[0].Address()
|
||||||
|
}
|
||||||
|
cfg := tracing.Config{
|
||||||
|
Enabled: a.cfg.GetBool(cfgTracingEnabled),
|
||||||
|
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
|
||||||
|
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
|
||||||
|
Service: "frostfs-http-gw",
|
||||||
|
InstanceID: instanceID,
|
||||||
|
Version: Version,
|
||||||
|
}
|
||||||
|
updated, err := tracing.Setup(ctx, cfg)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Warn("failed to initialize tracing", zap.Error(err))
|
||||||
|
}
|
||||||
|
if updated {
|
||||||
|
a.log.Info("tracing config updated")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -92,3 +92,7 @@ HTTP_GW_POOL_ERROR_THRESHOLD=100
|
||||||
|
|
||||||
# Enable zip compression to download files by common prefix.
|
# Enable zip compression to download files by common prefix.
|
||||||
HTTP_GW_ZIP_COMPRESSION=false
|
HTTP_GW_ZIP_COMPRESSION=false
|
||||||
|
|
||||||
|
HTTP_GW_TRACING_ENABLED=true
|
||||||
|
HTTP_GW_TRACING_ENDPOINT="localhost:4317"
|
||||||
|
HTTP_GW_TRACING_EXPORTER="otlp_grpc"
|
|
@ -9,6 +9,10 @@ pprof:
|
||||||
prometheus:
|
prometheus:
|
||||||
enabled: false # Enable metrics.
|
enabled: false # Enable metrics.
|
||||||
address: localhost:8084
|
address: localhost:8084
|
||||||
|
tracing:
|
||||||
|
enabled: true
|
||||||
|
exporter: "otlp_grpc"
|
||||||
|
endpoint: "localhost:4317"
|
||||||
|
|
||||||
logger:
|
logger:
|
||||||
level: debug # Log level.
|
level: debug # Log level.
|
||||||
|
|
|
@ -52,6 +52,7 @@ $ cat http.log
|
||||||
| `zip` | [ZIP configuration](#zip-section) |
|
| `zip` | [ZIP configuration](#zip-section) |
|
||||||
| `pprof` | [Pprof configuration](#pprof-section) |
|
| `pprof` | [Pprof configuration](#pprof-section) |
|
||||||
| `prometheus` | [Prometheus configuration](#prometheus-section) |
|
| `prometheus` | [Prometheus configuration](#prometheus-section) |
|
||||||
|
| `tracing` | [Tracing configuration](#tracing-section) |
|
||||||
|
|
||||||
|
|
||||||
# General section
|
# General section
|
||||||
|
@ -238,3 +239,20 @@ prometheus:
|
||||||
|-----------|----------|---------------|------------------|-----------------------------------------|
|
|-----------|----------|---------------|------------------|-----------------------------------------|
|
||||||
| `enabled` | `bool` | yes | `false` | Flag to enable the service. |
|
| `enabled` | `bool` | yes | `false` | Flag to enable the service. |
|
||||||
| `address` | `string` | yes | `localhost:8084` | Address that service listener binds to. |
|
| `address` | `string` | yes | `localhost:8084` | Address that service listener binds to. |
|
||||||
|
|
||||||
|
# `tracing` section
|
||||||
|
|
||||||
|
Contains configuration for the `tracing` service.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
tracing:
|
||||||
|
enabled: true
|
||||||
|
exporter: "otlp_grpc"
|
||||||
|
endpoint: "localhost:4317"
|
||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
|------------|----------|---------------|------------------|---------------------------------------------------------------|
|
||||||
|
| `enabled` | `bool` | yes | `false` | Flag to enable the tracing. |
|
||||||
|
| `exporter` | `string` | yes | | Trace collector type (`stdout` or `otlp_grpc` are supported). |
|
||||||
|
| `endpoint` | `string` | yes | | Address of collector endpoint for OTLP exporters. |
|
||||||
|
|
|
@ -27,14 +27,15 @@ import (
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type request struct {
|
type request struct {
|
||||||
*fasthttp.RequestCtx
|
*fasthttp.RequestCtx
|
||||||
appCtx context.Context
|
log *zap.Logger
|
||||||
log *zap.Logger
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func isValidToken(s string) bool {
|
func isValidToken(s string) bool {
|
||||||
|
@ -88,40 +89,40 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
|
||||||
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
|
func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
dis = "inline"
|
dis = "inline"
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
filename string
|
filename string
|
||||||
)
|
)
|
||||||
if err = tokens.StoreBearerToken(r.RequestCtx); err != nil {
|
if err = tokens.StoreBearerToken(req.RequestCtx); err != nil {
|
||||||
r.log.Error("could not fetch and store bearer token", zap.Error(err))
|
req.log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||||
response.Error(r.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm pool.PrmObjectGet
|
var prm pool.PrmObjectGet
|
||||||
prm.SetAddress(objectAddress)
|
prm.SetAddress(objectAddress)
|
||||||
if btoken := bearerToken(r.RequestCtx); btoken != nil {
|
if btoken := bearerToken(req.RequestCtx); btoken != nil {
|
||||||
prm.UseBearer(*btoken)
|
prm.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
rObj, err := clnt.GetObject(r.appCtx, prm)
|
rObj, err := clnt.GetObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.handleFrostFSErr(err, start)
|
req.handleFrostFSErr(err, start)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// we can't close reader in this function, so how to do it?
|
// we can't close reader in this function, so how to do it?
|
||||||
|
|
||||||
if r.Request.URI().QueryArgs().GetBool("download") {
|
if req.Request.URI().QueryArgs().GetBool("download") {
|
||||||
dis = "attachment"
|
dis = "attachment"
|
||||||
}
|
}
|
||||||
|
|
||||||
payloadSize := rObj.Header.PayloadSize()
|
payloadSize := rObj.Header.PayloadSize()
|
||||||
|
|
||||||
r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||||
var contentType string
|
var contentType string
|
||||||
for _, attr := range rObj.Header.Attributes() {
|
for _, attr := range rObj.Header.Attributes() {
|
||||||
key := attr.Key()
|
key := attr.Key()
|
||||||
|
@ -132,27 +133,27 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
|
|
||||||
key = utils.BackwardTransformIfSystem(key)
|
key = utils.BackwardTransformIfSystem(key)
|
||||||
|
|
||||||
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||||
switch key {
|
switch key {
|
||||||
case object.AttributeFileName:
|
case object.AttributeFileName:
|
||||||
filename = val
|
filename = val
|
||||||
case object.AttributeTimestamp:
|
case object.AttributeTimestamp:
|
||||||
value, err := strconv.ParseInt(val, 10, 64)
|
value, err := strconv.ParseInt(val, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.log.Info("couldn't parse creation date",
|
req.log.Info("couldn't parse creation date",
|
||||||
zap.String("key", key),
|
zap.String("key", key),
|
||||||
zap.String("val", val),
|
zap.String("val", val),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
r.Response.Header.Set(fasthttp.HeaderLastModified,
|
req.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||||
case object.AttributeContentType:
|
case object.AttributeContentType:
|
||||||
contentType = val
|
contentType = val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
idsToResponse(&r.Response, &rObj.Header)
|
idsToResponse(&req.Response, &rObj.Header)
|
||||||
|
|
||||||
if len(contentType) == 0 {
|
if len(contentType) == 0 {
|
||||||
// determine the Content-Type from the payload head
|
// determine the Content-Type from the payload head
|
||||||
|
@ -162,8 +163,8 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
return rObj.Payload, nil
|
return rObj.Payload, nil
|
||||||
})
|
})
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
r.log.Error("could not detect Content-Type from payload", zap.Error(err))
|
req.log.Error("could not detect Content-Type from payload", zap.Error(err))
|
||||||
response.Error(r.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -178,11 +179,11 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
// if it implements io.Closer and that's useful for us.
|
// if it implements io.Closer and that's useful for us.
|
||||||
rObj.Payload = readCloser{headReader, rObj.Payload}
|
rObj.Payload = readCloser{headReader, rObj.Payload}
|
||||||
}
|
}
|
||||||
r.SetContentType(contentType)
|
req.SetContentType(contentType)
|
||||||
|
|
||||||
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||||
|
|
||||||
r.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
func bearerToken(ctx context.Context) *bearer.Token {
|
func bearerToken(ctx context.Context) *bearer.Token {
|
||||||
|
@ -240,26 +241,35 @@ func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Down
|
||||||
func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
|
func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
|
||||||
return &request{
|
return &request{
|
||||||
RequestCtx: ctx,
|
RequestCtx: ctx,
|
||||||
appCtx: d.appCtx,
|
|
||||||
log: log,
|
log: log,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DownloadByAddress handles download requests using simple cid/oid format.
|
// DownloadByAddress handles download requests using simple cid/oid format.
|
||||||
func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
|
func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
|
||||||
d.byAddress(c, request.receiveFile)
|
d.byAddress(c, receiveFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||||
// prepares request and object address to it.
|
// prepares request and object address to it.
|
||||||
func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) {
|
func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
|
||||||
var (
|
var (
|
||||||
idCnr, _ = c.UserValue("cid").(string)
|
idCnr, _ = c.UserValue("cid").(string)
|
||||||
idObj, _ = c.UserValue("oid").(string)
|
idObj, _ = c.UserValue("oid").(string)
|
||||||
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
||||||
)
|
)
|
||||||
|
|
||||||
cnrID, err := utils.GetContainerID(d.appCtx, idCnr, d.containerResolver)
|
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("cid", idCnr),
|
||||||
|
attribute.String("oid", idObj),
|
||||||
|
))
|
||||||
|
defer func() {
|
||||||
|
utils.SetHTTPTraceInfo(ctx, span, c)
|
||||||
|
span.End()
|
||||||
|
}()
|
||||||
|
|
||||||
|
cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("wrong container id", zap.Error(err))
|
log.Error("wrong container id", zap.Error(err))
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
|
@ -277,16 +287,16 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Poo
|
||||||
addr.SetContainer(*cnrID)
|
addr.SetContainer(*cnrID)
|
||||||
addr.SetObject(*objID)
|
addr.SetObject(*objID)
|
||||||
|
|
||||||
f(*d.newRequest(c, log), d.pool, addr)
|
f(ctx, *d.newRequest(c, log), d.pool, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DownloadByAttribute handles attribute-based download requests.
|
// DownloadByAttribute handles attribute-based download requests.
|
||||||
func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
||||||
d.byAttribute(c, request.receiveFile)
|
d.byAttribute(c, receiveFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
// byAttribute is a wrapper similar to byAddress.
|
// byAttribute is a wrapper similar to byAddress.
|
||||||
func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) {
|
func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
|
||||||
var (
|
var (
|
||||||
scid, _ = c.UserValue("cid").(string)
|
scid, _ = c.UserValue("cid").(string)
|
||||||
key, _ = url.QueryUnescape(c.UserValue("attr_key").(string))
|
key, _ = url.QueryUnescape(c.UserValue("attr_key").(string))
|
||||||
|
@ -294,7 +304,18 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P
|
||||||
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
||||||
)
|
)
|
||||||
|
|
||||||
containerID, err := utils.GetContainerID(d.appCtx, scid, d.containerResolver)
|
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("attr_key", key),
|
||||||
|
attribute.String("attr_val", val),
|
||||||
|
attribute.String("cid", scid),
|
||||||
|
))
|
||||||
|
defer func() {
|
||||||
|
utils.SetHTTPTraceInfo(ctx, span, c)
|
||||||
|
span.End()
|
||||||
|
}()
|
||||||
|
|
||||||
|
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("wrong container id", zap.Error(err))
|
log.Error("wrong container id", zap.Error(err))
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
|
@ -329,7 +350,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P
|
||||||
addrObj.SetContainer(*containerID)
|
addrObj.SetContainer(*containerID)
|
||||||
addrObj.SetObject(buf[0])
|
addrObj.SetObject(buf[0])
|
||||||
|
|
||||||
f(*d.newRequest(c, log), d.pool, addrObj)
|
f(ctx, *d.newRequest(c, log), d.pool, addrObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package downloader
|
package downloader
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -13,6 +14,8 @@ import (
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -25,15 +28,25 @@ const (
|
||||||
hdrContainerID = "X-Container-Id"
|
hdrContainerID = "X-Container-Id"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
|
func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
|
ctx, span := utils.StartHTTPServerSpan(ctx, req.RequestCtx, "HEAD Object",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("cid", objectAddress.Container().EncodeToString()),
|
||||||
|
attribute.String("oid", objectAddress.Object().EncodeToString()),
|
||||||
|
))
|
||||||
|
defer func() {
|
||||||
|
utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx)
|
||||||
|
span.End()
|
||||||
|
}()
|
||||||
|
|
||||||
var start = time.Now()
|
var start = time.Now()
|
||||||
if err := tokens.StoreBearerToken(r.RequestCtx); err != nil {
|
if err := tokens.StoreBearerToken(req.RequestCtx); err != nil {
|
||||||
r.log.Error("could not fetch and store bearer token", zap.Error(err))
|
req.log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||||
response.Error(r.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest)
|
response.Error(req.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
btoken := bearerToken(r.RequestCtx)
|
btoken := bearerToken(req.RequestCtx)
|
||||||
|
|
||||||
var prm pool.PrmObjectHead
|
var prm pool.PrmObjectHead
|
||||||
prm.SetAddress(objectAddress)
|
prm.SetAddress(objectAddress)
|
||||||
|
@ -41,13 +54,13 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
prm.UseBearer(*btoken)
|
prm.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := clnt.HeadObject(r.appCtx, prm)
|
obj, err := clnt.HeadObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.handleFrostFSErr(err, start)
|
req.handleFrostFSErr(err, start)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
|
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
|
||||||
var contentType string
|
var contentType string
|
||||||
for _, attr := range obj.Attributes() {
|
for _, attr := range obj.Attributes() {
|
||||||
key := attr.Key()
|
key := attr.Key()
|
||||||
|
@ -58,24 +71,24 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
|
|
||||||
key = utils.BackwardTransformIfSystem(key)
|
key = utils.BackwardTransformIfSystem(key)
|
||||||
|
|
||||||
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||||
switch key {
|
switch key {
|
||||||
case object.AttributeTimestamp:
|
case object.AttributeTimestamp:
|
||||||
value, err := strconv.ParseInt(val, 10, 64)
|
value, err := strconv.ParseInt(val, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.log.Info("couldn't parse creation date",
|
req.log.Info("couldn't parse creation date",
|
||||||
zap.String("key", key),
|
zap.String("key", key),
|
||||||
zap.String("val", val),
|
zap.String("val", val),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
r.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
req.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||||
case object.AttributeContentType:
|
case object.AttributeContentType:
|
||||||
contentType = val
|
contentType = val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
idsToResponse(&r.Response, &obj)
|
idsToResponse(&req.Response, &obj)
|
||||||
|
|
||||||
if len(contentType) == 0 {
|
if len(contentType) == 0 {
|
||||||
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
|
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
|
||||||
|
@ -86,18 +99,18 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
prmRange.UseBearer(*btoken)
|
prmRange.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
resObj, err := clnt.ObjectRange(r.appCtx, prmRange)
|
resObj, err := clnt.ObjectRange(ctx, prmRange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &resObj, nil
|
return &resObj, nil
|
||||||
})
|
})
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
r.handleFrostFSErr(err, start)
|
req.handleFrostFSErr(err, start)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
r.SetContentType(contentType)
|
req.SetContentType(contentType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
|
func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
|
||||||
|
@ -110,10 +123,10 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
|
||||||
|
|
||||||
// HeadByAddress handles head requests using simple cid/oid format.
|
// HeadByAddress handles head requests using simple cid/oid format.
|
||||||
func (d *Downloader) HeadByAddress(c *fasthttp.RequestCtx) {
|
func (d *Downloader) HeadByAddress(c *fasthttp.RequestCtx) {
|
||||||
d.byAddress(c, request.headObject)
|
d.byAddress(c, headObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HeadByAttribute handles attribute-based head requests.
|
// HeadByAttribute handles attribute-based head requests.
|
||||||
func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) {
|
func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) {
|
||||||
d.byAttribute(c, request.headObject)
|
d.byAttribute(c, headObject)
|
||||||
}
|
}
|
||||||
|
|
4
go.mod
4
go.mod
|
@ -14,6 +14,8 @@ require (
|
||||||
github.com/stretchr/testify v1.8.2
|
github.com/stretchr/testify v1.8.2
|
||||||
github.com/testcontainers/testcontainers-go v0.13.0
|
github.com/testcontainers/testcontainers-go v0.13.0
|
||||||
github.com/valyala/fasthttp v1.34.0
|
github.com/valyala/fasthttp v1.34.0
|
||||||
|
go.opentelemetry.io/otel v1.14.0
|
||||||
|
go.opentelemetry.io/otel/trace v1.14.0
|
||||||
go.uber.org/atomic v1.10.0
|
go.uber.org/atomic v1.10.0
|
||||||
go.uber.org/zap v1.24.0
|
go.uber.org/zap v1.24.0
|
||||||
)
|
)
|
||||||
|
@ -86,13 +88,11 @@ require (
|
||||||
github.com/urfave/cli v1.22.5 // indirect
|
github.com/urfave/cli v1.22.5 // indirect
|
||||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||||
go.opencensus.io v0.24.0 // indirect
|
go.opencensus.io v0.24.0 // indirect
|
||||||
go.opentelemetry.io/otel v1.14.0 // indirect
|
|
||||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
|
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
|
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
|
||||||
go.opentelemetry.io/otel/trace v1.14.0 // indirect
|
|
||||||
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
|
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
|
||||||
go.uber.org/multierr v1.9.0 // indirect
|
go.uber.org/multierr v1.9.0 // indirect
|
||||||
golang.org/x/crypto v0.4.0 // indirect
|
golang.org/x/crypto v0.4.0 // indirect
|
||||||
|
|
|
@ -47,6 +47,11 @@ const (
|
||||||
cfgPprofEnabled = "pprof.enabled"
|
cfgPprofEnabled = "pprof.enabled"
|
||||||
cfgPprofAddress = "pprof.address"
|
cfgPprofAddress = "pprof.address"
|
||||||
|
|
||||||
|
// Tracing ...
|
||||||
|
cfgTracingEnabled = "tracing.enabled"
|
||||||
|
cfgTracingExporter = "tracing.exporter"
|
||||||
|
cfgTracingEndpoint = "tracing.endpoint"
|
||||||
|
|
||||||
// Pool config.
|
// Pool config.
|
||||||
cfgConTimeout = "connect_timeout"
|
cfgConTimeout = "connect_timeout"
|
||||||
cfgStreamTimeout = "stream_timeout"
|
cfgStreamTimeout = "stream_timeout"
|
||||||
|
|
82
utils/tracing.go
Normal file
82
utils/tracing.go
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
|
"github.com/valyala/fasthttp"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
type httpCarrier struct {
|
||||||
|
r *fasthttp.RequestCtx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpCarrier) Get(key string) string {
|
||||||
|
bytes := c.r.Request.Header.Peek(key)
|
||||||
|
if len(bytes) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return string(bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpCarrier) Set(key string, value string) {
|
||||||
|
c.r.Response.Header.Set(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpCarrier) Keys() []string {
|
||||||
|
dict := make(map[string]interface{})
|
||||||
|
c.r.Request.Header.VisitAll(
|
||||||
|
func(key, value []byte) {
|
||||||
|
dict[string(key)] = true
|
||||||
|
},
|
||||||
|
)
|
||||||
|
c.r.Response.Header.VisitAll(
|
||||||
|
func(key, value []byte) {
|
||||||
|
dict[string(key)] = true
|
||||||
|
},
|
||||||
|
)
|
||||||
|
result := make([]string, 0, len(dict))
|
||||||
|
for key := range dict {
|
||||||
|
result = append(result, key)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractHTTPTraceInfo(ctx context.Context, req *fasthttp.RequestCtx) context.Context {
|
||||||
|
if req == nil {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
carrier := &httpCarrier{r: req}
|
||||||
|
return tracing.Propagator.Extract(ctx, carrier)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetHTTPTraceInfo saves trace headers to response.
|
||||||
|
func SetHTTPTraceInfo(ctx context.Context, span trace.Span, req *fasthttp.RequestCtx) {
|
||||||
|
if req == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := req.Err(); err != nil {
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
}
|
||||||
|
span.SetAttributes(
|
||||||
|
semconv.HTTPStatusCode(req.Response.StatusCode()),
|
||||||
|
)
|
||||||
|
carrier := &httpCarrier{r: req}
|
||||||
|
tracing.Propagator.Inject(ctx, carrier)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartHTTPServerSpan starts root HTTP server span.
|
||||||
|
func StartHTTPServerSpan(ctx context.Context, req *fasthttp.RequestCtx, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
|
||||||
|
ctx = extractHTTPTraceInfo(ctx, req)
|
||||||
|
opts = append(opts, trace.WithAttributes(
|
||||||
|
attribute.String("http.client_address", req.RemoteAddr().String()),
|
||||||
|
attribute.String("http.path", string(req.Path())),
|
||||||
|
semconv.HTTPMethod(string(req.Method())),
|
||||||
|
semconv.RPCService("frostfs-http-gw"),
|
||||||
|
), trace.WithSpanKind(trace.SpanKindServer))
|
||||||
|
return tracing.StartSpanFromContext(ctx, operationName, opts...)
|
||||||
|
}
|
Loading…
Reference in a new issue