forked from TrueCloudLab/frostfs-http-gw
[#44] add tracing support refactoring
Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
This commit is contained in:
parent
8a22991326
commit
cdaab4feab
9 changed files with 78 additions and 137 deletions
|
@ -28,8 +28,6 @@ import (
|
|||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -97,15 +95,10 @@ func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddres
|
|||
start = time.Now()
|
||||
filename string
|
||||
)
|
||||
if err = tokens.StoreBearerToken(req.RequestCtx); err != nil {
|
||||
req.log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||
response.Error(req.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var prm pool.PrmObjectGet
|
||||
prm.SetAddress(objectAddress)
|
||||
if btoken := bearerToken(req.RequestCtx); btoken != nil {
|
||||
if btoken := bearerToken(ctx); btoken != nil {
|
||||
prm.UseBearer(*btoken)
|
||||
}
|
||||
|
||||
|
@ -208,7 +201,6 @@ func (r *request) handleFrostFSErr(err error, start time.Time) {
|
|||
|
||||
// Downloader is a download request handler.
|
||||
type Downloader struct {
|
||||
appCtx context.Context
|
||||
log *zap.Logger
|
||||
pool *pool.Pool
|
||||
containerResolver *resolver.ContainerResolver
|
||||
|
@ -230,9 +222,8 @@ func (s *Settings) SetZipCompression(val bool) {
|
|||
}
|
||||
|
||||
// New creates an instance of Downloader using specified options.
|
||||
func New(ctx context.Context, params *utils.AppParams, settings *Settings, tree *tree.Tree) *Downloader {
|
||||
func New(params *utils.AppParams, settings *Settings, tree *tree.Tree) *Downloader {
|
||||
return &Downloader{
|
||||
appCtx: ctx,
|
||||
log: params.Logger,
|
||||
pool: params.Pool,
|
||||
settings: settings,
|
||||
|
@ -269,15 +260,7 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, r
|
|||
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
||||
)
|
||||
|
||||
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
|
||||
trace.WithAttributes(
|
||||
attribute.String("cid", idCnr),
|
||||
attribute.String("oid", idObj),
|
||||
))
|
||||
defer func() {
|
||||
utils.SetHTTPTraceInfo(ctx, span, c)
|
||||
span.End()
|
||||
}()
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
|
||||
cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver)
|
||||
if err != nil {
|
||||
|
@ -309,22 +292,7 @@ func (d *Downloader) byBucketname(req *fasthttp.RequestCtx, f func(context.Conte
|
|||
log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key))
|
||||
)
|
||||
|
||||
ctx, err := tokens.StoreBearerTokenAppCtx(req, d.appCtx)
|
||||
if err != nil {
|
||||
log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||
response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, span := utils.StartHTTPServerSpan(ctx, req, "GET Object by bucket name",
|
||||
trace.WithAttributes(
|
||||
attribute.String("bucketname", bucketname),
|
||||
attribute.String("objectKey", key),
|
||||
))
|
||||
defer func() {
|
||||
utils.SetHTTPTraceInfo(ctx, span, req)
|
||||
span.End()
|
||||
}()
|
||||
ctx := utils.GetContextFromRequest(req)
|
||||
|
||||
cnrID, err := utils.GetContainerID(ctx, bucketname, d.containerResolver)
|
||||
if err != nil {
|
||||
|
@ -366,16 +334,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
|
|||
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
||||
)
|
||||
|
||||
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object",
|
||||
trace.WithAttributes(
|
||||
attribute.String("attr_key", key),
|
||||
attribute.String("attr_val", val),
|
||||
attribute.String("cid", scid),
|
||||
))
|
||||
defer func() {
|
||||
utils.SetHTTPTraceInfo(ctx, span, c)
|
||||
span.End()
|
||||
}()
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
|
||||
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
||||
if err != nil {
|
||||
|
@ -384,7 +343,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
|
|||
return
|
||||
}
|
||||
|
||||
res, err := d.search(c, ctx, containerID, key, val, object.MatchStringEqual)
|
||||
res, err := d.search(ctx, containerID, key, val, object.MatchStringEqual)
|
||||
if err != nil {
|
||||
log.Error("could not search for objects", zap.Error(err))
|
||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
|
@ -415,7 +374,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
|
|||
f(ctx, *d.newRequest(c, log), d.pool, addrObj)
|
||||
}
|
||||
|
||||
func (d *Downloader) search(c *fasthttp.RequestCtx, ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
||||
func (d *Downloader) search(ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
||||
filters := object.NewSearchFilters()
|
||||
filters.AddRootFilter()
|
||||
filters.AddFilter(key, val, op)
|
||||
|
@ -423,7 +382,7 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, ctx context.Context, cid *ci
|
|||
var prm pool.PrmObjectSearch
|
||||
prm.SetContainerID(*cid)
|
||||
prm.SetFilters(filters)
|
||||
if btoken := bearerToken(c); btoken != nil {
|
||||
if btoken := bearerToken(ctx); btoken != nil {
|
||||
prm.UseBearer(*btoken)
|
||||
}
|
||||
|
||||
|
@ -461,15 +420,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string))
|
||||
log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix))
|
||||
|
||||
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "DOWNLOAD ZIP Object",
|
||||
trace.WithAttributes(
|
||||
attribute.String("prefix", prefix),
|
||||
attribute.String("cid", scid),
|
||||
))
|
||||
defer func() {
|
||||
utils.SetHTTPTraceInfo(ctx, span, c)
|
||||
span.End()
|
||||
}()
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
|
||||
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
||||
if err != nil {
|
||||
|
@ -478,12 +429,6 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
return
|
||||
}
|
||||
|
||||
if err = tokens.StoreBearerToken(c); err != nil {
|
||||
log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||
response.Error(c, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// check if container exists here to be able to return 404 error,
|
||||
// otherwise we get this error only in object iteration step
|
||||
// and client get 200 OK.
|
||||
|
@ -497,7 +442,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
return
|
||||
}
|
||||
|
||||
resSearch, err := d.search(c, ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||
resSearch, err := d.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||
if err != nil {
|
||||
log.Error("could not search for objects", zap.Error(err))
|
||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
|
@ -518,7 +463,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
|
||||
empty := true
|
||||
called := false
|
||||
btoken := bearerToken(c)
|
||||
btoken := bearerToken(ctx)
|
||||
addr.SetContainer(*containerID)
|
||||
|
||||
errIter := resSearch.Iterate(func(id oid.ID) bool {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue