[#44] add tracing support refactoring #53

Merged
alexvanin merged 1 commit from pogpp/frostfs-http-gw:feature/tracing_refactoring into master 2023-05-31 12:56:13 +00:00
9 changed files with 78 additions and 137 deletions

62
app.go
View file

@ -17,6 +17,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
@ -37,6 +38,7 @@ import (
type ( type (
app struct { app struct {
ctx context.Context
log *zap.Logger log *zap.Logger
logLevel zap.AtomicLevel logLevel zap.AtomicLevel
pool *pool.Pool pool *pool.Pool
@ -60,7 +62,7 @@ type (
// App is an interface for the main gateway function. // App is an interface for the main gateway function.
App interface { App interface {
Wait() Wait()
Serve(context.Context) Serve()
} }
// Option is an application option. // Option is an application option.
@ -101,6 +103,7 @@ func newApp(ctx context.Context, opt ...Option) App {
) )
a := &app{ a := &app{
ctx: ctx,

If we store appCtx to this, we now can:

  • drop appCtx from here and here
  • don't provide context here (use a.ctx inside a.initTracing, a.initTree, a.initServers and other similar places)
If we store appCtx to this, we now can: * drop `appCtx` from [here](https://git.frostfs.info/pogpp/frostfs-http-gw/src/commit/70cbf5285cce99a35139f722b091ebbacad3646a/uploader/upload.go#L32) and [here](https://git.frostfs.info/pogpp/frostfs-http-gw/src/commit/70cbf5285cce99a35139f722b091ebbacad3646a/downloader/download.go#L204) * don't provide context [here](https://git.frostfs.info/pogpp/frostfs-http-gw/src/commit/70cbf5285cce99a35139f722b091ebbacad3646a/main.go#L15) (use `a.ctx` inside `a.initTracing`, `a.initTree`, `a.initServers` and other similar places)
log: zap.L(), log: zap.L(),
cfg: viper.GetViper(), cfg: viper.GetViper(),
webServer: new(fasthttp.Server), webServer: new(fasthttp.Server),
@ -353,16 +356,16 @@ func (a *app) setHealthStatus() {
a.metrics.SetHealth(metrics.HealthStatusReady) a.metrics.SetHealth(metrics.HealthStatusReady)
} }
func (a *app) Serve(ctx context.Context) { func (a *app) Serve() {
treeClient := a.initTree(ctx) treeClient := a.initTree(a.ctx)
uploadRoutes := uploader.New(ctx, a.AppParams(), a.settings.Uploader) uploadRoutes := uploader.New(a.AppParams(), a.settings.Uploader)
downloadRoutes := downloader.New(ctx, a.AppParams(), a.settings.Downloader, treeClient) downloadRoutes := downloader.New(a.AppParams(), a.settings.Downloader, treeClient)
// Configure router. // Configure router.
a.configureRouter(uploadRoutes, downloadRoutes) a.configureRouter(uploadRoutes, downloadRoutes)
a.startServices() a.startServices()
a.initServers(ctx) a.initServers(a.ctx)
for i := range a.servers { for i := range a.servers {
go func(i int) { go func(i int) {
@ -379,10 +382,10 @@ func (a *app) Serve(ctx context.Context) {
LOOP: LOOP:
for { for {
select { select {
case <-ctx.Done(): case <-a.ctx.Done():
break LOOP break LOOP
case <-sigs: case <-sigs:
a.configReload(ctx) a.configReload(a.ctx)
} }
} }
@ -477,28 +480,55 @@ func (a *app) configureRouter(uploadRoutes *uploader.Uploader, downloadRoutes *d
r.MethodNotAllowed = func(r *fasthttp.RequestCtx) { r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed) response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
} }
r.POST("/upload/{cid}", a.logger(uploadRoutes.Upload)) r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(uploadRoutes.Upload))))
a.log.Info("added path /upload/{cid}") a.log.Info("added path /upload/{cid}")
r.GET("/get/{cid}/{oid:*}", a.logger(downloadRoutes.DownloadByAddressOrBucketName)) r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadByAddressOrBucketName))))
r.HEAD("/get/{cid}/{oid:*}", a.logger(downloadRoutes.HeadByAddressOrBucketName)) r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.HeadByAddressOrBucketName))))
a.log.Info("added path /get/{cid}/{oid}") a.log.Info("added path /get/{cid}/{oid}")
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(downloadRoutes.DownloadByAttribute)) r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadByAttribute))))
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(downloadRoutes.HeadByAttribute)) r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.HeadByAttribute))))
a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}") a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}")
r.GET("/zip/{cid}/{prefix:*}", a.logger(downloadRoutes.DownloadZipped)) r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadZipped))))
a.log.Info("added path /zip/{cid}/{prefix}") a.log.Info("added path /zip/{cid}/{prefix}")
a.webServer.Handler = r.Handler a.webServer.Handler = r.Handler
} }
func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler { func (a *app) logger(req fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(ctx *fasthttp.RequestCtx) { return func(ctx *fasthttp.RequestCtx) {
a.log.Info("request", zap.String("remote", ctx.RemoteAddr().String()), a.log.Info("request", zap.String("remote", ctx.RemoteAddr().String()),
zap.ByteString("method", ctx.Method()), zap.ByteString("method", ctx.Method()),
zap.ByteString("path", ctx.Path()), zap.ByteString("path", ctx.Path()),
zap.ByteString("query", ctx.QueryArgs().QueryString()), zap.ByteString("query", ctx.QueryArgs().QueryString()),
zap.Uint64("id", ctx.ID())) zap.Uint64("id", ctx.ID()))
h(ctx) req(ctx)
}
}
func (a *app) tokenizer(req fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(ctx *fasthttp.RequestCtx) {

Since we started to rename variables with type *fasthttp.RequestCtx to req let's stick on it.

Since we started to rename variables with type `*fasthttp.RequestCtx` to `req` let's stick on it.
appCtx, err := tokens.StoreBearerTokenAppCtx(ctx, a.ctx)
if err != nil {
a.log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(ctx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
}
utils.SetContextToRequest(appCtx, ctx)
req(ctx)
}
}
func (a *app) tracer(req fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(ctx *fasthttp.RequestCtx) {

Since we started to rename variables with type *fasthttp.RequestCtx to req let's stick on it.

Since we started to rename variables with type `*fasthttp.RequestCtx` to `req` let's stick on it.
appCtx := utils.GetContextFromRequest(ctx)
appCtx, span := utils.StartHTTPServerSpan(appCtx, ctx, "REQUEST")

Maybe we should write REQUEST to http-gw instead of OPERATION With Object?

Maybe we should write `REQUEST to http-gw` instead of `OPERATION With Object`?
defer func() {
utils.SetHTTPTraceInfo(appCtx, span, ctx)
span.End()
}()
utils.SetContextToRequest(appCtx, ctx)
req(ctx)
} }
} }

View file

@ -28,8 +28,6 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -97,15 +95,10 @@ func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddres
start = time.Now() start = time.Now()
filename string filename string
) )
if err = tokens.StoreBearerToken(req.RequestCtx); err != nil {
req.log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(req.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
return
}
var prm pool.PrmObjectGet var prm pool.PrmObjectGet
prm.SetAddress(objectAddress) prm.SetAddress(objectAddress)
if btoken := bearerToken(req.RequestCtx); btoken != nil { if btoken := bearerToken(ctx); btoken != nil {
prm.UseBearer(*btoken) prm.UseBearer(*btoken)
} }
@ -208,7 +201,6 @@ func (r *request) handleFrostFSErr(err error, start time.Time) {
// Downloader is a download request handler. // Downloader is a download request handler.
type Downloader struct { type Downloader struct {
appCtx context.Context
log *zap.Logger log *zap.Logger
pool *pool.Pool pool *pool.Pool
containerResolver *resolver.ContainerResolver containerResolver *resolver.ContainerResolver
@ -230,9 +222,8 @@ func (s *Settings) SetZipCompression(val bool) {
} }
// New creates an instance of Downloader using specified options. // New creates an instance of Downloader using specified options.
func New(ctx context.Context, params *utils.AppParams, settings *Settings, tree *tree.Tree) *Downloader { func New(params *utils.AppParams, settings *Settings, tree *tree.Tree) *Downloader {
return &Downloader{ return &Downloader{
appCtx: ctx,
log: params.Logger, log: params.Logger,
pool: params.Pool, pool: params.Pool,
settings: settings, settings: settings,
@ -269,15 +260,7 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, r
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj)) log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
) )
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object", ctx := utils.GetContextFromRequest(c)
trace.WithAttributes(
attribute.String("cid", idCnr),
attribute.String("oid", idObj),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, c)
span.End()
}()
cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver) cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver)
if err != nil { if err != nil {
@ -309,22 +292,7 @@ func (d *Downloader) byBucketname(req *fasthttp.RequestCtx, f func(context.Conte
log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key)) log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key))
) )
ctx, err := tokens.StoreBearerTokenAppCtx(req, d.appCtx) ctx := utils.GetContextFromRequest(req)
if err != nil {
log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
return
}
ctx, span := utils.StartHTTPServerSpan(ctx, req, "GET Object by bucket name",
trace.WithAttributes(
attribute.String("bucketname", bucketname),
attribute.String("objectKey", key),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, req)
span.End()
}()
cnrID, err := utils.GetContainerID(ctx, bucketname, d.containerResolver) cnrID, err := utils.GetContainerID(ctx, bucketname, d.containerResolver)
if err != nil { if err != nil {
@ -366,16 +334,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val)) log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
) )
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object", ctx := utils.GetContextFromRequest(c)
trace.WithAttributes(
attribute.String("attr_key", key),
attribute.String("attr_val", val),
attribute.String("cid", scid),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, c)
span.End()
}()
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver) containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
if err != nil { if err != nil {
@ -384,7 +343,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
return return
} }
res, err := d.search(c, ctx, containerID, key, val, object.MatchStringEqual) res, err := d.search(ctx, containerID, key, val, object.MatchStringEqual)
if err != nil { if err != nil {
log.Error("could not search for objects", zap.Error(err)) log.Error("could not search for objects", zap.Error(err))
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
@ -415,7 +374,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
f(ctx, *d.newRequest(c, log), d.pool, addrObj) f(ctx, *d.newRequest(c, log), d.pool, addrObj)
} }
func (d *Downloader) search(c *fasthttp.RequestCtx, ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) { func (d *Downloader) search(ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
filters := object.NewSearchFilters() filters := object.NewSearchFilters()
filters.AddRootFilter() filters.AddRootFilter()
filters.AddFilter(key, val, op) filters.AddFilter(key, val, op)
@ -423,7 +382,7 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, ctx context.Context, cid *ci
var prm pool.PrmObjectSearch var prm pool.PrmObjectSearch
prm.SetContainerID(*cid) prm.SetContainerID(*cid)
prm.SetFilters(filters) prm.SetFilters(filters)
if btoken := bearerToken(c); btoken != nil { if btoken := bearerToken(ctx); btoken != nil {
prm.UseBearer(*btoken) prm.UseBearer(*btoken)
} }
@ -461,15 +420,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string)) prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string))
log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix)) log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix))
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "DOWNLOAD ZIP Object", ctx := utils.GetContextFromRequest(c)
trace.WithAttributes(
attribute.String("prefix", prefix),
attribute.String("cid", scid),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, c)
span.End()
}()
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver) containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
if err != nil { if err != nil {
@ -478,12 +429,6 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
return return
} }
if err = tokens.StoreBearerToken(c); err != nil {
log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(c, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
return
}
// check if container exists here to be able to return 404 error, // check if container exists here to be able to return 404 error,
// otherwise we get this error only in object iteration step // otherwise we get this error only in object iteration step
// and client get 200 OK. // and client get 200 OK.
@ -497,7 +442,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
return return
} }
resSearch, err := d.search(c, ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) resSearch, err := d.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
if err != nil { if err != nil {
log.Error("could not search for objects", zap.Error(err)) log.Error("could not search for objects", zap.Error(err))
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
@ -518,7 +463,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
empty := true empty := true
called := false called := false
btoken := bearerToken(c) btoken := bearerToken(ctx)
addr.SetContainer(*containerID) addr.SetContainer(*containerID)
errIter := resSearch.Iterate(func(id oid.ID) bool { errIter := resSearch.Iterate(func(id oid.ID) bool {

View file

@ -7,16 +7,11 @@ import (
"strconv" "strconv"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -30,23 +25,9 @@ const (
) )
func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) { func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
ctx, span := tracing.StartSpanFromContext(ctx, "HEAD Object", trace.WithAttributes(
attribute.String("cid", objectAddress.Container().EncodeToString()),
attribute.String("oid", objectAddress.Object().EncodeToString()),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx)
span.End()
}()
var start = time.Now() var start = time.Now()
if err := tokens.StoreBearerToken(req.RequestCtx); err != nil {
req.log.Error("could not fetch and store bearer token", zap.Error(err))
response.Error(req.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest)
return
}
btoken := bearerToken(req.RequestCtx) btoken := bearerToken(ctx)
var prm pool.PrmObjectHead var prm pool.PrmObjectHead
prm.SetAddress(objectAddress) prm.SetAddress(objectAddress)

View file

@ -12,6 +12,6 @@ func main() {
logger, atomicLevel := newLogger(v) logger, atomicLevel := newLogger(v)
application := newApp(globalContext, WithLogger(logger, atomicLevel), WithConfig(v)) application := newApp(globalContext, WithLogger(logger, atomicLevel), WithConfig(v))
go application.Serve(globalContext) go application.Serve()
application.Wait() application.Wait()
} }

View file

@ -48,18 +48,6 @@ func BearerTokenFromCookie(h *fasthttp.RequestHeader) []byte {
return auth return auth
} }
// StoreBearerToken extracts a bearer token from the header or cookie and stores
// it in the request context.
func StoreBearerToken(ctx *fasthttp.RequestCtx) error {
tkn, err := fetchBearerToken(ctx)
if err != nil {
return err
}
// This is an analog of context.WithValue.
ctx.SetUserValue(bearerTokenKey, tkn)
return nil
}
// StoreBearerTokenAppCtx extracts a bearer token from the header or cookie and stores // StoreBearerTokenAppCtx extracts a bearer token from the header or cookie and stores
// it in the application context. // it in the application context.
func StoreBearerTokenAppCtx(ctx *fasthttp.RequestCtx, appCtx context.Context) (context.Context, error) { func StoreBearerTokenAppCtx(ctx *fasthttp.RequestCtx, appCtx context.Context) (context.Context, error) {

View file

@ -3,6 +3,7 @@
package tokens package tokens
import ( import (
"context"
"encoding/base64" "encoding/base64"
"testing" "testing"
@ -153,10 +154,11 @@ func Test_checkAndPropagateBearerToken(t *testing.T) {
ctx := makeTestRequest(t64, "") ctx := makeTestRequest(t64, "")
// Expect to see the token within the context. // Expect to see the token within the context.
require.NoError(t, StoreBearerToken(ctx)) appCtx, err := StoreBearerTokenAppCtx(ctx, context.Background())
require.NoError(t, err)
// Expect to see the same token without errors. // Expect to see the same token without errors.
actual, err := LoadBearerToken(ctx) actual, err := LoadBearerToken(appCtx)
alexvanin marked this conversation as resolved Outdated

We can replace StoreBearerToken call with StoreBearerTokenAppCtx.

	// Expect to see the token within the context.
	appCtx, err := StoreBearerTokenAppCtx(ctx, context.Background())
	require.NoError(t, err)

	// Expect to see the same token without errors.
	actual, err := LoadBearerToken(appCtx)
	require.NoError(t, err)
	require.Equal(t, tkn, actual)
We can replace `StoreBearerToken` call with `StoreBearerTokenAppCtx`. ```go // Expect to see the token within the context. appCtx, err := StoreBearerTokenAppCtx(ctx, context.Background()) require.NoError(t, err) // Expect to see the same token without errors. actual, err := LoadBearerToken(appCtx) require.NoError(t, err) require.Equal(t, tkn, actual) ```
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, tkn, actual) require.Equal(t, tkn, actual)
} }

View file

@ -18,8 +18,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -31,7 +29,6 @@ const (
// Uploader is an upload request handler. // Uploader is an upload request handler.
type Uploader struct { type Uploader struct {
appCtx context.Context
log *zap.Logger log *zap.Logger
pool *pool.Pool pool *pool.Pool
ownerID *user.ID ownerID *user.ID
@ -54,9 +51,8 @@ func (s *Settings) SetDefaultTimestamp(val bool) {
// New creates a new Uploader using specified logger, connection pool and // New creates a new Uploader using specified logger, connection pool and
// other options. // other options.
func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Uploader { func New(params *utils.AppParams, settings *Settings) *Uploader {
return &Uploader{ return &Uploader{
appCtx: ctx,
log: params.Logger, log: params.Logger,
pool: params.Pool, pool: params.Pool,
ownerID: params.Owner, ownerID: params.Owner,
@ -77,20 +73,7 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
drainBuf = make([]byte, drainBufSize) drainBuf = make([]byte, drainBufSize)
) )
if err := tokens.StoreBearerToken(req); err != nil { ctx := utils.GetContextFromRequest(req)
log.Error("could not fetch bearer token", zap.Error(err))
response.Error(req, "could not fetch bearer token", fasthttp.StatusBadRequest)
return
}
ctx, span := utils.StartHTTPServerSpan(u.appCtx, req, "UPLOAD Object",
trace.WithAttributes(
attribute.String("cid", scid),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, req)
span.End()
}()
idCnr, err := utils.GetContainerID(ctx, scid, u.containerResolver) idCnr, err := utils.GetContainerID(ctx, scid, u.containerResolver)
if err != nil { if err != nil {
@ -162,7 +145,7 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10)) timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
attributes = append(attributes, *timestamp) attributes = append(attributes, *timestamp)
} }
id, bt := u.fetchOwnerAndBearerToken(req) id, bt := u.fetchOwnerAndBearerToken(ctx)
obj := object.New() obj := object.New()
obj.SetContainerID(*idCnr) obj.SetContainerID(*idCnr)

View file

@ -77,6 +77,7 @@ func StartHTTPServerSpan(ctx context.Context, req *fasthttp.RequestCtx, operatio
attribute.String("http.path", string(req.Path())), attribute.String("http.path", string(req.Path())),
semconv.HTTPMethod(string(req.Method())), semconv.HTTPMethod(string(req.Method())),
semconv.RPCService("frostfs-http-gw"), semconv.RPCService("frostfs-http-gw"),
attribute.String("http.query", req.QueryArgs().String()),
), trace.WithSpanKind(trace.SpanKindServer)) ), trace.WithSpanKind(trace.SpanKindServer))
return tracing.StartSpanFromContext(ctx, operationName, opts...) return tracing.StartSpanFromContext(ctx, operationName, opts...)
} }

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp"
) )
// GetContainerID decode container id, if it's not a valid container id // GetContainerID decode container id, if it's not a valid container id
@ -43,3 +44,13 @@ func GetEpochDurations(ctx context.Context, p *pool.Pool) (*EpochDurations, erro
} }
return res, nil return res, nil
} }
// SetContextToRequest adds new context to fasthttp request.
func SetContextToRequest(ctx context.Context, c *fasthttp.RequestCtx) {
alexvanin marked this conversation as resolved Outdated

Put context into constant. Also it would be nice to add comments to a public functions in the package!

// AddToContext adds new context to fasthttp request.
Put `context` into constant. Also it would be nice to add comments to a public functions in the package! ``` // AddToContext adds new context to fasthttp request. ```
c.SetUserValue("context", ctx)
}
// GetContextFromRequest returns main context from fasthttp request context.
func GetContextFromRequest(c *fasthttp.RequestCtx) context.Context {

Let's rename these functions to

  • GetContext and SetContext or
  • GetFromRequest and SetToRequest or
  • GetContextFromRequest and SetContextToRequest
    ?
Let's rename these functions to * `GetContext` and `SetContext` or * `GetFromRequest` and `SetToRequest` or * `GetContextFromRequest` and `SetContextToRequest` ?

Last one seems good to me.

Last one seems good to me.
return c.UserValue("context").(context.Context)
}