From 8a229913260fe37cdb67b45197cdd8f96e61d437 Mon Sep 17 00:00:00 2001 From: Pavel Pogodaev Date: Wed, 24 May 2023 12:19:15 +0300 Subject: [PATCH] [#44] add tracing support for upload Signed-off-by: Pavel Pogodaev --- downloader/download.go | 67 +++++++++++++++++++++++++++--------------- downloader/head.go | 10 +++---- uploader/upload.go | 53 ++++++++++++++++++++------------- 3 files changed, 81 insertions(+), 49 deletions(-) diff --git a/downloader/download.go b/downloader/download.go index b7c242b..8a62dff 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -302,43 +302,54 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, r // byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that // prepares request and object address to it. -func (d *Downloader) byBucketname(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) { +func (d *Downloader) byBucketname(req *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) { var ( - bucketname = c.UserValue("cid").(string) - key = c.UserValue("oid").(string) + bucketname = req.UserValue("cid").(string) + key = req.UserValue("oid").(string) log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key)) ) - cnrID, err := utils.GetContainerID(d.appCtx, bucketname, d.containerResolver) + ctx, err := tokens.StoreBearerTokenAppCtx(req, d.appCtx) if err != nil { - log.Error("wrong container id", zap.Error(err)) - response.Error(c, "wrong container id", fasthttp.StatusBadRequest) + log.Error("could not fetch and store bearer token", zap.Error(err)) + response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) return } - ctx, err := tokens.StoreBearerTokenAppCtx(c, d.appCtx) + ctx, span := utils.StartHTTPServerSpan(ctx, req, "GET Object by bucket name", + trace.WithAttributes( + attribute.String("bucketname", bucketname), + attribute.String("objectKey", key), + )) + defer func() { + utils.SetHTTPTraceInfo(ctx, span, req) + span.End() + }() + + cnrID, err := utils.GetContainerID(ctx, bucketname, d.containerResolver) if err != nil { - log.Error("could not fetch and store bearer token", zap.Error(err)) - response.Error(c, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) + log.Error("wrong container id", zap.Error(err)) + response.Error(req, "wrong container id", fasthttp.StatusBadRequest) return } foundOid, err := d.tree.GetLatestVersion(ctx, cnrID, key) if err != nil { log.Error("object wasn't found", zap.Error(err)) - response.Error(c, "object wasn't found", fasthttp.StatusNotFound) + response.Error(req, "object wasn't found", fasthttp.StatusNotFound) return } if foundOid.DeleteMarker { log.Error("object was deleted") - response.Error(c, "object deleted", fasthttp.StatusNotFound) + response.Error(req, "object deleted", fasthttp.StatusNotFound) return } + var addr oid.Address addr.SetContainer(*cnrID) addr.SetObject(foundOid.OID) - f(ctx, *d.newRequest(c, log), d.pool, addr) + f(ctx, *d.newRequest(req, log), d.pool, addr) } // DownloadByAttribute handles attribute-based download requests. @@ -373,7 +384,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, return } - res, err := d.search(c, containerID, key, val, object.MatchStringEqual) + res, err := d.search(c, ctx, containerID, key, val, object.MatchStringEqual) if err != nil { log.Error("could not search for objects", zap.Error(err)) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) @@ -404,7 +415,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, f(ctx, *d.newRequest(c, log), d.pool, addrObj) } -func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) { +func (d *Downloader) search(c *fasthttp.RequestCtx, ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) { filters := object.NewSearchFilters() filters.AddRootFilter() filters.AddFilter(key, val, op) @@ -416,14 +427,14 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string prm.UseBearer(*btoken) } - return d.pool.SearchObjects(d.appCtx, prm) + return d.pool.SearchObjects(ctx, prm) } -func (d *Downloader) getContainer(cnrID cid.ID) (container.Container, error) { +func (d *Downloader) getContainer(ctx context.Context, cnrID cid.ID) (container.Container, error) { var prm pool.PrmContainerGet prm.SetContainerID(cnrID) - return d.pool.GetContainer(d.appCtx, prm) + return d.pool.GetContainer(ctx, prm) } func (d *Downloader) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) { @@ -450,7 +461,17 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string)) log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix)) - containerID, err := utils.GetContainerID(d.appCtx, scid, d.containerResolver) + ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "DOWNLOAD ZIP Object", + trace.WithAttributes( + attribute.String("prefix", prefix), + attribute.String("cid", scid), + )) + defer func() { + utils.SetHTTPTraceInfo(ctx, span, c) + span.End() + }() + + containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver) if err != nil { log.Error("wrong container id", zap.Error(err)) response.Error(c, "wrong container id", fasthttp.StatusBadRequest) @@ -466,7 +487,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { // check if container exists here to be able to return 404 error, // otherwise we get this error only in object iteration step // and client get 200 OK. - if _, err = d.getContainer(*containerID); err != nil { + if _, err = d.getContainer(ctx, *containerID); err != nil { log.Error("could not check container existence", zap.Error(err)) if client.IsErrContainerNotFound(err) { response.Error(c, "Not Found", fasthttp.StatusNotFound) @@ -476,7 +497,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { return } - resSearch, err := d.search(c, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) + resSearch, err := d.search(c, ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) if err != nil { log.Error("could not search for objects", zap.Error(err)) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) @@ -509,7 +530,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { empty = false addr.SetObject(id) - if err = d.zipObject(zipWriter, addr, btoken, bufZip); err != nil { + if err = d.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil { log.Error("failed to add object to archive", zap.String("oid", id.EncodeToString()), zap.Error(err)) } @@ -527,14 +548,14 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { }) } -func (d *Downloader) zipObject(zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error { +func (d *Downloader) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error { var prm pool.PrmObjectGet prm.SetAddress(addr) if btoken != nil { prm.UseBearer(*btoken) } - resGet, err := d.pool.GetObject(d.appCtx, prm) + resGet, err := d.pool.GetObject(ctx, prm) if err != nil { return fmt.Errorf("get FrostFS object: %v", err) } diff --git a/downloader/head.go b/downloader/head.go index a81a275..4d2f9dd 100644 --- a/downloader/head.go +++ b/downloader/head.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" @@ -29,11 +30,10 @@ const ( ) func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) { - ctx, span := utils.StartHTTPServerSpan(ctx, req.RequestCtx, "HEAD Object", - trace.WithAttributes( - attribute.String("cid", objectAddress.Container().EncodeToString()), - attribute.String("oid", objectAddress.Object().EncodeToString()), - )) + ctx, span := tracing.StartSpanFromContext(ctx, "HEAD Object", trace.WithAttributes( + attribute.String("cid", objectAddress.Container().EncodeToString()), + attribute.String("oid", objectAddress.Object().EncodeToString()), + )) defer func() { utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx) span.End() diff --git a/uploader/upload.go b/uploader/upload.go index 4e21f08..3569a0e 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -18,6 +18,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/valyala/fasthttp" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -64,27 +66,36 @@ func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Uplo } // Upload handles multipart upload request. -func (u *Uploader) Upload(c *fasthttp.RequestCtx) { +func (u *Uploader) Upload(req *fasthttp.RequestCtx) { var ( file MultipartFile idObj oid.ID addr oid.Address - scid, _ = c.UserValue("cid").(string) + scid, _ = req.UserValue("cid").(string) log = u.log.With(zap.String("cid", scid)) - bodyStream = c.RequestBodyStream() + bodyStream = req.RequestBodyStream() drainBuf = make([]byte, drainBufSize) ) - if err := tokens.StoreBearerToken(c); err != nil { + if err := tokens.StoreBearerToken(req); err != nil { log.Error("could not fetch bearer token", zap.Error(err)) - response.Error(c, "could not fetch bearer token", fasthttp.StatusBadRequest) + response.Error(req, "could not fetch bearer token", fasthttp.StatusBadRequest) return } - idCnr, err := utils.GetContainerID(u.appCtx, scid, u.containerResolver) + ctx, span := utils.StartHTTPServerSpan(u.appCtx, req, "UPLOAD Object", + trace.WithAttributes( + attribute.String("cid", scid), + )) + defer func() { + utils.SetHTTPTraceInfo(ctx, span, req) + span.End() + }() + + idCnr, err := utils.GetContainerID(ctx, scid, u.containerResolver) if err != nil { log.Error("wrong container id", zap.Error(err)) - response.Error(c, "wrong container id", fasthttp.StatusBadRequest) + response.Error(req, "wrong container id", fasthttp.StatusBadRequest) return } @@ -101,21 +112,21 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { zap.Error(err), ) }() - boundary := string(c.Request.Header.MultipartFormBoundary()) + boundary := string(req.Request.Header.MultipartFormBoundary()) if file, err = fetchMultipartFile(u.log, bodyStream, boundary); err != nil { log.Error("could not receive multipart/form", zap.Error(err)) - response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest) + response.Error(req, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest) return } - filtered, err := filterHeaders(u.log, &c.Request.Header) + filtered, err := filterHeaders(u.log, &req.Request.Header) if err != nil { log.Error("could not process headers", zap.Error(err)) - response.Error(c, err.Error(), fasthttp.StatusBadRequest) + response.Error(req, err.Error(), fasthttp.StatusBadRequest) return } now := time.Now() - if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil { + if rawHeader := req.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil { if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil { log.Warn("could not parse client time", zap.String("Date header", string(rawHeader)), zap.Error(err)) } else { @@ -123,9 +134,9 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { } } - if err = utils.PrepareExpirationHeader(c, u.pool, filtered, now); err != nil { + if err = utils.PrepareExpirationHeader(req, u.pool, filtered, now); err != nil { log.Error("could not prepare expiration header", zap.Error(err)) - response.Error(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest) + response.Error(req, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest) return } @@ -151,7 +162,7 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10)) attributes = append(attributes, *timestamp) } - id, bt := u.fetchOwnerAndBearerToken(c) + id, bt := u.fetchOwnerAndBearerToken(req) obj := object.New() obj.SetContainerID(*idCnr) @@ -166,8 +177,8 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { prm.UseBearer(*bt) } - if idObj, err = u.pool.PutObject(u.appCtx, prm); err != nil { - u.handlePutFrostFSErr(c, err) + if idObj, err = u.pool.PutObject(ctx, prm); err != nil { + u.handlePutFrostFSErr(req, err) return } @@ -175,9 +186,9 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { addr.SetContainer(*idCnr) // Try to return the response, otherwise, if something went wrong, throw an error. - if err = newPutResponse(addr).encode(c); err != nil { + if err = newPutResponse(addr).encode(req); err != nil { log.Error("could not encode response", zap.Error(err)) - response.Error(c, "could not encode response", fasthttp.StatusBadRequest) + response.Error(req, "could not encode response", fasthttp.StatusBadRequest) return } @@ -194,8 +205,8 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { } } // Report status code and content type. - c.Response.SetStatusCode(fasthttp.StatusOK) - c.Response.Header.SetContentType(jsonHeader) + req.Response.SetStatusCode(fasthttp.StatusOK) + req.Response.Header.SetContentType(jsonHeader) } func (u *Uploader) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error) {