[#44] add tracing support for upload #51
|
@ -302,43 +302,54 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, r
|
|||
|
||||
// byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (d *Downloader) byBucketname(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
|
||||
func (d *Downloader) byBucketname(req *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
|
||||
var (
|
||||
bucketname = c.UserValue("cid").(string)
|
||||
key = c.UserValue("oid").(string)
|
||||
bucketname = req.UserValue("cid").(string)
|
||||
key = req.UserValue("oid").(string)
|
||||
log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key))
|
||||
)
|
||||
|
||||
cnrID, err := utils.GetContainerID(d.appCtx, bucketname, d.containerResolver)
|
||||
ctx, err := tokens.StoreBearerTokenAppCtx(req, d.appCtx)
|
||||
if err != nil {
|
||||
log.Error("wrong container id", zap.Error(err))
|
||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||
log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||
response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, err := tokens.StoreBearerTokenAppCtx(c, d.appCtx)
|
||||
ctx, span := utils.StartHTTPServerSpan(ctx, req, "GET Object by bucket name",
|
||||
trace.WithAttributes(
|
||||
attribute.String("bucketname", bucketname),
|
||||
attribute.String("objectKey", key),
|
||||
))
|
||||
defer func() {
|
||||
utils.SetHTTPTraceInfo(ctx, span, req)
|
||||
span.End()
|
||||
}()
|
||||
|
||||
cnrID, err := utils.GetContainerID(ctx, bucketname, d.containerResolver)
|
||||
if err != nil {
|
||||
log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||
response.Error(c, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
log.Error("wrong container id", zap.Error(err))
|
||||
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
foundOid, err := d.tree.GetLatestVersion(ctx, cnrID, key)
|
||||
if err != nil {
|
||||
log.Error("object wasn't found", zap.Error(err))
|
||||
pogpp marked this conversation as resolved
Outdated
|
||||
response.Error(c, "object wasn't found", fasthttp.StatusNotFound)
|
||||
response.Error(req, "object wasn't found", fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if foundOid.DeleteMarker {
|
||||
log.Error("object was deleted")
|
||||
response.Error(c, "object deleted", fasthttp.StatusNotFound)
|
||||
response.Error(req, "object deleted", fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(*cnrID)
|
||||
addr.SetObject(foundOid.OID)
|
||||
|
||||
f(ctx, *d.newRequest(c, log), d.pool, addr)
|
||||
f(ctx, *d.newRequest(req, log), d.pool, addr)
|
||||
}
|
||||
|
||||
// DownloadByAttribute handles attribute-based download requests.
|
||||
|
@ -373,7 +384,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
|
|||
return
|
||||
}
|
||||
|
||||
res, err := d.search(c, containerID, key, val, object.MatchStringEqual)
|
||||
res, err := d.search(c, ctx, containerID, key, val, object.MatchStringEqual)
|
||||
if err != nil {
|
||||
log.Error("could not search for objects", zap.Error(err))
|
||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
|
@ -404,7 +415,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
|
|||
f(ctx, *d.newRequest(c, log), d.pool, addrObj)
|
||||
}
|
||||
|
||||
func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
||||
func (d *Downloader) search(c *fasthttp.RequestCtx, ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
||||
pogpp marked this conversation as resolved
Outdated
dkirillov
commented
Drop
This also require to add
to Also, having Drop `c *fasthttp.RequestCtx` and store bearer token in `ctx`.
So you can use the following code in `DownloadZipped` function (probably we can rename `StoreBearerTokenAppCtx` to `StoreBearerTokenToCtx`)
```golang
if ctx, err = tokens.StoreBearerTokenAppCtx(c, ctx); err != nil {
// ...
}
// ...
resSearch, err := d.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
// ...
```
This also require to add
```golang
if ctx, err = tokens.StoreBearerTokenAppCtx(c, ctx); err != nil {
// ...
}
```
to `byAttribute` before invoke [search](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/a0e6eba6f7ee730165dacc5968cd7d073cd58642/downloader/download.go#L387)
Also, having `tokens.StoreBearerTokenAppCtx` in `byAttribute` can simplify getting bearer token in [receiveFile](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/a0e6eba6f7ee730165dacc5968cd7d073cd58642/downloader/download.go#L100-L108) and [headObject](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/a0e6eba6f7ee730165dacc5968cd7d073cd58642/downloader/head.go#L43-L49).
We should use just `bearerToken(ctx)` there
dkirillov
commented
Now, we can drop [this](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/1776db289c54088ef4ba6bdec26a7c5f2294e790/downloader/download.go#L100-L104) and [this](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/1776db289c54088ef4ba6bdec26a7c5f2294e790/downloader/head.go#L43-L47)
|
||||
filters := object.NewSearchFilters()
|
||||
filters.AddRootFilter()
|
||||
filters.AddFilter(key, val, op)
|
||||
|
@ -416,14 +427,14 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string
|
|||
prm.UseBearer(*btoken)
|
||||
}
|
||||
|
||||
return d.pool.SearchObjects(d.appCtx, prm)
|
||||
return d.pool.SearchObjects(ctx, prm)
|
||||
}
|
||||
|
||||
func (d *Downloader) getContainer(cnrID cid.ID) (container.Container, error) {
|
||||
func (d *Downloader) getContainer(ctx context.Context, cnrID cid.ID) (container.Container, error) {
|
||||
var prm pool.PrmContainerGet
|
||||
prm.SetContainerID(cnrID)
|
||||
|
||||
return d.pool.GetContainer(d.appCtx, prm)
|
||||
return d.pool.GetContainer(ctx, prm)
|
||||
}
|
||||
|
||||
func (d *Downloader) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
|
||||
|
@ -450,7 +461,17 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string))
|
||||
log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix))
|
||||
|
||||
containerID, err := utils.GetContainerID(d.appCtx, scid, d.containerResolver)
|
||||
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "DOWNLOAD ZIP Object",
|
||||
trace.WithAttributes(
|
||||
attribute.String("prefix", prefix),
|
||||
attribute.String("cid", scid),
|
||||
))
|
||||
defer func() {
|
||||
utils.SetHTTPTraceInfo(ctx, span, c)
|
||||
span.End()
|
||||
}()
|
||||
|
||||
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
||||
if err != nil {
|
||||
log.Error("wrong container id", zap.Error(err))
|
||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||
|
@ -466,7 +487,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
// check if container exists here to be able to return 404 error,
|
||||
// otherwise we get this error only in object iteration step
|
||||
// and client get 200 OK.
|
||||
if _, err = d.getContainer(*containerID); err != nil {
|
||||
if _, err = d.getContainer(ctx, *containerID); err != nil {
|
||||
log.Error("could not check container existence", zap.Error(err))
|
||||
if client.IsErrContainerNotFound(err) {
|
||||
response.Error(c, "Not Found", fasthttp.StatusNotFound)
|
||||
|
@ -476,7 +497,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
return
|
||||
}
|
||||
|
||||
resSearch, err := d.search(c, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||
resSearch, err := d.search(c, ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||
if err != nil {
|
||||
log.Error("could not search for objects", zap.Error(err))
|
||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
|
@ -509,7 +530,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
empty = false
|
||||
|
||||
addr.SetObject(id)
|
||||
if err = d.zipObject(zipWriter, addr, btoken, bufZip); err != nil {
|
||||
if err = d.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil {
|
||||
log.Error("failed to add object to archive", zap.String("oid", id.EncodeToString()), zap.Error(err))
|
||||
}
|
||||
|
||||
|
@ -527,14 +548,14 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
})
|
||||
}
|
||||
|
||||
func (d *Downloader) zipObject(zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
|
||||
func (d *Downloader) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
|
||||
var prm pool.PrmObjectGet
|
||||
prm.SetAddress(addr)
|
||||
if btoken != nil {
|
||||
prm.UseBearer(*btoken)
|
||||
}
|
||||
|
||||
resGet, err := d.pool.GetObject(d.appCtx, prm)
|
||||
resGet, err := d.pool.GetObject(ctx, prm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get FrostFS object: %v", err)
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
|
@ -29,11 +30,10 @@ const (
|
|||
)
|
||||
|
||||
func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
|
||||
ctx, span := utils.StartHTTPServerSpan(ctx, req.RequestCtx, "HEAD Object",
|
||||
trace.WithAttributes(
|
||||
attribute.String("cid", objectAddress.Container().EncodeToString()),
|
||||
attribute.String("oid", objectAddress.Object().EncodeToString()),
|
||||
))
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "HEAD Object", trace.WithAttributes(
|
||||
dkirillov marked this conversation as resolved
dkirillov
commented
Do we really need this new span if we have already started one here and the second will be started in SDK right in Do we really need this new span if we have already started one [here](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/a0e6eba6f7ee730165dacc5968cd7d073cd58642/downloader/download.go#L369) and the second will be started in SDK right in `clnt.HeadObject(ctx, prm)`
alexvanin
commented
In fact, client does not start a new span. Storage node will start a new span by receiving new request, providing I suggest to leave this code until we refactor span initialization as a middleware as we discussed before. In fact, client does not start a new span. Storage node will start a new span by receiving new request, providing `cid` and `oid` as attributes.
I suggest to leave this code until we refactor span initialization as a middleware as we discussed before.
dkirillov
commented
Shouldn't we add the similar span to receiveFile then, to be more consistent ? Shouldn't we add the similar span to [receiveFile](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/1776db289c54088ef4ba6bdec26a7c5f2294e790/downloader/download.go#L93) then, to be more consistent ?
pogpp
commented
Like that? ```
func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
var (
err error
dis = "inline"
start = time.Now()
filename string
)
ctx, span := utils.StartHTTPServerSpan(ctx, req.RequestCtx, "RECEIVE Object",
trace.WithAttributes(
attribute.String("objectAddress", objectAddress.String()),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, c)
span.End()
}()
```
Like that?
dkirillov
commented
Yes, or just exact copy from
Yes, or just exact copy from `headObject`:
```golang
ctx, span := tracing.StartSpanFromContext(ctx,"GET Object",trace.WithAttributes(
attribute.String("cid", objectAddress.Container().EncodeToString()),
attribute.String("oid", objectAddress.Object().EncodeToString()),
))
defer func() {
utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx)
span.End()
}()
```
|
||||
attribute.String("cid", objectAddress.Container().EncodeToString()),
|
||||
attribute.String("oid", objectAddress.Object().EncodeToString()),
|
||||
))
|
||||
defer func() {
|
||||
utils.SetHTTPTraceInfo(ctx, span, req.RequestCtx)
|
||||
span.End()
|
||||
|
|
|
@ -18,6 +18,8 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -64,27 +66,36 @@ func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Uplo
|
|||
}
|
||||
|
||||
// Upload handles multipart upload request.
|
||||
func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||
func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
|
||||
pogpp marked this conversation as resolved
Outdated
dkirillov
commented
Do we really change this? If so, then let's change in Do we really change this? If so, then let's change in `(d *Downloader) byBucketname` method
|
||||
var (
|
||||
file MultipartFile
|
||||
idObj oid.ID
|
||||
addr oid.Address
|
||||
scid, _ = c.UserValue("cid").(string)
|
||||
scid, _ = req.UserValue("cid").(string)
|
||||
log = u.log.With(zap.String("cid", scid))
|
||||
bodyStream = c.RequestBodyStream()
|
||||
bodyStream = req.RequestBodyStream()
|
||||
drainBuf = make([]byte, drainBufSize)
|
||||
)
|
||||
|
||||
if err := tokens.StoreBearerToken(c); err != nil {
|
||||
if err := tokens.StoreBearerToken(req); err != nil {
|
||||
log.Error("could not fetch bearer token", zap.Error(err))
|
||||
response.Error(c, "could not fetch bearer token", fasthttp.StatusBadRequest)
|
||||
response.Error(req, "could not fetch bearer token", fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
idCnr, err := utils.GetContainerID(u.appCtx, scid, u.containerResolver)
|
||||
ctx, span := utils.StartHTTPServerSpan(u.appCtx, req, "UPLOAD Object",
|
||||
trace.WithAttributes(
|
||||
attribute.String("cid", scid),
|
||||
pogpp marked this conversation as resolved
Outdated
dkirillov
commented
It's just
It's just `cid`, so let's write:
```golang
attribute.String("cid", scid),
```
|
||||
))
|
||||
defer func() {
|
||||
utils.SetHTTPTraceInfo(ctx, span, req)
|
||||
span.End()
|
||||
}()
|
||||
|
||||
idCnr, err := utils.GetContainerID(ctx, scid, u.containerResolver)
|
||||
if err != nil {
|
||||
log.Error("wrong container id", zap.Error(err))
|
||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -101,21 +112,21 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
|||
zap.Error(err),
|
||||
)
|
||||
}()
|
||||
boundary := string(c.Request.Header.MultipartFormBoundary())
|
||||
boundary := string(req.Request.Header.MultipartFormBoundary())
|
||||
if file, err = fetchMultipartFile(u.log, bodyStream, boundary); err != nil {
|
||||
log.Error("could not receive multipart/form", zap.Error(err))
|
||||
response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
response.Error(req, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
filtered, err := filterHeaders(u.log, &c.Request.Header)
|
||||
filtered, err := filterHeaders(u.log, &req.Request.Header)
|
||||
if err != nil {
|
||||
log.Error("could not process headers", zap.Error(err))
|
||||
response.Error(c, err.Error(), fasthttp.StatusBadRequest)
|
||||
response.Error(req, err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
|
||||
if rawHeader := req.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
|
||||
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
|
||||
log.Warn("could not parse client time", zap.String("Date header", string(rawHeader)), zap.Error(err))
|
||||
} else {
|
||||
|
@ -123,9 +134,9 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
if err = utils.PrepareExpirationHeader(c, u.pool, filtered, now); err != nil {
|
||||
if err = utils.PrepareExpirationHeader(req, u.pool, filtered, now); err != nil {
|
||||
log.Error("could not prepare expiration header", zap.Error(err))
|
||||
response.Error(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
response.Error(req, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -151,7 +162,7 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
|||
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
||||
attributes = append(attributes, *timestamp)
|
||||
}
|
||||
id, bt := u.fetchOwnerAndBearerToken(c)
|
||||
id, bt := u.fetchOwnerAndBearerToken(req)
|
||||
|
||||
obj := object.New()
|
||||
obj.SetContainerID(*idCnr)
|
||||
|
@ -166,8 +177,8 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
|||
prm.UseBearer(*bt)
|
||||
}
|
||||
|
||||
pogpp marked this conversation as resolved
Outdated
dkirillov
commented
It seemsa we have to place this span creation before It seemsa we have to place this span creation before `line:86 ... utils.GetContainerID(...)` and don't use `u.appCtx` after span be created
|
||||
if idObj, err = u.pool.PutObject(u.appCtx, prm); err != nil {
|
||||
u.handlePutFrostFSErr(c, err)
|
||||
if idObj, err = u.pool.PutObject(ctx, prm); err != nil {
|
||||
u.handlePutFrostFSErr(req, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -175,9 +186,9 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
|||
addr.SetContainer(*idCnr)
|
||||
|
||||
// Try to return the response, otherwise, if something went wrong, throw an error.
|
||||
if err = newPutResponse(addr).encode(c); err != nil {
|
||||
if err = newPutResponse(addr).encode(req); err != nil {
|
||||
log.Error("could not encode response", zap.Error(err))
|
||||
response.Error(c, "could not encode response", fasthttp.StatusBadRequest)
|
||||
response.Error(req, "could not encode response", fasthttp.StatusBadRequest)
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -194,8 +205,8 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
|||
}
|
||||
}
|
||||
// Report status code and content type.
|
||||
c.Response.SetStatusCode(fasthttp.StatusOK)
|
||||
c.Response.Header.SetContentType(jsonHeader)
|
||||
req.Response.SetStatusCode(fasthttp.StatusOK)
|
||||
req.Response.Header.SetContentType(jsonHeader)
|
||||
}
|
||||
|
||||
func (u *Uploader) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error) {
|
||||
|
|
The same as previous comment, we have to place this before any requests to other services, to be able provide trace info to storage node