[#44] add tracing support for upload
Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
This commit is contained in:
parent
1776db289c
commit
8a22991326
3 changed files with 81 additions and 49 deletions
|
@ -302,43 +302,54 @@ func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, r
|
||||||
|
|
||||||
// byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
// byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||||
// prepares request and object address to it.
|
// prepares request and object address to it.
|
||||||
func (d *Downloader) byBucketname(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
|
func (d *Downloader) byBucketname(req *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
|
||||||
var (
|
var (
|
||||||
bucketname = c.UserValue("cid").(string)
|
bucketname = req.UserValue("cid").(string)
|
||||||
key = c.UserValue("oid").(string)
|
key = req.UserValue("oid").(string)
|
||||||
log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key))
|
log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key))
|
||||||
)
|
)
|
||||||
|
|
||||||
cnrID, err := utils.GetContainerID(d.appCtx, bucketname, d.containerResolver)
|
ctx, err := tokens.StoreBearerTokenAppCtx(req, d.appCtx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("wrong container id", zap.Error(err))
|
log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, err := tokens.StoreBearerTokenAppCtx(c, d.appCtx)
|
ctx, span := utils.StartHTTPServerSpan(ctx, req, "GET Object by bucket name",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("bucketname", bucketname),
|
||||||
|
attribute.String("objectKey", key),
|
||||||
|
))
|
||||||
|
defer func() {
|
||||||
|
utils.SetHTTPTraceInfo(ctx, span, req)
|
||||||
|
span.End()
|
||||||
|
}()
|
||||||
|
|
||||||
|
cnrID, err := utils.GetContainerID(ctx, bucketname, d.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("could not fetch and store bearer token", zap.Error(err))
|
log.Error("wrong container id", zap.Error(err))
|
||||||
response.Error(c, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
foundOid, err := d.tree.GetLatestVersion(ctx, cnrID, key)
|
foundOid, err := d.tree.GetLatestVersion(ctx, cnrID, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("object wasn't found", zap.Error(err))
|
log.Error("object wasn't found", zap.Error(err))
|
||||||
response.Error(c, "object wasn't found", fasthttp.StatusNotFound)
|
response.Error(req, "object wasn't found", fasthttp.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if foundOid.DeleteMarker {
|
if foundOid.DeleteMarker {
|
||||||
log.Error("object was deleted")
|
log.Error("object was deleted")
|
||||||
response.Error(c, "object deleted", fasthttp.StatusNotFound)
|
response.Error(req, "object deleted", fasthttp.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(*cnrID)
|
addr.SetContainer(*cnrID)
|
||||||
addr.SetObject(foundOid.OID)
|
addr.SetObject(foundOid.OID)
|
||||||
|
|
||||||
f(ctx, *d.newRequest(c, log), d.pool, addr)
|
f(ctx, *d.newRequest(req, log), d.pool, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DownloadByAttribute handles attribute-based download requests.
|
// DownloadByAttribute handles attribute-based download requests.
|
||||||
|
@ -373,7 +384,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := d.search(c, containerID, key, val, object.MatchStringEqual)
|
res, err := d.search(c, ctx, containerID, key, val, object.MatchStringEqual)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("could not search for objects", zap.Error(err))
|
log.Error("could not search for objects", zap.Error(err))
|
||||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
@ -404,7 +415,7 @@ func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context,
|
||||||
f(ctx, *d.newRequest(c, log), d.pool, addrObj)
|
f(ctx, *d.newRequest(c, log), d.pool, addrObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
func (d *Downloader) search(c *fasthttp.RequestCtx, ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
||||||
filters := object.NewSearchFilters()
|
filters := object.NewSearchFilters()
|
||||||
filters.AddRootFilter()
|
filters.AddRootFilter()
|
||||||
filters.AddFilter(key, val, op)
|
filters.AddFilter(key, val, op)
|
||||||
|
@ -416,14 +427,14 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string
|
||||||
prm.UseBearer(*btoken)
|
prm.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
return d.pool.SearchObjects(d.appCtx, prm)
|
return d.pool.SearchObjects(ctx, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) getContainer(cnrID cid.ID) (container.Container, error) {
|
func (d *Downloader) getContainer(ctx context.Context, cnrID cid.ID) (container.Container, error) {
|
||||||
var prm pool.PrmContainerGet
|
var prm pool.PrmContainerGet
|
||||||
prm.SetContainerID(cnrID)
|
prm.SetContainerID(cnrID)
|
||||||
|
|
||||||
return d.pool.GetContainer(d.appCtx, prm)
|
return d.pool.GetContainer(ctx, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
|
func (d *Downloader) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
|
||||||
|
@ -450,7 +461,17 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string))
|
prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string))
|
||||||
log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix))
|
log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix))
|
||||||
|
|
||||||
containerID, err := utils.GetContainerID(d.appCtx, scid, d.containerResolver)
|
ctx, span := utils.StartHTTPServerSpan(d.appCtx, c, "DOWNLOAD ZIP Object",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("prefix", prefix),
|
||||||
|
attribute.String("cid", scid),
|
||||||
|
))
|
||||||
|
defer func() {
|
||||||
|
utils.SetHTTPTraceInfo(ctx, span, c)
|
||||||
|
span.End()
|
||||||
|
}()
|
||||||
|
|
||||||
|
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("wrong container id", zap.Error(err))
|
log.Error("wrong container id", zap.Error(err))
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
|
@ -466,7 +487,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
// check if container exists here to be able to return 404 error,
|
// check if container exists here to be able to return 404 error,
|
||||||
// otherwise we get this error only in object iteration step
|
// otherwise we get this error only in object iteration step
|
||||||
// and client get 200 OK.
|
// and client get 200 OK.
|
||||||
if _, err = d.getContainer(*containerID); err != nil {
|
if _, err = d.getContainer(ctx, *containerID); err != nil {
|
||||||
log.Error("could not check container existence", zap.Error(err))
|
log.Error("could not check container existence", zap.Error(err))
|
||||||
if client.IsErrContainerNotFound(err) {
|
if client.IsErrContainerNotFound(err) {
|
||||||
response.Error(c, "Not Found", fasthttp.StatusNotFound)
|
response.Error(c, "Not Found", fasthttp.StatusNotFound)
|
||||||
|
@ -476,7 +497,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resSearch, err := d.search(c, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
resSearch, err := d.search(c, ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("could not search for objects", zap.Error(err))
|
log.Error("could not search for objects", zap.Error(err))
|
||||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
@ -509,7 +530,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
empty = false
|
empty = false
|
||||||
|
|
||||||
addr.SetObject(id)
|
addr.SetObject(id)
|
||||||
if err = d.zipObject(zipWriter, addr, btoken, bufZip); err != nil {
|
if err = d.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil {
|
||||||
log.Error("failed to add object to archive", zap.String("oid", id.EncodeToString()), zap.Error(err))
|
log.Error("failed to add object to archive", zap.String("oid", id.EncodeToString()), zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -527,14 +548,14 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) zipObject(zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
|
func (d *Downloader) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
|
||||||
var prm pool.PrmObjectGet
|
var prm pool.PrmObjectGet
|
||||||
prm.SetAddress(addr)
|
prm.SetAddress(addr)
|
||||||
if btoken != nil {
|
if btoken != nil {
|
||||||
prm.UseBearer(*btoken)
|
prm.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
resGet, err := d.pool.GetObject(d.appCtx, prm)
|
resGet, err := d.pool.GetObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get FrostFS object: %v", err)
|
return fmt.Errorf("get FrostFS object: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
|
@ -29,8 +30,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
|
func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
|
||||||
ctx, span := utils.StartHTTPServerSpan(ctx, req.RequestCtx, "HEAD Object",
|
ctx, span := tracing.StartSpanFromContext(ctx, "HEAD Object", trace.WithAttributes(
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.String("cid", objectAddress.Container().EncodeToString()),
|
attribute.String("cid", objectAddress.Container().EncodeToString()),
|
||||||
attribute.String("oid", objectAddress.Object().EncodeToString()),
|
attribute.String("oid", objectAddress.Object().EncodeToString()),
|
||||||
))
|
))
|
||||||
|
|
|
@ -18,6 +18,8 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -64,27 +66,36 @@ func New(ctx context.Context, params *utils.AppParams, settings *Settings) *Uplo
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upload handles multipart upload request.
|
// Upload handles multipart upload request.
|
||||||
func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
|
||||||
var (
|
var (
|
||||||
file MultipartFile
|
file MultipartFile
|
||||||
idObj oid.ID
|
idObj oid.ID
|
||||||
addr oid.Address
|
addr oid.Address
|
||||||
scid, _ = c.UserValue("cid").(string)
|
scid, _ = req.UserValue("cid").(string)
|
||||||
log = u.log.With(zap.String("cid", scid))
|
log = u.log.With(zap.String("cid", scid))
|
||||||
bodyStream = c.RequestBodyStream()
|
bodyStream = req.RequestBodyStream()
|
||||||
drainBuf = make([]byte, drainBufSize)
|
drainBuf = make([]byte, drainBufSize)
|
||||||
)
|
)
|
||||||
|
|
||||||
if err := tokens.StoreBearerToken(c); err != nil {
|
if err := tokens.StoreBearerToken(req); err != nil {
|
||||||
log.Error("could not fetch bearer token", zap.Error(err))
|
log.Error("could not fetch bearer token", zap.Error(err))
|
||||||
response.Error(c, "could not fetch bearer token", fasthttp.StatusBadRequest)
|
response.Error(req, "could not fetch bearer token", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
idCnr, err := utils.GetContainerID(u.appCtx, scid, u.containerResolver)
|
ctx, span := utils.StartHTTPServerSpan(u.appCtx, req, "UPLOAD Object",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("cid", scid),
|
||||||
|
))
|
||||||
|
defer func() {
|
||||||
|
utils.SetHTTPTraceInfo(ctx, span, req)
|
||||||
|
span.End()
|
||||||
|
}()
|
||||||
|
|
||||||
|
idCnr, err := utils.GetContainerID(ctx, scid, u.containerResolver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("wrong container id", zap.Error(err))
|
log.Error("wrong container id", zap.Error(err))
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,21 +112,21 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
}()
|
}()
|
||||||
boundary := string(c.Request.Header.MultipartFormBoundary())
|
boundary := string(req.Request.Header.MultipartFormBoundary())
|
||||||
if file, err = fetchMultipartFile(u.log, bodyStream, boundary); err != nil {
|
if file, err = fetchMultipartFile(u.log, bodyStream, boundary); err != nil {
|
||||||
log.Error("could not receive multipart/form", zap.Error(err))
|
log.Error("could not receive multipart/form", zap.Error(err))
|
||||||
response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
filtered, err := filterHeaders(u.log, &c.Request.Header)
|
filtered, err := filterHeaders(u.log, &req.Request.Header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("could not process headers", zap.Error(err))
|
log.Error("could not process headers", zap.Error(err))
|
||||||
response.Error(c, err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req, err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
|
if rawHeader := req.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
|
||||||
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
|
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
|
||||||
log.Warn("could not parse client time", zap.String("Date header", string(rawHeader)), zap.Error(err))
|
log.Warn("could not parse client time", zap.String("Date header", string(rawHeader)), zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
|
@ -123,9 +134,9 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = utils.PrepareExpirationHeader(c, u.pool, filtered, now); err != nil {
|
if err = utils.PrepareExpirationHeader(req, u.pool, filtered, now); err != nil {
|
||||||
log.Error("could not prepare expiration header", zap.Error(err))
|
log.Error("could not prepare expiration header", zap.Error(err))
|
||||||
response.Error(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,7 +162,7 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||||
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
||||||
attributes = append(attributes, *timestamp)
|
attributes = append(attributes, *timestamp)
|
||||||
}
|
}
|
||||||
id, bt := u.fetchOwnerAndBearerToken(c)
|
id, bt := u.fetchOwnerAndBearerToken(req)
|
||||||
|
|
||||||
obj := object.New()
|
obj := object.New()
|
||||||
obj.SetContainerID(*idCnr)
|
obj.SetContainerID(*idCnr)
|
||||||
|
@ -166,8 +177,8 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||||
prm.UseBearer(*bt)
|
prm.UseBearer(*bt)
|
||||||
}
|
}
|
||||||
|
|
||||||
if idObj, err = u.pool.PutObject(u.appCtx, prm); err != nil {
|
if idObj, err = u.pool.PutObject(ctx, prm); err != nil {
|
||||||
u.handlePutFrostFSErr(c, err)
|
u.handlePutFrostFSErr(req, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,9 +186,9 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||||
addr.SetContainer(*idCnr)
|
addr.SetContainer(*idCnr)
|
||||||
|
|
||||||
// Try to return the response, otherwise, if something went wrong, throw an error.
|
// Try to return the response, otherwise, if something went wrong, throw an error.
|
||||||
if err = newPutResponse(addr).encode(c); err != nil {
|
if err = newPutResponse(addr).encode(req); err != nil {
|
||||||
log.Error("could not encode response", zap.Error(err))
|
log.Error("could not encode response", zap.Error(err))
|
||||||
response.Error(c, "could not encode response", fasthttp.StatusBadRequest)
|
response.Error(req, "could not encode response", fasthttp.StatusBadRequest)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -194,8 +205,8 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Report status code and content type.
|
// Report status code and content type.
|
||||||
c.Response.SetStatusCode(fasthttp.StatusOK)
|
req.Response.SetStatusCode(fasthttp.StatusOK)
|
||||||
c.Response.Header.SetContentType(jsonHeader)
|
req.Response.Header.SetContentType(jsonHeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *Uploader) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error) {
|
func (u *Uploader) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error) {
|
||||||
|
|
Loading…
Reference in a new issue