GET/HEAD: Add tracing support #20

Merged
alexvanin merged 1 commit from dstepanov-yadro/frostfs-http-gw:feat/OBJECT-3311 into master 2023-05-04 14:01:57 +00:00
9 changed files with 234 additions and 50 deletions

41
app.go
View file

@ -10,7 +10,9 @@ import (
"strconv"
"sync"
"syscall"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
@ -166,6 +168,7 @@ func newApp(ctx context.Context, opt ...Option) App {
a.initAppSettings()
a.initResolver()
a.initMetrics()
alexvanin marked this conversation as resolved Outdated

Should we call tracing.Shutdown() near a.stopServices() call?

Should we call `tracing.Shutdown()` near `a.stopServices()` call?

Sure, fixed

Sure, fixed
a.initTracing(ctx)
return a
}
@ -375,7 +378,7 @@ LOOP:
case <-ctx.Done():
break LOOP
case <-sigs:
a.configReload()
a.configReload(ctx)
}
}
@ -383,11 +386,22 @@ LOOP:
a.metrics.Shutdown()
a.stopServices()
a.shutdownTracing()
close(a.webDone)
}
func (a *app) configReload() {
func (a *app) shutdownTracing() {
dkirillov marked this conversation as resolved Outdated

Can we move this code (that shutdown tracing) to separate function?

Can we move this code (that shutdown tracing) to separate function?

done

done
const tracingShutdownTimeout = 5 * time.Second
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
defer cancel()
if err := tracing.Shutdown(shdnCtx); err != nil {
a.log.Warn("failed to shutdown tracing", zap.Error(err))
}
}
func (a *app) configReload(ctx context.Context) {
a.log.Info("SIGHUP config reload started")
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
a.log.Warn("failed to reload config because it's missed")
@ -418,6 +432,7 @@ func (a *app) configReload() {
a.updateSettings()
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
a.initTracing(ctx)
a.setHealthStatus()
a.log.Info("SIGHUP config reload completed")
@ -549,3 +564,25 @@ func (a *app) serverIndex(address string) int {
}
return -1
}
func (a *app) initTracing(ctx context.Context) {
instanceID := ""
if len(a.servers) > 0 {
dkirillov marked this conversation as resolved Outdated

Why don't we get context as parameter?

Why don't we get context as parameter?

fixed

fixed
dkirillov marked this conversation as resolved Outdated

Let's write:

if len(a.servers) > 0 {
	instanceID = a.servers[0].Address()
}
Let's write: ``` if len(a.servers) > 0 { instanceID = a.servers[0].Address() } ```

done

done
instanceID = a.servers[0].Address()
}
cfg := tracing.Config{
Enabled: a.cfg.GetBool(cfgTracingEnabled),
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
Service: "frostfs-http-gw",
InstanceID: instanceID,
Version: Version,
}
updated, err := tracing.Setup(ctx, cfg)
if err != nil {
a.log.Warn("failed to initialize tracing", zap.Error(err))
}
if updated {
a.log.Info("tracing config updated")
}
}

View file

@ -92,3 +92,7 @@ HTTP_GW_POOL_ERROR_THRESHOLD=100
# Enable zip compression to download files by common prefix.
HTTP_GW_ZIP_COMPRESSION=false
HTTP_GW_TRACING_ENABLED=true
HTTP_GW_TRACING_ENDPOINT="localhost:4317"
HTTP_GW_TRACING_EXPORTER="otlp_grpc"

View file

@ -9,6 +9,10 @@ pprof:
prometheus:
enabled: false # Enable metrics.
address: localhost:8084
tracing:
enabled: true
exporter: "otlp_grpc"
endpoint: "localhost:4317"
logger:
level: debug # Log level.

View file

@ -52,6 +52,7 @@ $ cat http.log
| `zip` | [ZIP configuration](#zip-section) |
| `pprof` | [Pprof configuration](#pprof-section) |
| `prometheus` | [Prometheus configuration](#prometheus-section) |
| `tracing` | [Tracing configuration](#tracing-section) |
# General section
@ -238,3 +239,20 @@ prometheus:
|-----------|----------|---------------|------------------|-----------------------------------------|
| `enabled` | `bool` | yes | `false` | Flag to enable the service. |
| `address` | `string` | yes | `localhost:8084` | Address that service listener binds to. |
# `tracing` section
Contains configuration for the `tracing` service.
```yaml
tracing:
enabled: true
exporter: "otlp_grpc"
endpoint: "localhost:4317"
dkirillov marked this conversation as resolved Outdated

Is there some default port? If so, let's specify it too

Is there some default port? If so, let's specify it too

done

done
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|------------|----------|---------------|------------------|---------------------------------------------------------------|
| `enabled` | `bool` | yes | `false` | Flag to enable the tracing. |
| `exporter` | `string` | yes | | Trace collector type (`stdout` or `otlp_grpc` are supported). |
| `endpoint` | `string` | yes | | Address of collector endpoint for OTLP exporters. |

View file

@ -27,14 +27,15 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
)
type request struct {
*fasthttp.RequestCtx
appCtx context.Context
log *zap.Logger
log *zap.Logger
}
func isValidToken(s string) bool {
@ -88,40 +89,40 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
return http.DetectContentType(buf), buf, err // to not lose io.EOF
}
func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
var (
dkirillov marked this conversation as resolved Outdated

With changing signature we can drop appCtx from request

With changing signature we can drop `appCtx` from `request`

Done

Done
err error
dis = "inline"
start = time.Now()
filename string
)
if err = tokens.StoreBearerToken(r.RequestCtx); err != nil {
r.log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(r.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
if err = tokens.StoreBearerToken(req.RequestCtx); err != nil {
req.log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(req.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
return
}
var prm pool.PrmObjectGet
prm.SetAddress(objectAddress)
if btoken := bearerToken(r.RequestCtx); btoken != nil {
if btoken := bearerToken(req.RequestCtx); btoken != nil {
prm.UseBearer(*btoken)
}
rObj, err := clnt.GetObject(r.appCtx, prm)
rObj, err := clnt.GetObject(ctx, prm)
if err != nil {
r.handleFrostFSErr(err, start)
req.handleFrostFSErr(err, start)
return
}
// we can't close reader in this function, so how to do it?
if r.Request.URI().QueryArgs().GetBool("download") {
if req.Request.URI().QueryArgs().GetBool("download") {
dis = "attachment"
}
payloadSize := rObj.Header.PayloadSize()
r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
var contentType string
for _, attr := range rObj.Header.Attributes() {
key := attr.Key()
@ -132,27 +133,27 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
key = utils.BackwardTransformIfSystem(key)
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
switch key {
case object.AttributeFileName:
filename = val
case object.AttributeTimestamp:
value, err := strconv.ParseInt(val, 10, 64)
if err != nil {
r.log.Info("couldn't parse creation date",
req.log.Info("couldn't parse creation date",
zap.String("key", key),
zap.String("val", val),
zap.Error(err))
continue
}
r.Response.Header.Set(fasthttp.HeaderLastModified,
req.Response.Header.Set(fasthttp.HeaderLastModified,
time.Unix(value, 0).UTC().Format(http.TimeFormat))
case object.AttributeContentType:
contentType = val
}
}
idsToResponse(&r.Response, &rObj.Header)
idsToResponse(&req.Response, &rObj.Header)
if len(contentType) == 0 {
// determine the Content-Type from the payload head
@ -162,8 +163,8 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
return rObj.Payload, nil
})
if err != nil && err != io.EOF {
r.log.Error("could not detect Content-Type from payload", zap.Error(err))
response.Error(r.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
req.log.Error("could not detect Content-Type from payload", zap.Error(err))
response.Error(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
return
}
@ -178,11 +179,11 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress oid.Address) {
// if it implements io.Closer and that's useful for us.
rObj.Payload = readCloser{headReader, rObj.Payload}
}
r.SetContentType(contentType)
req.SetContentType(contentType)
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
r.Response.SetBodyStream(rObj.Payload, int(payloadSize))
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
}
func bearerToken(ctx context.Context) *bearer.Token {
@ -240,26 +241,35 @@ func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Down
func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
return &request{
RequestCtx: ctx,
appCtx: d.appCtx,
log: log,
}
}
// DownloadByAddress handles download requests using simple cid/oid format.
func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
d.byAddress(c, request.receiveFile)
d.byAddress(c, receiveFile)
}
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it.
func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) {
func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
var (
idCnr, _ = c.UserValue("cid").(string)
idObj, _ = c.UserValue("oid").(string)
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
)
cnrID, err := utils.GetContainerID(d.appCtx, idCnr, d.containerResolver)
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
trace.WithAttributes(
attribute.String("cid", idCnr),
attribute.String("oid", idObj),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, c)
span.End()
}()
cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver)
if err != nil {
log.Error("wrong container id", zap.Error(err))
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
@ -277,16 +287,16 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(request, *pool.Poo
addr.SetContainer(*cnrID)
addr.SetObject(*objID)
f(*d.newRequest(c, log), d.pool, addr)
f(ctx, *d.newRequest(c, log), d.pool, addr)
}
// DownloadByAttribute handles attribute-based download requests.
func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
d.byAttribute(c, request.receiveFile)
d.byAttribute(c, receiveFile)
}
// byAttribute is a wrapper similar to byAddress.
func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.Pool, oid.Address)) {
func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
var (
scid, _ = c.UserValue("cid").(string)
key, _ = url.QueryUnescape(c.UserValue("attr_key").(string))
@ -294,7 +304,18 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
)
containerID, err := utils.GetContainerID(d.appCtx, scid, d.containerResolver)
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
trace.WithAttributes(
attribute.String("attr_key", key),
attribute.String("attr_val", val),
attribute.String("cid", scid),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, c)
span.End()
}()
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
if err != nil {
log.Error("wrong container id", zap.Error(err))
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
@ -329,7 +350,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(request, *pool.P
addrObj.SetContainer(*containerID)
addrObj.SetObject(buf[0])
f(*d.newRequest(c, log), d.pool, addrObj)
f(ctx, *d.newRequest(c, log), d.pool, addrObj)
}
func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {

View file

@ -1,6 +1,7 @@
package downloader
import (
"context"
"io"
"net/http"
"strconv"
@ -13,6 +14,8 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
@ -25,15 +28,25 @@ const (
hdrContainerID = "X-Container-Id"
)
func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
ctx, span := utils.StartHTTPServerSpan(ctx, req.RequestCtx, "HEAD Object",
trace.WithAttributes(
attribute.String("cid", objectAddress.Container().EncodeToString()),
attribute.String("oid", objectAddress.Object().EncodeToString()),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx)
span.End()
}()
var start = time.Now()
if err := tokens.StoreBearerToken(r.RequestCtx); err != nil {
r.log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(r.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest)
if err := tokens.StoreBearerToken(req.RequestCtx); err != nil {
req.log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(req.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest)
return
}
btoken := bearerToken(r.RequestCtx)
btoken := bearerToken(req.RequestCtx)
var prm pool.PrmObjectHead
prm.SetAddress(objectAddress)
@ -41,13 +54,13 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
prm.UseBearer(*btoken)
}
obj, err := clnt.HeadObject(r.appCtx, prm)
obj, err := clnt.HeadObject(ctx, prm)
if err != nil {
r.handleFrostFSErr(err, start)
req.handleFrostFSErr(err, start)
return
}
r.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
var contentType string
for _, attr := range obj.Attributes() {
key := attr.Key()
@ -58,24 +71,24 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
key = utils.BackwardTransformIfSystem(key)
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
switch key {
case object.AttributeTimestamp:
value, err := strconv.ParseInt(val, 10, 64)
if err != nil {
r.log.Info("couldn't parse creation date",
req.log.Info("couldn't parse creation date",
zap.String("key", key),
zap.String("val", val),
zap.Error(err))
continue
}
r.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat))
req.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat))
case object.AttributeContentType:
contentType = val
}
}
idsToResponse(&r.Response, &obj)
idsToResponse(&req.Response, &obj)
if len(contentType) == 0 {
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
@ -86,18 +99,18 @@ func (r request) headObject(clnt *pool.Pool, objectAddress oid.Address) {
prmRange.UseBearer(*btoken)
}
resObj, err := clnt.ObjectRange(r.appCtx, prmRange)
resObj, err := clnt.ObjectRange(ctx, prmRange)
if err != nil {
return nil, err
}
return &resObj, nil
})
if err != nil && err != io.EOF {
r.handleFrostFSErr(err, start)
req.handleFrostFSErr(err, start)
return
}
}
r.SetContentType(contentType)
req.SetContentType(contentType)
}
func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
@ -110,10 +123,10 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
// HeadByAddress handles head requests using simple cid/oid format.
func (d *Downloader) HeadByAddress(c *fasthttp.RequestCtx) {
d.byAddress(c, request.headObject)
d.byAddress(c, headObject)
}
// HeadByAttribute handles attribute-based head requests.
func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) {
d.byAttribute(c, request.headObject)
d.byAttribute(c, headObject)
}

4
go.mod
View file

@ -14,6 +14,8 @@ require (
github.com/stretchr/testify v1.8.2
github.com/testcontainers/testcontainers-go v0.13.0
github.com/valyala/fasthttp v1.34.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.24.0
)
@ -86,13 +88,11 @@ require (
github.com/urfave/cli v1.22.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.4.0 // indirect

View file

@ -47,6 +47,11 @@ const (
cfgPprofEnabled = "pprof.enabled"
cfgPprofAddress = "pprof.address"
// Tracing ...
cfgTracingEnabled = "tracing.enabled"
dkirillov marked this conversation as resolved Outdated

We should add this new config parameters to config example and documentation

We should add this new config parameters to [config example](https://git.frostfs.info/dstepanov-yadro/frostfs-http-gw/src/branch/feat/OBJECT-3311/config) and [documentation](https://git.frostfs.info/dstepanov-yadro/frostfs-http-gw/src/branch/feat/OBJECT-3311/docs/gate-configuration.md)

done

done
cfgTracingExporter = "tracing.exporter"
cfgTracingEndpoint = "tracing.endpoint"
// Pool config.
cfgConTimeout = "connect_timeout"
cfgStreamTimeout = "stream_timeout"

82
utils/tracing.go Normal file
View file

@ -0,0 +1,82 @@
package utils
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
type httpCarrier struct {
r *fasthttp.RequestCtx
}
func (c *httpCarrier) Get(key string) string {
bytes := c.r.Request.Header.Peek(key)
if len(bytes) == 0 {
return ""
}
return string(bytes)
}
func (c *httpCarrier) Set(key string, value string) {
c.r.Response.Header.Set(key, value)
}
alexvanin marked this conversation as resolved Outdated

No empty line between methods?

No empty line between methods?

fixed

fixed
func (c *httpCarrier) Keys() []string {
dict := make(map[string]interface{})
c.r.Request.Header.VisitAll(
func(key, value []byte) {
dict[string(key)] = true
},
)
c.r.Response.Header.VisitAll(
func(key, value []byte) {
dict[string(key)] = true
},
)
result := make([]string, 0, len(dict))
for key := range dict {
result = append(result, key)
}
return result
}
func extractHTTPTraceInfo(ctx context.Context, req *fasthttp.RequestCtx) context.Context {
if req == nil {
return ctx
}
carrier := &httpCarrier{r: req}
return tracing.Propagator.Extract(ctx, carrier)
}
// SetHTTPTraceInfo saves trace headers to response.
func SetHTTPTraceInfo(ctx context.Context, span trace.Span, req *fasthttp.RequestCtx) {
if req == nil {
return
}
if err := req.Err(); err != nil {
span.SetStatus(codes.Error, err.Error())
}
span.SetAttributes(
semconv.HTTPStatusCode(req.Response.StatusCode()),
)
carrier := &httpCarrier{r: req}
tracing.Propagator.Inject(ctx, carrier)
}
// StartHTTPServerSpan starts root HTTP server span.
func StartHTTPServerSpan(ctx context.Context, req *fasthttp.RequestCtx, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
ctx = extractHTTPTraceInfo(ctx, req)
opts = append(opts, trace.WithAttributes(
attribute.String("http.client_address", req.RemoteAddr().String()),
attribute.String("http.path", string(req.Path())),
semconv.HTTPMethod(string(req.Method())),
semconv.RPCService("frostfs-http-gw"),
), trace.WithSpanKind(trace.SpanKindServer))
return tracing.StartSpanFromContext(ctx, operationName, opts...)
}