[#44] add tracing support refactoring #53
9 changed files with 78 additions and 137 deletions
62
app.go
62
app.go
|
@ -17,6 +17,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
|
@ -37,6 +38,7 @@ import (
|
||||||
|
|
||||||
type (
|
type (
|
||||||
app struct {
|
app struct {
|
||||||
|
ctx context.Context
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
logLevel zap.AtomicLevel
|
logLevel zap.AtomicLevel
|
||||||
pool *pool.Pool
|
pool *pool.Pool
|
||||||
|
@ -60,7 +62,7 @@ type (
|
||||||
// App is an interface for the main gateway function.
|
// App is an interface for the main gateway function.
|
||||||
App interface {
|
App interface {
|
||||||
Wait()
|
Wait()
|
||||||
Serve(context.Context)
|
Serve()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option is an application option.
|
// Option is an application option.
|
||||||
|
@ -101,6 +103,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
||||||
)
|
)
|
||||||
|
|
||||||
a := &app{
|
a := &app{
|
||||||
|
ctx: ctx,
|
||||||
|
|||||||
log: zap.L(),
|
log: zap.L(),
|
||||||
cfg: viper.GetViper(),
|
cfg: viper.GetViper(),
|
||||||
webServer: new(fasthttp.Server),
|
webServer: new(fasthttp.Server),
|
||||||
|
@ -353,16 +356,16 @@ func (a *app) setHealthStatus() {
|
||||||
a.metrics.SetHealth(metrics.HealthStatusReady)
|
a.metrics.SetHealth(metrics.HealthStatusReady)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) Serve(ctx context.Context) {
|
func (a *app) Serve() {
|
||||||
treeClient := a.initTree(ctx)
|
treeClient := a.initTree(a.ctx)
|
||||||
uploadRoutes := uploader.New(ctx, a.AppParams(), a.settings.Uploader)
|
uploadRoutes := uploader.New(a.AppParams(), a.settings.Uploader)
|
||||||
downloadRoutes := downloader.New(ctx, a.AppParams(), a.settings.Downloader, treeClient)
|
downloadRoutes := downloader.New(a.AppParams(), a.settings.Downloader, treeClient)
|
||||||
|
|
||||||
// Configure router.
|
// Configure router.
|
||||||
a.configureRouter(uploadRoutes, downloadRoutes)
|
a.configureRouter(uploadRoutes, downloadRoutes)
|
||||||
|
|
||||||
a.startServices()
|
a.startServices()
|
||||||
a.initServers(ctx)
|
a.initServers(a.ctx)
|
||||||
|
|
||||||
for i := range a.servers {
|
for i := range a.servers {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
|
@ -379,10 +382,10 @@ func (a *app) Serve(ctx context.Context) {
|
||||||
LOOP:
|
LOOP:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-a.ctx.Done():
|
||||||
break LOOP
|
break LOOP
|
||||||
case <-sigs:
|
case <-sigs:
|
||||||
a.configReload(ctx)
|
a.configReload(a.ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -477,28 +480,55 @@ func (a *app) configureRouter(uploadRoutes *uploader.Uploader, downloadRoutes *d
|
||||||
r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
|
r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
|
||||||
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
||||||
}
|
}
|
||||||
r.POST("/upload/{cid}", a.logger(uploadRoutes.Upload))
|
r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(uploadRoutes.Upload))))
|
||||||
a.log.Info("added path /upload/{cid}")
|
a.log.Info("added path /upload/{cid}")
|
||||||
r.GET("/get/{cid}/{oid:*}", a.logger(downloadRoutes.DownloadByAddressOrBucketName))
|
r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadByAddressOrBucketName))))
|
||||||
r.HEAD("/get/{cid}/{oid:*}", a.logger(downloadRoutes.HeadByAddressOrBucketName))
|
r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.HeadByAddressOrBucketName))))
|
||||||
a.log.Info("added path /get/{cid}/{oid}")
|
a.log.Info("added path /get/{cid}/{oid}")
|
||||||
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(downloadRoutes.DownloadByAttribute))
|
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadByAttribute))))
|
||||||
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(downloadRoutes.HeadByAttribute))
|
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.HeadByAttribute))))
|
||||||
a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}")
|
a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}")
|
||||||
r.GET("/zip/{cid}/{prefix:*}", a.logger(downloadRoutes.DownloadZipped))
|
r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadZipped))))
|
||||||
a.log.Info("added path /zip/{cid}/{prefix}")
|
a.log.Info("added path /zip/{cid}/{prefix}")
|
||||||
|
|
||||||
a.webServer.Handler = r.Handler
|
a.webServer.Handler = r.Handler
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
func (a *app) logger(req fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||||
return func(ctx *fasthttp.RequestCtx) {
|
return func(ctx *fasthttp.RequestCtx) {
|
||||||
a.log.Info("request", zap.String("remote", ctx.RemoteAddr().String()),
|
a.log.Info("request", zap.String("remote", ctx.RemoteAddr().String()),
|
||||||
zap.ByteString("method", ctx.Method()),
|
zap.ByteString("method", ctx.Method()),
|
||||||
zap.ByteString("path", ctx.Path()),
|
zap.ByteString("path", ctx.Path()),
|
||||||
zap.ByteString("query", ctx.QueryArgs().QueryString()),
|
zap.ByteString("query", ctx.QueryArgs().QueryString()),
|
||||||
zap.Uint64("id", ctx.ID()))
|
zap.Uint64("id", ctx.ID()))
|
||||||
h(ctx)
|
req(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) tokenizer(req fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||||
|
return func(ctx *fasthttp.RequestCtx) {
|
||||||
dkirillov
commented
Since we started to rename variables with type Since we started to rename variables with type `*fasthttp.RequestCtx` to `req` let's stick on it.
|
|||||||
|
appCtx, err := tokens.StoreBearerTokenAppCtx(ctx, a.ctx)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||||
|
response.Error(ctx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
}
|
||||||
|
utils.SetContextToRequest(appCtx, ctx)
|
||||||
|
req(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) tracer(req fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||||
|
return func(ctx *fasthttp.RequestCtx) {
|
||||||
dkirillov
commented
Since we started to rename variables with type Since we started to rename variables with type `*fasthttp.RequestCtx` to `req` let's stick on it.
|
|||||||
|
appCtx := utils.GetContextFromRequest(ctx)
|
||||||
|
|
||||||
|
appCtx, span := utils.StartHTTPServerSpan(appCtx, ctx, "REQUEST")
|
||||||
dkirillov
commented
Maybe we should write Maybe we should write `REQUEST to http-gw` instead of `OPERATION With Object`?
|
|||||||
|
defer func() {
|
||||||
|
utils.SetHTTPTraceInfo(appCtx, span, ctx)
|
||||||
|
span.End()
|
||||||
|
}()
|
||||||
|
|
||||||
|
utils.SetContextToRequest(appCtx, ctx)
|
||||||
|
req(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,8 +28,6 @@ import (
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
|
||||||
"go.opentelemetry.io/otel/trace"
|
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -97,15 +95,10 @@ func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddres
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
filename string
|
filename string
|
||||||
)
|
)
|
||||||
if err = tokens.StoreBearerToken(req.RequestCtx); err != nil {
|
|
||||||
req.log.Error("could not fetch and store bearer token", zap.Error(err))
|
|
||||||
response.Error(req.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var prm pool.PrmObjectGet
|
var prm pool.PrmObjectGet
|
||||||
prm.SetAddress(objectAddress)
|
prm.SetAddress(objectAddress)
|
||||||
if btoken := bearerToken(req.RequestCtx); btoken != nil {
|
if btoken := bearerToken(ctx); btoken != nil {
|
||||||
prm.UseBearer(*btoken)
|
prm.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -208,7 +201,6 @@ func (r *request) handleFrostFSErr(err error, start time.Time) {
|
||||||
|
|
||||||
// Downloader is a download request handler.
|
// Downloader is a download request handler.
|
||||||
type Downloader struct {
|
type Downloader struct {
|
||||||
appCtx context.Context
|
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
pool *pool.Pool
|
pool *pool.Pool
|
||||||
containerResolver *resolver.ContainerResolver
|
containerResolver *resolver.ContainerResolver
|
||||||
|
@ -230,9 +222,8 @@ func (s *Settings) SetZipCompression(val bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates an instance of Downloader using specified options.
|
// New creates an instance of Downloader using specified options.
|
||||||
func New(ctx context.Context, params *utils.AppParams, settings *Settings, tree *tree.Tree) *Downloader {
|
func New(params *utils.AppParams, settings *Settings, tree *tree.Tree) *Downloader {
|
||||||
return &Downloader{
|
return &Downloader{
|
||||||
appCtx: ctx,
|
|
||||||
log: params.Logger,
|
log: params.Logger,
|
||||||
pool: params.Pool,
|
pool: params.Pool,
|
||||||
settings: settings,
|
settings: settings,
|
||||||
|
@ -269,15 +260,7 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, r
|
||||||
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
|
ctx := utils.GetContextFromRequest(c)
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.String("cid", idCnr),
|
|
||||||
attribute.String("oid", idObj),
|
|
||||||
))
|
|
||||||
defer func() {
|
|
||||||
utils.SetHTTPTraceInfo(ctx, span, c)
|
|
||||||
span.End()
|
|
||||||
}()
|
|
||||||
|
|
||||||
cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver)
|
cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -309,22 +292,7 @@ func (d *Downloader) byBucketname(req *fasthttp.RequestCtx, f func(context.Conte
|
||||||
log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key))
|
log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key))
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx, err := tokens.StoreBearerTokenAppCtx(req, d.appCtx)
|
ctx := utils.GetContextFromRequest(req)
|
||||||
if err != nil {
|
|
||||||
log.Error("could not fetch and store bearer token", zap.Error(err))
|
|
||||||
response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, span := utils.StartHTTPServerSpan(ctx, req, "GET Object by bucket name",
|
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.String("bucketname", bucketname),
|
|
||||||
attribute.String("objectKey", key),
|
|
||||||
))
|
|
||||||
defer func() {
|
|
||||||
utils.SetHTTPTraceInfo(ctx, span, req)
|
|
||||||
span.End()
|
|
||||||
}()
|
|
||||||
|
|
||||||
cnrID, err := utils.GetContainerID(ctx, bucketname, d.containerResolver)
|
cnrID, err := utils.GetContainerID(ctx, bucketname, d.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -366,16 +334,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
|
||||||
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
|
ctx := utils.GetContextFromRequest(c)
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.String("attr_key", key),
|
|
||||||
attribute.String("attr_val", val),
|
|
||||||
attribute.String("cid", scid),
|
|
||||||
))
|
|
||||||
defer func() {
|
|
||||||
utils.SetHTTPTraceInfo(ctx, span, c)
|
|
||||||
span.End()
|
|
||||||
}()
|
|
||||||
|
|
||||||
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -384,7 +343,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := d.search(c, ctx, containerID, key, val, object.MatchStringEqual)
|
res, err := d.search(ctx, containerID, key, val, object.MatchStringEqual)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("could not search for objects", zap.Error(err))
|
log.Error("could not search for objects", zap.Error(err))
|
||||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
@ -415,7 +374,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
|
||||||
f(ctx, *d.newRequest(c, log), d.pool, addrObj)
|
f(ctx, *d.newRequest(c, log), d.pool, addrObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) search(c *fasthttp.RequestCtx, ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
func (d *Downloader) search(ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
||||||
filters := object.NewSearchFilters()
|
filters := object.NewSearchFilters()
|
||||||
filters.AddRootFilter()
|
filters.AddRootFilter()
|
||||||
filters.AddFilter(key, val, op)
|
filters.AddFilter(key, val, op)
|
||||||
|
@ -423,7 +382,7 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, ctx context.Context, cid *ci
|
||||||
var prm pool.PrmObjectSearch
|
var prm pool.PrmObjectSearch
|
||||||
prm.SetContainerID(*cid)
|
prm.SetContainerID(*cid)
|
||||||
prm.SetFilters(filters)
|
prm.SetFilters(filters)
|
||||||
if btoken := bearerToken(c); btoken != nil {
|
if btoken := bearerToken(ctx); btoken != nil {
|
||||||
prm.UseBearer(*btoken)
|
prm.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -461,15 +420,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string))
|
prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string))
|
||||||
log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix))
|
log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix))
|
||||||
|
|
||||||
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "DOWNLOAD ZIP Object",
|
ctx := utils.GetContextFromRequest(c)
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.String("prefix", prefix),
|
|
||||||
attribute.String("cid", scid),
|
|
||||||
))
|
|
||||||
defer func() {
|
|
||||||
utils.SetHTTPTraceInfo(ctx, span, c)
|
|
||||||
span.End()
|
|
||||||
}()
|
|
||||||
|
|
||||||
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -478,12 +429,6 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = tokens.StoreBearerToken(c); err != nil {
|
|
||||||
log.Error("could not fetch and store bearer token", zap.Error(err))
|
|
||||||
response.Error(c, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if container exists here to be able to return 404 error,
|
// check if container exists here to be able to return 404 error,
|
||||||
// otherwise we get this error only in object iteration step
|
// otherwise we get this error only in object iteration step
|
||||||
// and client get 200 OK.
|
// and client get 200 OK.
|
||||||
|
@ -497,7 +442,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resSearch, err := d.search(c, ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
resSearch, err := d.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("could not search for objects", zap.Error(err))
|
log.Error("could not search for objects", zap.Error(err))
|
||||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
@ -518,7 +463,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
|
|
||||||
empty := true
|
empty := true
|
||||||
called := false
|
called := false
|
||||||
btoken := bearerToken(c)
|
btoken := bearerToken(ctx)
|
||||||
addr.SetContainer(*containerID)
|
addr.SetContainer(*containerID)
|
||||||
|
|
||||||
errIter := resSearch.Iterate(func(id oid.ID) bool {
|
errIter := resSearch.Iterate(func(id oid.ID) bool {
|
||||||
|
|
|
@ -7,16 +7,11 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
|
||||||
"go.opentelemetry.io/otel/trace"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -30,23 +25,9 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
|
func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "HEAD Object", trace.WithAttributes(
|
|
||||||
attribute.String("cid", objectAddress.Container().EncodeToString()),
|
|
||||||
attribute.String("oid", objectAddress.Object().EncodeToString()),
|
|
||||||
))
|
|
||||||
defer func() {
|
|
||||||
utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx)
|
|
||||||
span.End()
|
|
||||||
}()
|
|
||||||
|
|
||||||
var start = time.Now()
|
var start = time.Now()
|
||||||
if err := tokens.StoreBearerToken(req.RequestCtx); err != nil {
|
|
||||||
req.log.Error("could not fetch and store bearer token", zap.Error(err))
|
|
||||||
response.Error(req.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
btoken := bearerToken(req.RequestCtx)
|
btoken := bearerToken(ctx)
|
||||||
|
|
||||||
var prm pool.PrmObjectHead
|
var prm pool.PrmObjectHead
|
||||||
prm.SetAddress(objectAddress)
|
prm.SetAddress(objectAddress)
|
||||||
|
|
2
main.go
2
main.go
|
@ -12,6 +12,6 @@ func main() {
|
||||||
logger, atomicLevel := newLogger(v)
|
logger, atomicLevel := newLogger(v)
|
||||||
|
|
||||||
application := newApp(globalContext, WithLogger(logger, atomicLevel), WithConfig(v))
|
application := newApp(globalContext, WithLogger(logger, atomicLevel), WithConfig(v))
|
||||||
go application.Serve(globalContext)
|
go application.Serve()
|
||||||
application.Wait()
|
application.Wait()
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,18 +48,6 @@ func BearerTokenFromCookie(h *fasthttp.RequestHeader) []byte {
|
||||||
return auth
|
return auth
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreBearerToken extracts a bearer token from the header or cookie and stores
|
|
||||||
// it in the request context.
|
|
||||||
func StoreBearerToken(ctx *fasthttp.RequestCtx) error {
|
|
||||||
tkn, err := fetchBearerToken(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// This is an analog of context.WithValue.
|
|
||||||
ctx.SetUserValue(bearerTokenKey, tkn)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// StoreBearerTokenAppCtx extracts a bearer token from the header or cookie and stores
|
// StoreBearerTokenAppCtx extracts a bearer token from the header or cookie and stores
|
||||||
// it in the application context.
|
// it in the application context.
|
||||||
func StoreBearerTokenAppCtx(ctx *fasthttp.RequestCtx, appCtx context.Context) (context.Context, error) {
|
func StoreBearerTokenAppCtx(ctx *fasthttp.RequestCtx, appCtx context.Context) (context.Context, error) {
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
package tokens
|
package tokens
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -153,10 +154,11 @@ func Test_checkAndPropagateBearerToken(t *testing.T) {
|
||||||
ctx := makeTestRequest(t64, "")
|
ctx := makeTestRequest(t64, "")
|
||||||
|
|
||||||
// Expect to see the token within the context.
|
// Expect to see the token within the context.
|
||||||
require.NoError(t, StoreBearerToken(ctx))
|
appCtx, err := StoreBearerTokenAppCtx(ctx, context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Expect to see the same token without errors.
|
// Expect to see the same token without errors.
|
||||||
actual, err := LoadBearerToken(ctx)
|
actual, err := LoadBearerToken(appCtx)
|
||||||
alexvanin marked this conversation as resolved
Outdated
alexvanin
commented
We can replace
We can replace `StoreBearerToken` call with `StoreBearerTokenAppCtx`.
```go
// Expect to see the token within the context.
appCtx, err := StoreBearerTokenAppCtx(ctx, context.Background())
require.NoError(t, err)
// Expect to see the same token without errors.
actual, err := LoadBearerToken(appCtx)
require.NoError(t, err)
require.Equal(t, tkn, actual)
```
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, tkn, actual)
|
require.Equal(t, tkn, actual)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,8 +18,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
|
||||||
"go.opentelemetry.io/otel/trace"
|
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -31,7 +29,6 @@ const (
|
||||||
|
|
||||||
// Uploader is an upload request handler.
|
// Uploader is an upload request handler.
|
||||||
type Uploader struct {
|
type Uploader struct {
|
||||||
appCtx context.Context
|
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
pool *pool.Pool
|
pool *pool.Pool
|
||||||
ownerID *user.ID
|
ownerID *user.ID
|
||||||
|
@ -54,9 +51,8 @@ func (s *Settings) SetDefaultTimestamp(val bool) {
|
||||||
|
|
||||||
// New creates a new Uploader using specified logger, connection pool and
|
// New creates a new Uploader using specified logger, connection pool and
|
||||||
// other options.
|
// other options.
|
||||||
func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Uploader {
|
func New(params *utils.AppParams, settings *Settings) *Uploader {
|
||||||
return &Uploader{
|
return &Uploader{
|
||||||
appCtx: ctx,
|
|
||||||
log: params.Logger,
|
log: params.Logger,
|
||||||
pool: params.Pool,
|
pool: params.Pool,
|
||||||
ownerID: params.Owner,
|
ownerID: params.Owner,
|
||||||
|
@ -77,20 +73,7 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
|
||||||
drainBuf = make([]byte, drainBufSize)
|
drainBuf = make([]byte, drainBufSize)
|
||||||
)
|
)
|
||||||
|
|
||||||
if err := tokens.StoreBearerToken(req); err != nil {
|
ctx := utils.GetContextFromRequest(req)
|
||||||
log.Error("could not fetch bearer token", zap.Error(err))
|
|
||||||
response.Error(req, "could not fetch bearer token", fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, span := utils.StartHTTPServerSpan(u.appCtx, req, "UPLOAD Object",
|
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.String("cid", scid),
|
|
||||||
))
|
|
||||||
defer func() {
|
|
||||||
utils.SetHTTPTraceInfo(ctx, span, req)
|
|
||||||
span.End()
|
|
||||||
}()
|
|
||||||
|
|
||||||
idCnr, err := utils.GetContainerID(ctx, scid, u.containerResolver)
|
idCnr, err := utils.GetContainerID(ctx, scid, u.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -162,7 +145,7 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
|
||||||
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
||||||
attributes = append(attributes, *timestamp)
|
attributes = append(attributes, *timestamp)
|
||||||
}
|
}
|
||||||
id, bt := u.fetchOwnerAndBearerToken(req)
|
id, bt := u.fetchOwnerAndBearerToken(ctx)
|
||||||
|
|
||||||
obj := object.New()
|
obj := object.New()
|
||||||
obj.SetContainerID(*idCnr)
|
obj.SetContainerID(*idCnr)
|
||||||
|
|
|
@ -77,6 +77,7 @@ func StartHTTPServerSpan(ctx context.Context, req *fasthttp.RequestCtx, operatio
|
||||||
attribute.String("http.path", string(req.Path())),
|
attribute.String("http.path", string(req.Path())),
|
||||||
semconv.HTTPMethod(string(req.Method())),
|
semconv.HTTPMethod(string(req.Method())),
|
||||||
semconv.RPCService("frostfs-http-gw"),
|
semconv.RPCService("frostfs-http-gw"),
|
||||||
|
attribute.String("http.query", req.QueryArgs().String()),
|
||||||
), trace.WithSpanKind(trace.SpanKindServer))
|
), trace.WithSpanKind(trace.SpanKindServer))
|
||||||
return tracing.StartSpanFromContext(ctx, operationName, opts...)
|
return tracing.StartSpanFromContext(ctx, operationName, opts...)
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
"github.com/valyala/fasthttp"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetContainerID decode container id, if it's not a valid container id
|
// GetContainerID decode container id, if it's not a valid container id
|
||||||
|
@ -43,3 +44,13 @@ func GetEpochDurations(ctx context.Context, p *pool.Pool) (*EpochDurations, erro
|
||||||
}
|
}
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetContextToRequest adds new context to fasthttp request.
|
||||||
|
func SetContextToRequest(ctx context.Context, c *fasthttp.RequestCtx) {
|
||||||
alexvanin marked this conversation as resolved
Outdated
alexvanin
commented
Put
Put `context` into constant. Also it would be nice to add comments to a public functions in the package!
```
// AddToContext adds new context to fasthttp request.
```
|
|||||||
|
c.SetUserValue("context", ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetContextFromRequest returns main context from fasthttp request context.
|
||||||
|
func GetContextFromRequest(c *fasthttp.RequestCtx) context.Context {
|
||||||
dkirillov
commented
Let's rename these functions to
Let's rename these functions to
* `GetContext` and `SetContext` or
* `GetFromRequest` and `SetToRequest` or
* `GetContextFromRequest` and `SetContextToRequest`
?
alexvanin
commented
Last one seems good to me. Last one seems good to me.
|
|||||||
|
return c.UserValue("context").(context.Context)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue
If we store appCtx to this, we now can:
appCtx
from here and herea.ctx
insidea.initTracing
,a.initTree
,a.initServers
and other similar places)