From cdaab4feabb84a7fc97537fcd1599cca8c4e4c16 Mon Sep 17 00:00:00 2001 From: Pavel Pogodaev Date: Tue, 30 May 2023 17:01:20 +0300 Subject: [PATCH] [#44] add tracing support refactoring Signed-off-by: Pavel Pogodaev --- app.go | 62 +++++++++++++++++++++-------- downloader/download.go | 77 ++++++------------------------------- downloader/head.go | 21 +--------- main.go | 2 +- tokens/bearer-token.go | 12 ------ tokens/bearer-token_test.go | 6 ++- uploader/upload.go | 23 ++--------- utils/tracing.go | 1 + utils/util.go | 11 ++++++ 9 files changed, 78 insertions(+), 137 deletions(-) diff --git a/app.go b/app.go index a989c90..6a52a55 100644 --- a/app.go +++ b/app.go @@ -17,6 +17,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" @@ -37,6 +38,7 @@ import ( type ( app struct { + ctx context.Context log *zap.Logger logLevel zap.AtomicLevel pool *pool.Pool @@ -60,7 +62,7 @@ type ( // App is an interface for the main gateway function. App interface { Wait() - Serve(context.Context) + Serve() } // Option is an application option. @@ -101,6 +103,7 @@ func newApp(ctx context.Context, opt ...Option) App { ) a := &app{ + ctx: ctx, log: zap.L(), cfg: viper.GetViper(), webServer: new(fasthttp.Server), @@ -353,16 +356,16 @@ func (a *app) setHealthStatus() { a.metrics.SetHealth(metrics.HealthStatusReady) } -func (a *app) Serve(ctx context.Context) { - treeClient := a.initTree(ctx) - uploadRoutes := uploader.New(ctx, a.AppParams(), a.settings.Uploader) - downloadRoutes := downloader.New(ctx, a.AppParams(), a.settings.Downloader, treeClient) +func (a *app) Serve() { + treeClient := a.initTree(a.ctx) + uploadRoutes := uploader.New(a.AppParams(), a.settings.Uploader) + downloadRoutes := downloader.New(a.AppParams(), a.settings.Downloader, treeClient) // Configure router. a.configureRouter(uploadRoutes, downloadRoutes) a.startServices() - a.initServers(ctx) + a.initServers(a.ctx) for i := range a.servers { go func(i int) { @@ -379,10 +382,10 @@ func (a *app) Serve(ctx context.Context) { LOOP: for { select { - case <-ctx.Done(): + case <-a.ctx.Done(): break LOOP case <-sigs: - a.configReload(ctx) + a.configReload(a.ctx) } } @@ -477,28 +480,55 @@ func (a *app) configureRouter(uploadRoutes *uploader.Uploader, downloadRoutes *d r.MethodNotAllowed = func(r *fasthttp.RequestCtx) { response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed) } - r.POST("/upload/{cid}", a.logger(uploadRoutes.Upload)) + r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(uploadRoutes.Upload)))) a.log.Info("added path /upload/{cid}") - r.GET("/get/{cid}/{oid:*}", a.logger(downloadRoutes.DownloadByAddressOrBucketName)) - r.HEAD("/get/{cid}/{oid:*}", a.logger(downloadRoutes.HeadByAddressOrBucketName)) + r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadByAddressOrBucketName)))) + r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.HeadByAddressOrBucketName)))) a.log.Info("added path /get/{cid}/{oid}") - r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(downloadRoutes.DownloadByAttribute)) - r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(downloadRoutes.HeadByAttribute)) + r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadByAttribute)))) + r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.HeadByAttribute)))) a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}") - r.GET("/zip/{cid}/{prefix:*}", a.logger(downloadRoutes.DownloadZipped)) + r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadZipped)))) a.log.Info("added path /zip/{cid}/{prefix}") a.webServer.Handler = r.Handler } -func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler { +func (a *app) logger(req fasthttp.RequestHandler) fasthttp.RequestHandler { return func(ctx *fasthttp.RequestCtx) { a.log.Info("request", zap.String("remote", ctx.RemoteAddr().String()), zap.ByteString("method", ctx.Method()), zap.ByteString("path", ctx.Path()), zap.ByteString("query", ctx.QueryArgs().QueryString()), zap.Uint64("id", ctx.ID())) - h(ctx) + req(ctx) + } +} + +func (a *app) tokenizer(req fasthttp.RequestHandler) fasthttp.RequestHandler { + return func(ctx *fasthttp.RequestCtx) { + appCtx, err := tokens.StoreBearerTokenAppCtx(ctx, a.ctx) + if err != nil { + a.log.Error("could not fetch and store bearer token", zap.Error(err)) + response.Error(ctx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) + } + utils.SetContextToRequest(appCtx, ctx) + req(ctx) + } +} + +func (a *app) tracer(req fasthttp.RequestHandler) fasthttp.RequestHandler { + return func(ctx *fasthttp.RequestCtx) { + appCtx := utils.GetContextFromRequest(ctx) + + appCtx, span := utils.StartHTTPServerSpan(appCtx, ctx, "REQUEST") + defer func() { + utils.SetHTTPTraceInfo(appCtx, span, ctx) + span.End() + }() + + utils.SetContextToRequest(appCtx, ctx) + req(ctx) } } diff --git a/downloader/download.go b/downloader/download.go index 8a62dff..2a08c82 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -28,8 +28,6 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "github.com/valyala/fasthttp" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -97,15 +95,10 @@ func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddres start = time.Now() filename string ) - if err = tokens.StoreBearerToken(req.RequestCtx); err != nil { - req.log.Error("could not fetch and store bearer token", zap.Error(err)) - response.Error(req.RequestCtx, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) - return - } var prm pool.PrmObjectGet prm.SetAddress(objectAddress) - if btoken := bearerToken(req.RequestCtx); btoken != nil { + if btoken := bearerToken(ctx); btoken != nil { prm.UseBearer(*btoken) } @@ -208,7 +201,6 @@ func (r *request) handleFrostFSErr(err error, start time.Time) { // Downloader is a download request handler. type Downloader struct { - appCtx context.Context log *zap.Logger pool *pool.Pool containerResolver *resolver.ContainerResolver @@ -230,9 +222,8 @@ func (s *Settings) SetZipCompression(val bool) { } // New creates an instance of Downloader using specified options. -func New(ctx context.Context, params *utils.AppParams, settings *Settings, tree *tree.Tree) *Downloader { +func New(params *utils.AppParams, settings *Settings, tree *tree.Tree) *Downloader { return &Downloader{ - appCtx: ctx, log: params.Logger, pool: params.Pool, settings: settings, @@ -269,15 +260,7 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, r log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj)) ) - ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object", - trace.WithAttributes( - attribute.String("cid", idCnr), - attribute.String("oid", idObj), - )) - defer func() { - utils.SetHTTPTraceInfo(ctx, span, c) - span.End() - }() + ctx := utils.GetContextFromRequest(c) cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver) if err != nil { @@ -309,22 +292,7 @@ func (d *Downloader) byBucketname(req *fasthttp.RequestCtx, f func(context.Conte log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key)) ) - ctx, err := tokens.StoreBearerTokenAppCtx(req, d.appCtx) - if err != nil { - log.Error("could not fetch and store bearer token", zap.Error(err)) - response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) - return - } - - ctx, span := utils.StartHTTPServerSpan(ctx, req, "GET Object by bucket name", - trace.WithAttributes( - attribute.String("bucketname", bucketname), - attribute.String("objectKey", key), - )) - defer func() { - utils.SetHTTPTraceInfo(ctx, span, req) - span.End() - }() + ctx := utils.GetContextFromRequest(req) cnrID, err := utils.GetContainerID(ctx, bucketname, d.containerResolver) if err != nil { @@ -366,16 +334,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val)) ) - ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "GET Object", - trace.WithAttributes( - attribute.String("attr_key", key), - attribute.String("attr_val", val), - attribute.String("cid", scid), - )) - defer func() { - utils.SetHTTPTraceInfo(ctx, span, c) - span.End() - }() + ctx := utils.GetContextFromRequest(c) containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver) if err != nil { @@ -384,7 +343,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, return } - res, err := d.search(c, ctx, containerID, key, val, object.MatchStringEqual) + res, err := d.search(ctx, containerID, key, val, object.MatchStringEqual) if err != nil { log.Error("could not search for objects", zap.Error(err)) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) @@ -415,7 +374,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, f(ctx, *d.newRequest(c, log), d.pool, addrObj) } -func (d *Downloader) search(c *fasthttp.RequestCtx, ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) { +func (d *Downloader) search(ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) { filters := object.NewSearchFilters() filters.AddRootFilter() filters.AddFilter(key, val, op) @@ -423,7 +382,7 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, ctx context.Context, cid *ci var prm pool.PrmObjectSearch prm.SetContainerID(*cid) prm.SetFilters(filters) - if btoken := bearerToken(c); btoken != nil { + if btoken := bearerToken(ctx); btoken != nil { prm.UseBearer(*btoken) } @@ -461,15 +420,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string)) log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix)) - ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "DOWNLOAD ZIP Object", - trace.WithAttributes( - attribute.String("prefix", prefix), - attribute.String("cid", scid), - )) - defer func() { - utils.SetHTTPTraceInfo(ctx, span, c) - span.End() - }() + ctx := utils.GetContextFromRequest(c) containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver) if err != nil { @@ -478,12 +429,6 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { return } - if err = tokens.StoreBearerToken(c); err != nil { - log.Error("could not fetch and store bearer token", zap.Error(err)) - response.Error(c, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) - return - } - // check if container exists here to be able to return 404 error, // otherwise we get this error only in object iteration step // and client get 200 OK. @@ -497,7 +442,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { return } - resSearch, err := d.search(c, ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) + resSearch, err := d.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) if err != nil { log.Error("could not search for objects", zap.Error(err)) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) @@ -518,7 +463,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { empty := true called := false - btoken := bearerToken(c) + btoken := bearerToken(ctx) addr.SetContainer(*containerID) errIter := resSearch.Iterate(func(id oid.ID) bool { diff --git a/downloader/head.go b/downloader/head.go index 4d2f9dd..3e5a92a 100644 --- a/downloader/head.go +++ b/downloader/head.go @@ -7,16 +7,11 @@ import ( "strconv" "time" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "github.com/valyala/fasthttp" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -30,23 +25,9 @@ const ( ) func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) { - ctx, span := tracing.StartSpanFromContext(ctx, "HEAD Object", trace.WithAttributes( - attribute.String("cid", objectAddress.Container().EncodeToString()), - attribute.String("oid", objectAddress.Object().EncodeToString()), - )) - defer func() { - utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx) - span.End() - }() - var start = time.Now() - if err := tokens.StoreBearerToken(req.RequestCtx); err != nil { - req.log.Error("could not fetch and store bearer token", zap.Error(err)) - response.Error(req.RequestCtx, "could not fetch and store bearer token", fasthttp.StatusBadRequest) - return - } - btoken := bearerToken(req.RequestCtx) + btoken := bearerToken(ctx) var prm pool.PrmObjectHead prm.SetAddress(objectAddress) diff --git a/main.go b/main.go index f997955..5762675 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,6 @@ func main() { logger, atomicLevel := newLogger(v) application := newApp(globalContext, WithLogger(logger, atomicLevel), WithConfig(v)) - go application.Serve(globalContext) + go application.Serve() application.Wait() } diff --git a/tokens/bearer-token.go b/tokens/bearer-token.go index f43cb85..fd4404b 100644 --- a/tokens/bearer-token.go +++ b/tokens/bearer-token.go @@ -48,18 +48,6 @@ func BearerTokenFromCookie(h *fasthttp.RequestHeader) []byte { return auth } -// StoreBearerToken extracts a bearer token from the header or cookie and stores -// it in the request context. -func StoreBearerToken(ctx *fasthttp.RequestCtx) error { - tkn, err := fetchBearerToken(ctx) - if err != nil { - return err - } - // This is an analog of context.WithValue. - ctx.SetUserValue(bearerTokenKey, tkn) - return nil -} - // StoreBearerTokenAppCtx extracts a bearer token from the header or cookie and stores // it in the application context. func StoreBearerTokenAppCtx(ctx *fasthttp.RequestCtx, appCtx context.Context) (context.Context, error) { diff --git a/tokens/bearer-token_test.go b/tokens/bearer-token_test.go index d5706cc..170246c 100644 --- a/tokens/bearer-token_test.go +++ b/tokens/bearer-token_test.go @@ -3,6 +3,7 @@ package tokens import ( + "context" "encoding/base64" "testing" @@ -153,10 +154,11 @@ func Test_checkAndPropagateBearerToken(t *testing.T) { ctx := makeTestRequest(t64, "") // Expect to see the token within the context. - require.NoError(t, StoreBearerToken(ctx)) + appCtx, err := StoreBearerTokenAppCtx(ctx, context.Background()) + require.NoError(t, err) // Expect to see the same token without errors. - actual, err := LoadBearerToken(ctx) + actual, err := LoadBearerToken(appCtx) require.NoError(t, err) require.Equal(t, tkn, actual) } diff --git a/uploader/upload.go b/uploader/upload.go index 3569a0e..43996ea 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -18,8 +18,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/valyala/fasthttp" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -31,7 +29,6 @@ const ( // Uploader is an upload request handler. type Uploader struct { - appCtx context.Context log *zap.Logger pool *pool.Pool ownerID *user.ID @@ -54,9 +51,8 @@ func (s *Settings) SetDefaultTimestamp(val bool) { // New creates a new Uploader using specified logger, connection pool and // other options. -func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Uploader { +func New(params *utils.AppParams, settings *Settings) *Uploader { return &Uploader{ - appCtx: ctx, log: params.Logger, pool: params.Pool, ownerID: params.Owner, @@ -77,20 +73,7 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) { drainBuf = make([]byte, drainBufSize) ) - if err := tokens.StoreBearerToken(req); err != nil { - log.Error("could not fetch bearer token", zap.Error(err)) - response.Error(req, "could not fetch bearer token", fasthttp.StatusBadRequest) - return - } - - ctx, span := utils.StartHTTPServerSpan(u.appCtx, req, "UPLOAD Object", - trace.WithAttributes( - attribute.String("cid", scid), - )) - defer func() { - utils.SetHTTPTraceInfo(ctx, span, req) - span.End() - }() + ctx := utils.GetContextFromRequest(req) idCnr, err := utils.GetContainerID(ctx, scid, u.containerResolver) if err != nil { @@ -162,7 +145,7 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) { timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10)) attributes = append(attributes, *timestamp) } - id, bt := u.fetchOwnerAndBearerToken(req) + id, bt := u.fetchOwnerAndBearerToken(ctx) obj := object.New() obj.SetContainerID(*idCnr) diff --git a/utils/tracing.go b/utils/tracing.go index 4bb74df..75a4486 100644 --- a/utils/tracing.go +++ b/utils/tracing.go @@ -77,6 +77,7 @@ func StartHTTPServerSpan(ctx context.Context, req *fasthttp.RequestCtx, operatio attribute.String("http.path", string(req.Path())), semconv.HTTPMethod(string(req.Method())), semconv.RPCService("frostfs-http-gw"), + attribute.String("http.query", req.QueryArgs().String()), ), trace.WithSpanKind(trace.SpanKindServer)) return tracing.StartSpanFromContext(ctx, operationName, opts...) } diff --git a/utils/util.go b/utils/util.go index d5a476a..e54c98d 100644 --- a/utils/util.go +++ b/utils/util.go @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" + "github.com/valyala/fasthttp" ) // GetContainerID decode container id, if it's not a valid container id @@ -43,3 +44,13 @@ func GetEpochDurations(ctx context.Context, p *pool.Pool) (*EpochDurations, erro } return res, nil } + +// SetContextToRequest adds new context to fasthttp request. +func SetContextToRequest(ctx context.Context, c *fasthttp.RequestCtx) { + c.SetUserValue("context", ctx) +} + +// GetContextFromRequest returns main context from fasthttp request context. +func GetContextFromRequest(c *fasthttp.RequestCtx) context.Context { + return c.UserValue("context").(context.Context) +}