[#191] Refactor error handling and logging #221

Open
dkirillov wants to merge 2 commits from dkirillov/frostfs-http-gw:bugfix/191-correct-status-codes into master
17 changed files with 393 additions and 374 deletions

View file

@ -606,7 +606,7 @@ func (a *app) Serve() {
close(a.webDone)
}()
handle := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool)
handle := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool), a.log), workerPool)
// Configure router.
a.configureRouter(handle)
@ -734,7 +734,7 @@ func (a *app) configureRouter(h *handler.Handler) {
r := router.New()
r.RedirectTrailingSlash = true
r.NotFound = func(r *fasthttp.RequestCtx) {
handler.ResponseError(r, "Not found", fasthttp.StatusNotFound)
handler.ResponseError(r, "Route Not found", fasthttp.StatusNotFound)
}
r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
handler.ResponseError(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)

View file

@ -20,9 +20,11 @@ import (
containerv2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -94,6 +96,7 @@ func TestIntegration(t *testing.T) {
t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID) })
t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID) })
t.Run("test namespaces "+version, func(t *testing.T) { checkNamespaces(ctx, t, clientPool, ownerID, CID) })
t.Run("test status codes "+version, func(t *testing.T) { checkStatusCodes(ctx, t, clientPool, ownerID, version) })
cancel()
server.Wait()
@ -260,7 +263,7 @@ func putWithDuplicateKeys(t *testing.T, CID cid.ID) {
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, "key duplication error: "+attr+"\n", string(body))
require.Contains(t, string(body), "key duplication error: "+attr+"\n")
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
}
@ -429,7 +432,80 @@ func checkNamespaces(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
resp, err = http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, resp.StatusCode)
}
func checkStatusCodes(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, version string) {
cli := http.Client{Timeout: 30 * time.Second}
t.Run("container not found by name", func(t *testing.T) {
resp, err := cli.Get(testHost + "/get/unknown/object")
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, resp.StatusCode)
requireBodyContains(t, resp, "container not found")
})
t.Run("container not found by cid", func(t *testing.T) {
cnrIDTest := cidtest.ID()
resp, err := cli.Get(testHost + "/get/" + cnrIDTest.EncodeToString() + "/object")
require.NoError(t, err)
requireBodyContains(t, resp, "container not found")
require.Equal(t, http.StatusNotFound, resp.StatusCode)
})
t.Run("object not found in storage", func(t *testing.T) {
resp, err := cli.Get(testHost + "/get_by_attribute/" + testContainerName + "/FilePath/object2")
require.NoError(t, err)
requireBodyContains(t, resp, "object not found")
require.Equal(t, http.StatusNotFound, resp.StatusCode)
})
t.Run("access denied", func(t *testing.T) {
basicACL := acl.Private
var recs []*eacl.Record
if version == "1.2.7" {
basicACL = acl.PublicRWExtended
rec := eacl.NewRecord()
rec.SetAction(eacl.ActionDeny)
rec.SetOperation(eacl.OperationGet)
recs = append(recs, rec)
}
cnrID, err := createContainerBase(ctx, t, clientPool, ownerID, basicACL, "")
require.NoError(t, err)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
jsonToken, _ := makeBearerTokens(t, key, ownerID, version, recs...)
t.Run("get", func(t *testing.T) {
request, err := http.NewRequest(http.MethodGet, testHost+"/get/"+cnrID.EncodeToString()+"/object", nil)
require.NoError(t, err)
request.Header.Set("Authorization", "Bearer "+jsonToken)
resp, err := cli.Do(request)
require.NoError(t, err)
requireBodyContains(t, resp, "access denied")
require.Equal(t, http.StatusForbidden, resp.StatusCode)
})
t.Run("upload", func(t *testing.T) {
request, _, _ := makePutRequest(t, testHost+"/upload/"+cnrID.EncodeToString())
request.Header.Set("Authorization", "Bearer "+jsonToken)
resp, err := cli.Do(request)
require.NoError(t, err)
requireBodyContains(t, resp, "access denied")
require.Equal(t, http.StatusForbidden, resp.StatusCode)
})
})
}
func requireBodyContains(t *testing.T, resp *http.Response, msg string) {
data, err := io.ReadAll(resp.Body)
require.NoError(t, err)
defer resp.Body.Close()
require.Contains(t, strings.ToLower(string(data)), strings.ToLower(msg))
}
func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container {
@ -478,6 +554,10 @@ func getPool(ctx context.Context, t *testing.T, key *keys.PrivateKey) *pool.Pool
}
func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID) (cid.ID, error) {
return createContainerBase(ctx, t, clientPool, ownerID, acl.PublicRWExtended, testContainerName)
}
func createContainerBase(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, basicACL acl.Basic, cnrName string) (cid.ID, error) {
var policy netmap.PlacementPolicy
err := policy.DecodeString("REP 1")
require.NoError(t, err)
@ -485,24 +565,28 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
var cnr container.Container
cnr.Init()
cnr.SetPlacementPolicy(policy)
cnr.SetBasicACL(acl.PublicRWExtended)
cnr.SetBasicACL(basicACL)
cnr.SetOwner(ownerID)
container.SetCreationTime(&cnr, time.Now())
var domain container.Domain
domain.SetName(testContainerName)
if cnrName != "" {
var domain container.Domain
domain.SetName(cnrName)
cnr.SetAttribute(containerv2.SysAttributeName, domain.Name())
cnr.SetAttribute(containerv2.SysAttributeZone, domain.Zone())
cnr.SetAttribute(containerv2.SysAttributeName, domain.Name())
cnr.SetAttribute(containerv2.SysAttributeZone, domain.Zone())
}
var waitPrm pool.WaitParams
waitPrm.SetTimeout(15 * time.Second)
waitPrm.SetPollInterval(3 * time.Second)
var prm pool.PrmContainerPut
prm.SetContainer(cnr)
prm.SetWaitParams(waitPrm)
prm := pool.PrmContainerPut{
ClientParams: client.PrmContainerPut{
Container: &cnr,
},
WaitParams: &pool.WaitParams{
Timeout: 15 * time.Second,
PollInterval: 3 * time.Second,
},
}
CID, err := clientPool.PutContainer(ctx, prm)
if err != nil {
@ -549,13 +633,18 @@ func registerUser(t *testing.T, ctx context.Context, aioContainer testcontainers
require.NoError(t, err)
}
func makeBearerTokens(t *testing.T, key *keys.PrivateKey, ownerID user.ID, version string) (jsonTokenBase64, binaryTokenBase64 string) {
func makeBearerTokens(t *testing.T, key *keys.PrivateKey, ownerID user.ID, version string, records ...*eacl.Record) (jsonTokenBase64, binaryTokenBase64 string) {
tkn := new(bearer.Token)
tkn.ForUser(ownerID)
tkn.SetExp(10000)
if version == "1.2.7" {
tkn.SetEACLTable(*eacl.NewTable())
table := eacl.NewTable()
for i := range records {
table.AddRecord(records[i])
}
tkn.SetEACLTable(*table)
} else {
tkn.SetImpersonate(true)
}

View file

@ -94,6 +94,8 @@ The `filename` field from the multipart form will be set as `FileName` attribute
|--------|----------------------------------------------|
| 200 | Object created successfully. |
| 400 | Some error occurred during object uploading. |
| 403 | Access denied. |
| 409 | Cannot upload object because the quota has been reached. |
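For illustration only (not part of this change): a minimal Go client exercising this endpoint and the status codes above might look like the sketch below. The gateway address, container ID, form field name, and file name are placeholders; it assumes the `POST /upload/{cid}` route described in this document.

```go
import (
	"bytes"
	"io"
	"mime/multipart"
	"net/http"
)

// uploadExample sends a small multipart body to the gateway and returns the HTTP status.
func uploadExample(gateway, cnrID string) (int, error) {
	var body bytes.Buffer
	w := multipart.NewWriter(&body)
	// The part's filename ("hello.txt") becomes the FileName attribute on the object.
	part, err := w.CreateFormFile("file", "hello.txt")
	if err != nil {
		return 0, err
	}
	if _, err = part.Write([]byte("hello")); err != nil {
		return 0, err
	}
	if err = w.Close(); err != nil {
		return 0, err
	}
	resp, err := http.Post(gateway+"/upload/"+cnrID, w.FormDataContentType(), &body)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	_, _ = io.Copy(io.Discard, resp.Body)
	// Expected: 200 on success, 400/403/409 per the table above.
	return resp.StatusCode, nil
}
```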
## Get object
@ -141,6 +143,7 @@ Get an object (payload and attributes) by an address.
|--------|------------------------------------------------|
| 200 | Object got successfully. |
| 400 | Some error occurred during object downloading. |
| 403 | Access denied. |
| 404 | Container or object not found. |
###### Body
@ -183,6 +186,7 @@ Get an object attributes by an address.
|--------|---------------------------------------------------|
| 200 | Object head successfully. |
| 400 | Some error occurred during object HEAD operation. |
| 403 | Access denied. |
| 404 | Container or object not found. |
## Search object
@ -233,6 +237,7 @@ If more than one object is found, an arbitrary one will be returned.
|--------|------------------------------------------------|
| 200 | Object got successfully. |
| 400 | Some error occurred during object downloading. |
| 403 | Access denied. |
| 404 | Container or object not found. |
#### HEAD
@ -269,6 +274,7 @@ If more than one object is found, an arbitrary one will be used to get attribute
|--------|---------------------------------------|
| 200 | Object head successfully. |
| 400 | Some error occurred during operation. |
| 403 | Access denied. |
| 404 | Container or object not found. |
## Download archive
@ -304,16 +310,17 @@ Archive can be compressed (see http-gw [configuration](gate-configuration.md#arc
###### Headers
| Header | Description |
|-----------------------|-------------------------------------------------------------------------------------------------------------------|
| `Content-Disposition` | Indicates how browsers should treat the file (`attachment`). Sets `filename` to `archive.zip`. |
| `Content-Type` | Indicates the content type of the object. Set to `application/zip`. |
| Header | Description |
|-----------------------|---------------------------------------------------------------------------------------------|
| `Content-Disposition` | Indicates how browsers should treat the file (`attachment`). Sets `filename` to `archive.zip`. |
| `Content-Type` | Indicates the content type of the object. Set to `application/zip`. |
###### Status codes
| Status | Description |
|--------|-----------------------------------------------------|
| 200 | Object got successfully. |
| 400 | Some error occurred during object downloading. |
| 404 | Container or objects not found. |
| 500 | Some inner error (e.g. error on streaming objects). |
| Status | Description |
|--------|------------------------------------------------|
| 200 | Object got successfully. |
| 400 | Some error occurred during object downloading. |
| 403 | Access denied. |
| 404 | Container or objects not found. |
| 409 | Cannot upload object because the quota has been reached. |
Review

We can't seem to get a status of 409 when downloading the archive.
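For context, in the new `formErrorResponse` only `ErrQuotaLimitReached` maps to 409, so a download path would need to surface that error for the row to apply. A hedged, hypothetical table-driven check of that mapping (this test is not part of the PR; it would sit in the `handler` package next to `utils.go`):

```go
import (
	"errors"
	"fmt"
	"testing"

	"github.com/stretchr/testify/require"
	"github.com/valyala/fasthttp"
)

// TestFormErrorResponseCodes is a sketch verifying the sentinel-error to status-code mapping.
func TestFormErrorResponseCodes(t *testing.T) {
	for _, tc := range []struct {
		err  error
		code int
	}{
		{fmt.Errorf("upload: %w", ErrQuotaLimitReached), fasthttp.StatusConflict},
		{fmt.Errorf("resolve container: %w", ErrContainerNotFound), fasthttp.StatusNotFound},
		{fmt.Errorf("find object: %w", ErrObjectNotFound), fasthttp.StatusNotFound},
		{ErrGatewayTimeout, fasthttp.StatusGatewayTimeout},
		{errors.New("anything else"), fasthttp.StatusBadRequest},
	} {
		_, code := formErrorResponse(tc.err)
		require.Equal(t, tc.code, code)
	}
}
```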

View file

@ -273,7 +273,7 @@ func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs Re
})
if err != nil {
wg.Done()
log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err), logs.TagField(logs.TagDatapath))
log.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err), logs.TagField(logs.TagDatapath))
}
select {
case <-ctx.Done():
@ -328,20 +328,18 @@ type browseParams struct {
listObjects func(ctx context.Context, bucketName *data.BucketInfo, prefix string) (*GetObjectsResponse, error)
}
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
func (h *Handler) browseObjects(ctx context.Context, req *fasthttp.RequestCtx, p browseParams) {
const S3Protocol = "s3"
const FrostfsProtocol = "frostfs"
ctx := utils.GetContextFromRequest(c)
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
log := reqLog.With(
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
zap.String("bucket", p.bucketInfo.Name),
zap.String("container", p.bucketInfo.CID.EncodeToString()),
zap.String("prefix", p.prefix),
)
))
resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix)
if err != nil {
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToListObjects, err)
return
}
@ -360,7 +358,7 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
"parentDir": parentDir,
}).Parse(h.config.IndexPageTemplate())
if err != nil {
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToParseTemplate, err)
return
}
bucketName := p.bucketInfo.Name
@ -369,14 +367,14 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
bucketName = p.bucketInfo.CID.EncodeToString()
protocol = FrostfsProtocol
}
if err = tmpl.Execute(c, &BrowsePageData{
if err = tmpl.Execute(req, &BrowsePageData{
Container: bucketName,
Prefix: p.prefix,
Objects: objects,
Protocol: protocol,
HasErrors: resp.hasErrors,
}); err != nil {
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToExecuteTemplate, err)
return
}
}

View file

@ -25,43 +25,38 @@ import (
)
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadByAddressOrBucketName")
func (h *Handler) DownloadByAddressOrBucketName(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadByAddressOrBucketName")
defer span.End()
utils.SetContextToRequest(ctx, c)
cidParam := c.UserValue("cid").(string)
oidParam := c.UserValue("oid").(string)
downloadParam := c.QueryArgs().GetBool("download")
cidParam := req.UserValue("cid").(string)
oidParam := req.UserValue("oid").(string)
downloadParam := req.QueryArgs().GetBool("download")
log := utils.GetReqLogOrDefault(ctx, h.log).With(
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
zap.String("cid", cidParam),
zap.String("oid", oidParam),
)
))
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
bktInfo, err := h.getBucketInfo(ctx, cidParam)
if err != nil {
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return
}
checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
log.Error(logs.FailedToCheckIfSettingsNodeExist, zap.String("cid", bktInfo.CID.String()),
zap.Error(checkS3Err), logs.TagField(logs.TagExternalStorageTree))
logAndSendBucketError(c, log, checkS3Err)
h.logAndSendError(ctx, req, logs.FailedToCheckIfSettingsNodeExist, checkS3Err)
return
}
req := newRequest(c, log)
var objID oid.ID
if checkS3Err == nil && shouldDownload(oidParam, downloadParam) {
h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.receiveFile)
} else if err = objID.DecodeString(oidParam); err == nil {
h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.receiveFile)
} else {
h.browseIndex(c, checkS3Err != nil)
h.browseIndex(ctx, req, cidParam, oidParam, checkS3Err != nil)
}
}
@ -70,12 +65,11 @@ func shouldDownload(oidParam string, downloadParam bool) bool {
}
// DownloadByAttribute handles attribute-based download requests.
func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadByAttribute")
func (h *Handler) DownloadByAttribute(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadByAttribute")
defer span.End()
utils.SetContextToRequest(ctx, c)
h.byAttribute(c, h.receiveFile)
h.byAttribute(ctx, req, h.receiveFile)
}
func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
@ -95,31 +89,33 @@ func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op
}
// DownloadZip handles zip by prefix requests.
func (h *Handler) DownloadZip(c *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadZip")
func (h *Handler) DownloadZip(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadZip")
defer span.End()
utils.SetContextToRequest(ctx, c)
scid, _ := c.UserValue("cid").(string)
scid, _ := req.UserValue("cid").(string)
prefix, _ := req.UserValue("prefix").(string)
log := utils.GetReqLogOrDefault(ctx, h.log)
bktInfo, err := h.getBucketInfo(ctx, scid, log)
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", scid), zap.String("prefix", prefix)))
bktInfo, err := h.getBucketInfo(ctx, scid)
if err != nil {
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return
}
resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
resSearch, err := h.searchObjectsByPrefix(ctx, bktInfo.CID, prefix)
if err != nil {
return
}
c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
req.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
req.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
c.SetBodyStreamWriter(h.getZipResponseWriter(ctx, log, resSearch, bktInfo))
req.SetBodyStreamWriter(h.getZipResponseWriter(ctx, resSearch, bktInfo))
}
func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
func (h *Handler) getZipResponseWriter(ctx context.Context, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
return func(w *bufio.Writer) {
defer resSearch.Close()
@ -127,20 +123,20 @@ func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, res
zipWriter := zip.NewWriter(w)
var objectsWritten int
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, bktInfo.CID, buf,
func(obj *object.Object) (io.Writer, error) {
objectsWritten++
return h.createZipFile(zipWriter, obj)
}),
)
if errIter != nil {
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
h.reqLogger(ctx).Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
return
} else if objectsWritten == 0 {
log.Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
h.reqLogger(ctx).Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
}
if err := zipWriter.Close(); err != nil {
log.Error(logs.CloseZipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
h.reqLogger(ctx).Error(logs.CloseZipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
}
}
}
@ -164,31 +160,33 @@ func (h *Handler) createZipFile(zw *zip.Writer, obj *object.Object) (io.Writer,
}
// DownloadTar forms tar.gz from objects by prefix.
func (h *Handler) DownloadTar(c *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadTar")
func (h *Handler) DownloadTar(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadTar")
defer span.End()
utils.SetContextToRequest(ctx, c)
scid, _ := c.UserValue("cid").(string)
scid, _ := req.UserValue("cid").(string)
prefix, _ := req.UserValue("prefix").(string)
log := utils.GetReqLogOrDefault(ctx, h.log)
bktInfo, err := h.getBucketInfo(ctx, scid, log)
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", scid), zap.String("prefix", prefix)))
bktInfo, err := h.getBucketInfo(ctx, scid)
if err != nil {
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return
}
resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
resSearch, err := h.searchObjectsByPrefix(ctx, bktInfo.CID, prefix)
if err != nil {
return
}
c.Response.Header.Set(fasthttp.HeaderContentType, "application/gzip")
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.tar.gz\"")
req.Response.Header.Set(fasthttp.HeaderContentType, "application/gzip")
req.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.tar.gz\"")
c.SetBodyStreamWriter(h.getTarResponseWriter(ctx, log, resSearch, bktInfo))
req.SetBodyStreamWriter(h.getTarResponseWriter(ctx, resSearch, bktInfo))
}
func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
func (h *Handler) getTarResponseWriter(ctx context.Context, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
return func(w *bufio.Writer) {
defer resSearch.Close()
@ -203,26 +201,26 @@ func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, res
defer func() {
if err := tarWriter.Close(); err != nil {
log.Error(logs.CloseTarWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
h.reqLogger(ctx).Error(logs.CloseTarWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
}
if err := gzipWriter.Close(); err != nil {
log.Error(logs.CloseGzipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
h.reqLogger(ctx).Error(logs.CloseGzipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
}
}()
var objectsWritten int
buf := make([]byte, 3<<20) // the same as for upload
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, bktInfo.CID, buf,
func(obj *object.Object) (io.Writer, error) {
objectsWritten++
return h.createTarFile(tarWriter, obj)
}),
)
if errIter != nil {
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
h.reqLogger(ctx).Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
} else if objectsWritten == 0 {
log.Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
h.reqLogger(ctx).Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
}
}
}
@ -240,9 +238,9 @@ func (h *Handler) createTarFile(tw *tar.Writer, obj *object.Object) (io.Writer,
})
}
func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID cid.ID, buf []byte, createArchiveHeader func(obj *object.Object) (io.Writer, error)) func(id oid.ID) bool {
func (h *Handler) putObjectToArchive(ctx context.Context, cnrID cid.ID, buf []byte, createArchiveHeader func(obj *object.Object) (io.Writer, error)) func(id oid.ID) bool {
return func(id oid.ID) bool {
log = log.With(zap.String("oid", id.EncodeToString()))
logger := h.reqLogger(ctx).With(zap.String("oid", id.EncodeToString()))
prm := PrmObjectGet{
PrmAuth: PrmAuth{
@ -253,18 +251,18 @@ func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID
resGet, err := h.frostfs.GetObject(ctx, prm)
if err != nil {
log.Error(logs.FailedToGetObject, zap.Error(err), logs.TagField(logs.TagExternalStorage))
logger.Error(logs.FailedToGetObject, zap.Error(err), logs.TagField(logs.TagExternalStorage))
return false
}
fileWriter, err := createArchiveHeader(&resGet.Header)
if err != nil {
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
logger.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
return false
}
if err = writeToArchive(resGet, fileWriter, buf); err != nil {
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
logger.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
return false
}
@ -272,28 +270,17 @@ func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID
}
}
func (h *Handler) searchObjectsByPrefix(c *fasthttp.RequestCtx, log *zap.Logger, cnrID cid.ID) (ResObjectSearch, error) {
scid, _ := c.UserValue("cid").(string)
prefix, _ := c.UserValue("prefix").(string)
ctx := utils.GetContextFromRequest(c)
func (h *Handler) searchObjectsByPrefix(ctx context.Context, cnrID cid.ID, prefix string) (ResObjectSearch, error) {
prefix, err := url.QueryUnescape(prefix)
if err != nil {
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix),
zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
return nil, err
return nil, fmt.Errorf("unescape prefix: %w", err)
}
log = log.With(zap.String("cid", scid), zap.String("prefix", prefix))
resSearch, err := h.search(ctx, cnrID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage))
ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
return nil, err
return nil, fmt.Errorf("search objects by prefix: %w", err)
}
return resSearch, nil
}

View file

@ -16,7 +16,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -142,6 +141,10 @@ var (
ErrGatewayTimeout = errors.New("gateway timeout")
// ErrQuotaLimitReached is returned from FrostFS in case of quota exceeded.
ErrQuotaLimitReached = errors.New("quota limit reached")
// ErrContainerNotFound is returned from FrostFS in case the container was not found.
ErrContainerNotFound = errors.New("container not found")
// ErrObjectNotFound is returned from FrostFS in case the object was not found.
ErrObjectNotFound = errors.New("object not found")
)
// FrostFS represents virtual connection to FrostFS network.
@ -195,7 +198,7 @@ func New(params *AppParams, config Config, tree layer.TreeService, workerPool *a
// byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it.
func (h *Handler) byNativeAddress(ctx context.Context, req request, cnrID cid.ID, objID oid.ID, handler func(context.Context, request, oid.Address)) {
func (h *Handler) byNativeAddress(ctx context.Context, req *fasthttp.RequestCtx, cnrID cid.ID, objID oid.ID, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
ctx, span := tracing.StartSpanFromContext(ctx, "handler.byNativeAddress")
defer span.End()
@ -205,72 +208,59 @@ func (h *Handler) byNativeAddress(ctx context.Context, req request, cnrID cid.ID
// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// resolves object address from S3-like path <bucket name>/<object key>.
func (h *Handler) byS3Path(ctx context.Context, req request, cnrID cid.ID, path string, handler func(context.Context, request, oid.Address)) {
func (h *Handler) byS3Path(ctx context.Context, req *fasthttp.RequestCtx, cnrID cid.ID, path string, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
ctx, span := tracing.StartSpanFromContext(ctx, "handler.byS3Path")
defer span.End()
c, log := req.RequestCtx, req.log
foundOID, err := h.tree.GetLatestVersion(ctx, &cnrID, path)
if err != nil {
log.Error(logs.FailedToGetLatestVersionOfObject, zap.Error(err), zap.String("cid", cnrID.String()),
zap.String("path", path), logs.TagField(logs.TagExternalStorageTree))
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToGetLatestVersionOfObject, err, zap.String("path", path))
return
}
if foundOID.IsDeleteMarker {
log.Error(logs.ObjectWasDeleted, logs.TagField(logs.TagExternalStorageTree))
ResponseError(c, "object deleted", fasthttp.StatusNotFound)
h.logAndSendError(ctx, req, logs.ObjectWasDeleted, ErrObjectNotFound)
return
}
addr := newAddress(cnrID, foundOID.OID)
handler(ctx, newRequest(c, log), addr)
handler(ctx, req, addr)
}
// byAttribute is a wrapper similar to byNativeAddress.
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Context, request, oid.Address)) {
cidParam, _ := c.UserValue("cid").(string)
key, _ := c.UserValue("attr_key").(string)
val, _ := c.UserValue("attr_val").(string)
ctx := utils.GetContextFromRequest(c)
log := utils.GetReqLogOrDefault(ctx, h.log)
func (h *Handler) byAttribute(ctx context.Context, req *fasthttp.RequestCtx, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
cidParam, _ := req.UserValue("cid").(string)
key, _ := req.UserValue("attr_key").(string)
val, _ := req.UserValue("attr_val").(string)
key, err := url.QueryUnescape(key)
if err != nil {
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_key", key),
zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
h.logAndSendError(ctx, req, logs.FailedToUnescapeQuery, err, zap.String("cid", cidParam), zap.String("attr_key", key))
return
}
val, err = url.QueryUnescape(val)
if err != nil {
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_val", val),
zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
h.logAndSendError(ctx, req, logs.FailedToUnescapeQuery, err, zap.String("cid", cidParam), zap.String("attr_val", key))
return
}
val = prepareAtribute(key, val)
log = log.With(zap.String("cid", cidParam), zap.String("attr_key", key), zap.String("attr_val", val))
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", cidParam),
zap.String("attr_key", key), zap.String("attr_val", val)))
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
bktInfo, err := h.getBucketInfo(ctx, cidParam)
if err != nil {
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return
}
objID, err := h.findObjectByAttribute(ctx, log, bktInfo.CID, key, val)
objID, err := h.findObjectByAttribute(ctx, bktInfo.CID, key, val)
if err != nil {
if errors.Is(err, io.EOF) {
ResponseError(c, err.Error(), fasthttp.StatusNotFound)
return
err = fmt.Errorf("%w: %s", ErrObjectNotFound, err.Error())
}
ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
h.logAndSendError(ctx, req, logs.FailedToFindObjectByAttribute, err)
return
}
@ -278,14 +268,13 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte
addr.SetContainer(bktInfo.CID)
addr.SetObject(objID)
handler(ctx, newRequest(c, log), addr)
handler(ctx, req, addr)
}
func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
func (h *Handler) findObjectByAttribute(ctx context.Context, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual)
if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage))
return oid.ID{}, fmt.Errorf("could not search for objects: %w", err)
return oid.ID{}, fmt.Errorf("search objects: %w", err)
}
defer res.Close()
@ -295,14 +284,14 @@ func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cn
if n == 0 {
switch {
case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal):
log.Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName, logs.TagField(logs.TagExternalStorage))
h.reqLogger(ctx).Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName, logs.TagField(logs.TagExternalStorage))
attrVal = prepareAtribute(attrFileName, attrVal)
return h.findObjectByAttribute(ctx, log, cnrID, attrFileName, attrVal)
return h.findObjectByAttribute(ctx, cnrID, attrFileName, attrVal)
case errors.Is(err, io.EOF):
log.Error(logs.ObjectNotFound, zap.Error(err), logs.TagField(logs.TagExternalStorage))
h.reqLogger(ctx).Error(logs.ObjectNotFound, zap.Error(err), logs.TagField(logs.TagExternalStorage))
return oid.ID{}, fmt.Errorf("object not found: %w", err)
default:
log.Error(logs.ReadObjectListFailed, zap.Error(err), logs.TagField(logs.TagExternalStorage))
h.reqLogger(ctx).Error(logs.ReadObjectListFailed, zap.Error(err), logs.TagField(logs.TagExternalStorage))
return oid.ID{}, fmt.Errorf("read object list failed: %w", err)
}
}
@ -354,13 +343,13 @@ func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*ci
if err != nil {
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
if err != nil && strings.Contains(err.Error(), "not found") {
err = fmt.Errorf("%w: %s", new(apistatus.ContainerNotFound), err.Error())
err = fmt.Errorf("%w: %s", ErrContainerNotFound, err.Error())
}
}
return cnrID, err
}
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
func (h *Handler) getBucketInfo(ctx context.Context, containerName string) (*data.BucketInfo, error) {
ns, err := middleware.GetNamespace(ctx)
if err != nil {
return nil, err
@ -372,21 +361,16 @@ func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *
cnrID, err := h.resolveContainer(ctx, containerName)
if err != nil {
log.Error(logs.CouldNotResolveContainerID, zap.Error(err), zap.String("cnrName", containerName),
logs.TagField(logs.TagDatapath))
return nil, err
return nil, fmt.Errorf("resolve container: %w", err)
}
bktInfo, err := h.readContainer(ctx, *cnrID)
if err != nil {
log.Error(logs.CouldNotGetContainerInfo, zap.Error(err), zap.String("cnrName", containerName),
zap.String("cnrName", cnrID.String()),
logs.TagField(logs.TagExternalStorage))
return nil, err
return nil, fmt.Errorf("read container: %w", err)
}
if err = h.cache.Put(bktInfo); err != nil {
log.Warn(logs.CouldntPutBucketIntoCache,
h.reqLogger(ctx).Warn(logs.CouldntPutBucketIntoCache,
zap.String("bucket name", bktInfo.Name),
zap.Stringer("bucket cid", bktInfo.CID),
zap.Error(err),
@ -419,31 +403,24 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
return bktInfo, err
}
func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.browseIndex")
func (h *Handler) browseIndex(ctx context.Context, req *fasthttp.RequestCtx, cidParam, oidParam string, isNativeList bool) {
ctx, span := tracing.StartSpanFromContext(ctx, "handler.browseIndex")
defer span.End()
utils.SetContextToRequest(ctx, c)
if !h.config.IndexPageEnabled() {
c.SetStatusCode(fasthttp.StatusNotFound)
req.SetStatusCode(fasthttp.StatusNotFound)
return
}
cidURLParam := c.UserValue("cid").(string)
oidURLParam := c.UserValue("oid").(string)
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
log := reqLog.With(zap.String("cid", cidURLParam), zap.String("oid", oidURLParam))
unescapedKey, err := url.QueryUnescape(oidURLParam)
unescapedKey, err := url.QueryUnescape(oidParam)
if err != nil {
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToUnescapeOIDParam, err)
return
}
bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log)
bktInfo, err := h.getBucketInfo(ctx, cidParam)
if err != nil {
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return
}
@ -453,7 +430,7 @@ func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) {
listFunc = h.getDirObjectsNative
}
h.browseObjects(c, browseParams{
h.browseObjects(ctx, req, browseParams{
bucketInfo: bktInfo,
prefix: unescapedKey,
listObjects: listFunc,

View file

@ -374,7 +374,7 @@ func TestFindObjectByAttribute(t *testing.T) {
obj.SetAttributes(tc.firstAttr, tc.secondAttr)
hc.cfg.additionalSearch = tc.additionalSearch
objID, err := hc.Handler().findObjectByAttribute(ctx, hc.Handler().log, cnrID, tc.reqAttrKey, tc.reqAttrValue)
objID, err := hc.Handler().findObjectByAttribute(ctx, cnrID, tc.reqAttrKey, tc.reqAttrValue)
if tc.err != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.err)

View file

@ -27,7 +27,7 @@ const (
hdrContainerID = "X-Container-Id"
)
func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) {
func (h *Handler) headObject(ctx context.Context, req *fasthttp.RequestCtx, objectAddress oid.Address) {
var start = time.Now()
btoken := bearerToken(ctx)
@ -41,7 +41,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
obj, err := h.frostfs.HeadObject(ctx, prm)
if err != nil {
req.handleFrostFSErr(err, start)
h.logAndSendError(ctx, req, logs.FailedToHeadObject, err, zap.Stringer("elapsed", time.Since(start)))
return
}
@ -65,7 +65,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
case object.AttributeTimestamp:
value, err := strconv.ParseInt(val, 10, 64)
if err != nil {
req.log.Info(logs.CouldntParseCreationDate,
h.reqLogger(ctx).Info(logs.CouldntParseCreationDate,
zap.String("key", key),
zap.String("val", val),
zap.Error(err),
@ -100,7 +100,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
return h.frostfs.RangeObject(ctx, prmRange)
}, filename)
if err != nil && err != io.EOF {
req.handleFrostFSErr(err, start)
h.logAndSendError(ctx, req, logs.FailedToDetectContentTypeFromPayload, err, zap.Stringer("elapsed", time.Since(start)))
return
}
}
@ -116,40 +116,37 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
}
// HeadByAddressOrBucketName handles head requests using simple cid/oid or bucketname/key format.
func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.HeadByAddressOrBucketName")
func (h *Handler) HeadByAddressOrBucketName(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.HeadByAddressOrBucketName")
defer span.End()
cidParam, _ := c.UserValue("cid").(string)
oidParam, _ := c.UserValue("oid").(string)
cidParam, _ := req.UserValue("cid").(string)
oidParam, _ := req.UserValue("oid").(string)
log := utils.GetReqLogOrDefault(ctx, h.log).With(
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
zap.String("cid", cidParam),
zap.String("oid", oidParam),
)
))
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
bktInfo, err := h.getBucketInfo(ctx, cidParam)
if err != nil {
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return
}
checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
log.Error(logs.FailedToCheckIfSettingsNodeExist, zap.String("cid", bktInfo.CID.String()),
zap.Error(checkS3Err), logs.TagField(logs.TagExternalStorageTree))
logAndSendBucketError(c, log, checkS3Err)
h.logAndSendError(ctx, req, logs.FailedToCheckIfSettingsNodeExist, checkS3Err)
return
}
req := newRequest(c, log)
var objID oid.ID
if checkS3Err == nil {
h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.headObject)
} else if err = objID.DecodeString(oidParam); err == nil {
h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.headObject)
} else {
logAndSendBucketError(c, log, checkS3Err)
h.logAndSendError(ctx, req, logs.InvalidOIDParam, err)
}
}
@ -157,7 +154,6 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
func (h *Handler) HeadByAttribute(c *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.HeadByAttribute")
defer span.End()
utils.SetContextToRequest(ctx, c)
h.byAttribute(c, h.headObject)
h.byAttribute(ctx, c, h.headObject)
}

View file

@ -1,6 +1,7 @@
package handler
import (
"context"
"errors"
"io"
"strconv"
@ -53,7 +54,7 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
}
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) {
func (h *Handler) getPayload(ctx context.Context, p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) {
cid, ok := p.obj.Header.ContainerID()
if !ok {
return nil, 0, errors.New("no container id set")
@ -66,7 +67,6 @@ func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64,
if err != nil {
return nil, 0, err
}
ctx := p.req.RequestCtx
params := PrmInitMultiObjectReader{
Addr: newAddress(cid, oid),
Bearer: bearerToken(ctx),

View file

@ -63,11 +63,10 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error), file
type getMultiobjectBodyParams struct {
obj *Object
req request
strSize string
}
func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address) {
func (h *Handler) receiveFile(ctx context.Context, req *fasthttp.RequestCtx, objAddress oid.Address) {
var (
shouldDownload = req.QueryArgs().GetBool("download")
start = time.Now()
@ -85,12 +84,12 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
rObj, err := h.frostfs.GetObject(ctx, prm)
if err != nil {
req.handleFrostFSErr(err, start)
h.logAndSendError(ctx, req, logs.FailedToGetObject, err, zap.Stringer("elapsed", time.Since(start)))
return
}
// we can't close reader in this function, so how to do it?
req.setIDs(rObj.Header)
setIDs(req, rObj.Header)
payload := rObj.Payload
payloadSize := rObj.Header.PayloadSize()
for _, attr := range rObj.Header.Attributes() {
@ -107,8 +106,8 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
case object.AttributeFileName:
filename = val
case object.AttributeTimestamp:
if err = req.setTimestamp(val); err != nil {
req.log.Error(logs.CouldntParseCreationDate,
if err = setTimestamp(req, val); err != nil {
h.reqLogger(ctx).Error(logs.CouldntParseCreationDate,
zap.String("val", val),
zap.Error(err),
logs.TagField(logs.TagDatapath))
@ -118,13 +117,12 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
case object.AttributeFilePath:
filepath = val
case attributeMultipartObjectSize:
payload, payloadSize, err = h.getPayload(getMultiobjectBodyParams{
payload, payloadSize, err = h.getPayload(ctx, getMultiobjectBodyParams{
obj: rObj,
req: req,
strSize: val,
})
if err != nil {
req.handleFrostFSErr(err, start)
h.logAndSendError(ctx, req, logs.FailedToGetObjectPayload, err, zap.Stringer("elapsed", time.Since(start)))
return
}
}
@ -133,7 +131,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
filename = filepath
}
req.setDisposition(shouldDownload, filename)
setDisposition(req, shouldDownload, filename)
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
@ -145,8 +143,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
return payload, nil
}, filename)
if err != nil && err != io.EOF {
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
h.logAndSendError(ctx, req, logs.FailedToDetectContentTypeFromPayload, err, zap.Stringer("elapsed", time.Since(start)))
return
}
@ -165,7 +162,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
req.Response.SetBodyStream(payload, int(payloadSize))
}
func (r *request) setIDs(obj object.Object) {
func setIDs(r *fasthttp.RequestCtx, obj object.Object) {
objID, _ := obj.ID()
cnrID, _ := obj.ContainerID()
r.Response.Header.Set(hdrObjectID, objID.String())
@ -173,7 +170,7 @@ func (r *request) setIDs(obj object.Object) {
r.Response.Header.Set(hdrContainerID, cnrID.String())
}
func (r *request) setDisposition(shouldDownload bool, filename string) {
func setDisposition(r *fasthttp.RequestCtx, shouldDownload bool, filename string) {
const (
inlineDisposition = "inline"
attachmentDisposition = "attachment"
@ -187,7 +184,7 @@ func (r *request) setDisposition(shouldDownload bool, filename string) {
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
}
func (r *request) setTimestamp(timestamp string) error {
func setTimestamp(r *fasthttp.RequestCtx, timestamp string) error {
value, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
return err

View file

@ -50,44 +50,41 @@ func (pr *putResponse) encode(w io.Writer) error {
}
// Upload handles multipart upload request.
func (h *Handler) Upload(c *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.Upload")
func (h *Handler) Upload(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.Upload")
defer span.End()
utils.SetContextToRequest(ctx, c)
var file MultipartFile
scid, _ := c.UserValue("cid").(string)
bodyStream := c.RequestBodyStream()
scid, _ := req.UserValue("cid").(string)
bodyStream := req.RequestBodyStream()
drainBuf := make([]byte, drainBufSize)
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
log := reqLog.With(zap.String("cid", scid))
log := h.reqLogger(ctx)
ctx = utils.SetReqLog(ctx, log.With(zap.String("cid", scid)))
bktInfo, err := h.getBucketInfo(ctx, scid, log)
bktInfo, err := h.getBucketInfo(ctx, scid)
if err != nil {
logAndSendBucketError(c, log, err)
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return
}
boundary := string(c.Request.Header.MultipartFormBoundary())
boundary := string(req.Request.Header.MultipartFormBoundary())
if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
h.logAndSendError(ctx, req, logs.CouldNotReceiveMultipartForm, err)
return
}
filtered, err := filterHeaders(log, &c.Request.Header)
filtered, err := filterHeaders(log, &req.Request.Header)
if err != nil {
log.Error(logs.FailedToFilterHeaders, zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
h.logAndSendError(ctx, req, logs.FailedToFilterHeaders, err)
return
}
if c.Request.Header.Peek(explodeArchiveHeader) != nil {
h.explodeArchive(request{c, log}, bktInfo, file, filtered)
if req.Request.Header.Peek(explodeArchiveHeader) != nil {
h.explodeArchive(ctx, req, bktInfo, file, filtered)
} else {
h.uploadSingleObject(request{c, log}, bktInfo, file, filtered)
h.uploadSingleObject(ctx, req, bktInfo, file, filtered)
}
// Multipart is multipart and thus can contain more than one part which
@ -104,46 +101,39 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
}
}
func (h *Handler) uploadSingleObject(req request, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
c, log := req.RequestCtx, req.log
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.uploadSingleObject")
func (h *Handler) uploadSingleObject(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
ctx, span := tracing.StartSpanFromContext(ctx, "handler.uploadSingleObject")
defer span.End()
utils.SetContextToRequest(ctx, c)
setIfNotExist(filtered, object.AttributeFileName, file.FileName())
attributes, err := h.extractAttributes(c, log, filtered)
attributes, err := h.extractAttributes(ctx, req, filtered)
if err != nil {
log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
return
}
idObj, err := h.uploadObject(c, bkt, attributes, file)
idObj, err := h.uploadObject(ctx, bkt, attributes, file)
if err != nil {
h.handlePutFrostFSErr(c, err, log)
h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
return
}
log.Debug(logs.ObjectUploaded,
h.reqLogger(ctx).Debug(logs.ObjectUploaded,
zap.String("oid", idObj.EncodeToString()),
zap.String("FileName", file.FileName()),
logs.TagField(logs.TagExternalStorage),
)
addr := newAddress(bkt.CID, idObj)
c.Response.Header.SetContentType(jsonHeader)
req.Response.Header.SetContentType(jsonHeader)
// Try to return the response, otherwise, if something went wrong, throw an error.
if err = newPutResponse(addr).encode(c); err != nil {
log.Error(logs.CouldNotEncodeResponse, zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could not encode response", fasthttp.StatusBadRequest)
if err = newPutResponse(addr).encode(req); err != nil {
h.logAndSendError(ctx, req, logs.CouldNotEncodeResponse, err)
return
}
}
func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
ctx := utils.GetContextFromRequest(c)
func (h *Handler) uploadObject(ctx context.Context, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
obj := object.New()
obj.SetContainerID(bkt.CID)
obj.SetOwnerID(*h.ownerID)
@ -168,19 +158,18 @@ func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, att
return idObj, nil
}
func (h *Handler) extractAttributes(c *fasthttp.RequestCtx, log *zap.Logger, filtered map[string]string) ([]object.Attribute, error) {
ctx := utils.GetContextFromRequest(c)
func (h *Handler) extractAttributes(ctx context.Context, req *fasthttp.RequestCtx, filtered map[string]string) ([]object.Attribute, error) {
now := time.Now()
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
if rawHeader := req.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err),
h.reqLogger(ctx).Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err),
logs.TagField(logs.TagDatapath))
} else {
now = parsed
}
}
if err := utils.PrepareExpirationHeader(ctx, h.frostfs, filtered, now); err != nil {
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath))
h.reqLogger(ctx).Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath))
return nil, err
}
attributes := make([]object.Attribute, 0, len(filtered))
@ -207,38 +196,33 @@ func newAttribute(key string, val string) object.Attribute {
// explodeArchive read files from archive and creates objects for each of them.
// Sets FilePath attribute with name from tar.Header.
func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
c, log := req.RequestCtx, req.log
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.explodeArchive")
func (h *Handler) explodeArchive(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
ctx, span := tracing.StartSpanFromContext(ctx, "handler.explodeArchive")
defer span.End()
utils.SetContextToRequest(ctx, c)
// remove user attributes which vary for each file in archive
// to guarantee that they won't appear twice
delete(filtered, object.AttributeFileName)
delete(filtered, object.AttributeFilePath)
commonAttributes, err := h.extractAttributes(c, log, filtered)
commonAttributes, err := h.extractAttributes(ctx, req, filtered)
if err != nil {
log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
return
}
attributes := commonAttributes
reader := file
if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
log.Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath))
if bytes.EqualFold(req.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
h.reqLogger(ctx).Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath))
gzipReader, err := gzip.NewReader(file)
if err != nil {
log.Error(logs.FailedToCreateGzipReader, zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could read gzip file: "+err.Error(), fasthttp.StatusBadRequest)
h.logAndSendError(ctx, req, logs.FailedToCreateGzipReader, err)
return
}
defer func() {
if err := gzipReader.Close(); err != nil {
log.Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
h.reqLogger(ctx).Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
}
}()
reader = gzipReader
@ -250,8 +234,7 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
if errors.Is(err, io.EOF) {
break
} else if err != nil {
log.Error(logs.FailedToReadFileFromTar, zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could not get next entry: "+err.Error(), fasthttp.StatusBadRequest)
h.logAndSendError(ctx, req, logs.FailedToReadFileFromTar, err)
return
}
@ -265,13 +248,13 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
attributes = append(attributes, newAttribute(object.AttributeFilePath, obj.Name))
attributes = append(attributes, newAttribute(object.AttributeFileName, fileName))
idObj, err := h.uploadObject(c, bkt, attributes, tarReader)
idObj, err := h.uploadObject(ctx, bkt, attributes, tarReader)
if err != nil {
h.handlePutFrostFSErr(c, err, log)
h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
return
}
log.Debug(logs.ObjectUploaded,
h.reqLogger(ctx).Debug(logs.ObjectUploaded,
zap.String("oid", idObj.EncodeToString()),
zap.String("FileName", fileName),
logs.TagField(logs.TagExternalStorage),
@ -279,14 +262,6 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
}
}
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {
statusCode, msg, additionalFields := formErrorResponse("could not store file in frostfs", err)
logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)
log.Error(logs.CouldNotStoreFileInFrostfs, append(logFields, logs.TagField(logs.TagExternalStorage))...)
ResponseError(r, msg, statusCode)
}
func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token {
if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
return tkn

View file

@ -5,13 +5,12 @@ import (
"errors"
"fmt"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
sdkstatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -19,30 +18,6 @@ import (
"go.uber.org/zap"
)
type request struct {
*fasthttp.RequestCtx
log *zap.Logger
}
func newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) request {
return request{
RequestCtx: ctx,
log: log,
}
}
func (r *request) handleFrostFSErr(err error, start time.Time) {
logFields := []zap.Field{
zap.Stringer("elapsed", time.Since(start)),
zap.Error(err),
}
statusCode, msg, additionalFields := formErrorResponse("could not receive object", err)
logFields = append(logFields, additionalFields...)
r.log.Error(logs.CouldNotReceiveObject, append(logFields, logs.TagField(logs.TagExternalStorage))...)
ResponseError(r.RequestCtx, msg, statusCode)
}
func bearerToken(ctx context.Context) *bearer.Token {
if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
return tkn
@ -84,14 +59,16 @@ func isValidValue(s string) bool {
return true
}
func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) {
log.Error(logs.CouldNotGetBucket, zap.Error(err), logs.TagField(logs.TagDatapath))
func (h *Handler) reqLogger(ctx context.Context) *zap.Logger {
Review

question: is this function intentionally not used [here](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/75328dd9340d5c1e4320ce95dc118b00f3a5b8c1/internal/handler/browse.go#L226) and [here](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/75328dd9340d5c1e4320ce95dc118b00f3a5b8c1/internal/handler/browse.go#L261)? It's just that if within the handler package we wrap the `utils.GetReqLogOrDefault(ctx, h.log)` function in `reqLogger(ctx context.Context)`, don't we want to use it wherever possible?
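A minimal sketch of what using the wrapper at those call sites could look like (the method name and body below are placeholders, not code from browse.go; imports are the handler package's existing ones):

```go
// Illustrative only: a handler method that currently fetches the request
// logger directly could use the new wrapper instead.
func (h *Handler) logListingExample(ctx context.Context, cnrID cid.ID) {
	log := h.reqLogger(ctx) // was: utils.GetReqLogOrDefault(ctx, h.log)
	log.Debug(logs.ObjectsNotFound,
		zap.String("cid", cnrID.EncodeToString()),
		logs.TagField(logs.TagDatapath))
}
```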
return utils.GetReqLogOrDefault(ctx, h.log)
}
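
For illustration, a minimal sketch of the reviewer's suggestion; the surrounding browse.go code is hypothetical, only the helper and log-constant names come from this change:

```go
// Before: each call site fetches the request-scoped logger itself.
log := utils.GetReqLogOrDefault(ctx, h.log)
log.Error(logs.FailedToListObjects, zap.Error(err), logs.TagField(logs.TagDatapath))

// After: the handler-level helper keeps that lookup in one place.
h.reqLogger(ctx).Error(logs.FailedToListObjects, zap.Error(err), logs.TagField(logs.TagDatapath))
```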
if client.IsErrContainerNotFound(err) {
ResponseError(c, "Not Found", fasthttp.StatusNotFound)
return
}
ResponseError(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
func (h *Handler) logAndSendError(ctx context.Context, c *fasthttp.RequestCtx, msg string, err error, additional ...zap.Field) {
utils.GetReqLogOrDefault(ctx, h.log).Error(msg,
append([]zap.Field{zap.Error(err), logs.TagField(logs.TagDatapath)}, additional...)...)
msg, code := formErrorResponse(err)
ResponseError(c, msg, code)
}
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
@ -112,31 +89,23 @@ func ResponseError(r *fasthttp.RequestCtx, msg string, code int) {
r.Error(msg+"\n", code)
}
func formErrorResponse(message string, err error) (int, string, []zap.Field) {
var (
msg string
statusCode int
logFields []zap.Field
)
st := new(sdkstatus.ObjectAccessDenied)
func formErrorResponse(err error) (string, int) {
switch {
case errors.As(err, &st):
statusCode = fasthttp.StatusForbidden
reason := st.Reason()
msg = fmt.Sprintf("%s: %v: %s", message, err, reason)
logFields = append(logFields, zap.String("error_detail", reason))
case errors.Is(err, ErrAccessDenied):
return fmt.Sprintf("Storage Access Denied:\n%v", err), fasthttp.StatusForbidden
case errors.Is(err, layer.ErrNodeAccessDenied):
return fmt.Sprintf("Tree Access Denied:\n%v", err), fasthttp.StatusForbidden
case errors.Is(err, ErrQuotaLimitReached):
statusCode = fasthttp.StatusConflict
msg = fmt.Sprintf("%s: %v", message, err)
case client.IsErrObjectNotFound(err) || client.IsErrContainerNotFound(err):
statusCode = fasthttp.StatusNotFound
msg = "Not Found"
return fmt.Sprintf("Quota Reached:\n%v", err), fasthttp.StatusConflict
case errors.Is(err, ErrContainerNotFound):
return fmt.Sprintf("Container Not Found:\n%v", err), fasthttp.StatusNotFound
case errors.Is(err, ErrObjectNotFound):
return fmt.Sprintf("Object Not Found:\n%v", err), fasthttp.StatusNotFound
case errors.Is(err, layer.ErrNodeNotFound):
return fmt.Sprintf("Tree Node Not Found:\n%v", err), fasthttp.StatusNotFound
case errors.Is(err, ErrGatewayTimeout):
return fmt.Sprintf("Gateway Timeout:\n%v", err), fasthttp.StatusGatewayTimeout
Review

Should we also note in the API documentation the possibility of returning a 504 GatewayTimeout?
default:
statusCode = fasthttp.StatusBadRequest
msg = fmt.Sprintf("%s: %v", message, err)
return fmt.Sprintf("Bad Request:\n%v", err), fasthttp.StatusBadRequest
}
return statusCode, msg, logFields
}
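
As a rough usage sketch (not part of the diff): inside the handler package the new mapping can be exercised directly. The wrapped messages below are arbitrary, `c` is assumed to be the current `*fasthttp.RequestCtx`, and the sentinels are the ones `handleStorageError` now produces:

```go
// A wrapped storage sentinel resolves to 404 with a fixed-prefix body.
msg, code := formErrorResponse(fmt.Errorf("read object: %w", ErrObjectNotFound))
ResponseError(c, msg, code) // 404, body starts with "Object Not Found:"

// Timeouts surface as 504 — the case the review above asks to document.
msg, code = formErrorResponse(fmt.Errorf("read object: %w", ErrGatewayTimeout))
ResponseError(c, msg, code) // 504 Gateway Timeout
```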

View file

@ -77,7 +77,7 @@ const (
// Log messages with the "datapath" tag.
const (
CouldntParseCreationDate = "couldn't parse creation date"
CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload"
FailedToDetectContentTypeFromPayload = "failed to detect Content-Type from payload"
FailedToAddObjectToArchive = "failed to add object to archive"
CloseZipWriter = "close zip writer"
IgnorePartEmptyFormName = "ignore part, empty form name"
@ -104,28 +104,32 @@ const (
CouldNotReceiveMultipartForm = "could not receive multipart/form"
ObjectsNotFound = "objects not found"
IteratingOverSelectedObjectsFailed = "iterating over selected objects failed"
CouldNotGetBucket = "could not get bucket"
CouldNotResolveContainerID = "could not resolve container id"
FailedToSumbitTaskToPool = "failed to submit task to pool"
FailedToGetBucketInfo = "could not get bucket info"
FailedToSubmitTaskToPool = "failed to submit task to pool"
ObjectWasDeleted = "object was deleted"
FailedToGetLatestVersionOfObject = "failed to get latest version of object"
FailedToCheckIfSettingsNodeExist = "failed to check if settings node exists"
FailedToListObjects = "failed to list objects"
FailedToParseTemplate = "failed to parse template"
FailedToExecuteTemplate = "failed to execute template"
FailedToUploadObject = "failed to upload object"
FailedToHeadObject = "failed to head object"
FailedToGetObject = "failed to get object"
FailedToGetObjectPayload = "failed to get object payload"
	FailedToFindObjectByAttribute = "failed to find object by attribute"
FailedToUnescapeOIDParam = "failed to unescape oid param"
InvalidOIDParam = "invalid oid param"
)
// Log messages with the "external_storage" tag.
const (
CouldNotReceiveObject = "could not receive object"
CouldNotSearchForObjects = "could not search for objects"
ObjectNotFound = "object not found"
ReadObjectListFailed = "read object list failed"
CouldNotStoreFileInFrostfs = "could not store file in frostfs"
FailedToHeadObject = "failed to head object"
ObjectNotFoundByFilePathTrySearchByFileName = "object not found by filePath attribute, try search by fileName"
FailedToGetObject = "failed to get object"
ObjectUploaded = "object uploaded"
CouldNotGetContainerInfo = "could not get container info"
)
// Log messages with the "external_storage_tree" tag.
const (
ObjectWasDeleted = "object was deleted"
FailedToGetLatestVersionOfObject = "failed to get latest version of object"
	FailedToCheckIfSettingsNodeExist = "failed to check if settings node exists"
FoundSeveralSystemTreeNodes = "found several system tree nodes"
)

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
qostagging "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -48,7 +49,7 @@ func (x *FrostFS) Container(ctx context.Context, containerPrm handler.PrmContain
res, err := x.pool.GetContainer(ctx, prm)
if err != nil {
return nil, handleObjectError("read container via connection pool", err)
return nil, handleStorageError("read container via connection pool", err)
}
return &res, nil
@ -72,7 +73,7 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm handler.PrmObjectCreate)
idObj, err := x.pool.PutObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmPut)
if err != nil {
return oid.ID{}, handleObjectError("save object via connection pool", err)
return oid.ID{}, handleStorageError("save object via connection pool", err)
}
return idObj.ObjectID, nil
}
@ -88,7 +89,7 @@ func (x payloadReader) Read(p []byte) (int, error) {
if err != nil && errors.Is(err, io.EOF) {
return n, err
}
return n, handleObjectError("read payload", err)
return n, handleStorageError("read payload", err)
}
// HeadObject implements frostfs.FrostFS interface method.
@ -105,7 +106,7 @@ func (x *FrostFS) HeadObject(ctx context.Context, prm handler.PrmObjectHead) (*o
res, err := x.pool.HeadObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmHead)
if err != nil {
return nil, handleObjectError("read object header via connection pool", err)
return nil, handleStorageError("read object header via connection pool", err)
}
return &res, nil
@ -125,7 +126,7 @@ func (x *FrostFS) GetObject(ctx context.Context, prm handler.PrmObjectGet) (*han
res, err := x.pool.GetObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmGet)
if err != nil {
return nil, handleObjectError("init full object reading via connection pool", err)
return nil, handleStorageError("init full object reading via connection pool", err)
}
return &handler.Object{
@ -150,7 +151,7 @@ func (x *FrostFS) RangeObject(ctx context.Context, prm handler.PrmObjectRange) (
res, err := x.pool.ObjectRange(qostagging.ContextWithIOTag(ctx, clientIOTag), prmRange)
if err != nil {
return nil, handleObjectError("init payload range reading via connection pool", err)
return nil, handleStorageError("init payload range reading via connection pool", err)
}
return payloadReader{&res}, nil
@ -171,7 +172,7 @@ func (x *FrostFS) SearchObjects(ctx context.Context, prm handler.PrmObjectSearch
res, err := x.pool.SearchObjects(qostagging.ContextWithIOTag(ctx, clientIOTag), prmSearch)
if err != nil {
return nil, handleObjectError("init object search via connection pool", err)
return nil, handleStorageError("init object search via connection pool", err)
}
return &res, nil
@ -205,7 +206,7 @@ func (x *FrostFS) NetmapSnapshot(ctx context.Context) (netmap.NetMap, error) {
netmapSnapshot, err := x.pool.NetMapSnapshot(ctx)
if err != nil {
return netmapSnapshot, handleObjectError("get netmap via connection pool", err)
return netmapSnapshot, handleStorageError("get netmap via connection pool", err)
}
return netmapSnapshot, nil
@ -229,7 +230,7 @@ func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
networkInfo, err := x.pool.NetworkInfo(ctx)
if err != nil {
return "", handleObjectError("read network info via client", err)
return "", handleStorageError("read network info via client", err)
}
domain := networkInfo.RawNetworkParameter("SystemDNS")
@ -240,7 +241,7 @@ func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
return string(domain), nil
}
func handleObjectError(msg string, err error) error {
func handleStorageError(msg string, err error) error {
if err == nil {
return nil
}
@ -253,6 +254,14 @@ func handleObjectError(msg string, err error) error {
return fmt.Errorf("%s: %w: %s", msg, handler.ErrAccessDenied, reason)
}
if client.IsErrContainerNotFound(err) {
return fmt.Errorf("%s: %w: %s", msg, handler.ErrContainerNotFound, err.Error())
}
if client.IsErrObjectNotFound(err) {
return fmt.Errorf("%s: %w: %s", msg, handler.ErrObjectNotFound, err.Error())
}
if IsTimeoutError(err) {
return fmt.Errorf("%s: %w: %s", msg, handler.ErrGatewayTimeout, err.Error())
}
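
The point of the new branches is that the handler layer can classify storage failures with `errors.Is` instead of string matching. A conceptual sketch of the round trip; the SDK status value is only an example:

```go
// frostfs layer: a "not found" status from the pool becomes a handler sentinel.
err := handleStorageError("read object header via connection pool", new(apistatus.ObjectNotFound))

// handler layer: formErrorResponse (via logAndSendError) resolves it to 404.
errors.Is(err, handler.ErrObjectNotFound) // true
```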

View file

@ -18,7 +18,7 @@ func TestHandleObjectError(t *testing.T) {
msg := "some msg"
t.Run("nil error", func(t *testing.T) {
err := handleObjectError(msg, nil)
err := handleStorageError(msg, nil)
require.Nil(t, err)
})
@ -27,7 +27,7 @@ func TestHandleObjectError(t *testing.T) {
inputErr := new(apistatus.ObjectAccessDenied)
inputErr.WriteReason(reason)
err := handleObjectError(msg, inputErr)
err := handleStorageError(msg, inputErr)
require.ErrorIs(t, err, handler.ErrAccessDenied)
require.Contains(t, err.Error(), reason)
require.Contains(t, err.Error(), msg)
@ -38,7 +38,7 @@ func TestHandleObjectError(t *testing.T) {
inputErr := new(apistatus.ObjectAccessDenied)
inputErr.WriteReason(reason)
err := handleObjectError(msg, inputErr)
err := handleStorageError(msg, inputErr)
require.ErrorIs(t, err, handler.ErrQuotaLimitReached)
require.Contains(t, err.Error(), reason)
require.Contains(t, err.Error(), msg)
@ -47,7 +47,7 @@ func TestHandleObjectError(t *testing.T) {
t.Run("simple timeout", func(t *testing.T) {
inputErr := errors.New("timeout")
err := handleObjectError(msg, inputErr)
err := handleStorageError(msg, inputErr)
require.ErrorIs(t, err, handler.ErrGatewayTimeout)
require.Contains(t, err.Error(), inputErr.Error())
require.Contains(t, err.Error(), msg)
@ -58,7 +58,7 @@ func TestHandleObjectError(t *testing.T) {
defer cancel()
<-ctx.Done()
err := handleObjectError(msg, ctx.Err())
err := handleStorageError(msg, ctx.Err())
require.ErrorIs(t, err, handler.ErrGatewayTimeout)
require.Contains(t, err.Error(), ctx.Err().Error())
require.Contains(t, err.Error(), msg)
@ -67,7 +67,7 @@ func TestHandleObjectError(t *testing.T) {
t.Run("grpc deadline exceeded", func(t *testing.T) {
inputErr := fmt.Errorf("wrap grpc error: %w", status.Error(codes.DeadlineExceeded, "error"))
err := handleObjectError(msg, inputErr)
err := handleStorageError(msg, inputErr)
require.ErrorIs(t, err, handler.ErrGatewayTimeout)
require.Contains(t, err.Error(), inputErr.Error())
require.Contains(t, err.Error(), msg)
@ -76,7 +76,7 @@ func TestHandleObjectError(t *testing.T) {
t.Run("unknown error", func(t *testing.T) {
inputErr := errors.New("unknown error")
err := handleObjectError(msg, inputErr)
err := handleStorageError(msg, inputErr)
require.ErrorIs(t, err, inputErr)
require.Contains(t, err.Error(), msg)
})
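
The new not-found branches could be covered in the same style as the existing subtests; a sketch, assuming `apistatus.ObjectNotFound` and `apistatus.ContainerNotFound` behave like the statuses already used here:

```go
t.Run("object not found", func(t *testing.T) {
	err := handleStorageError(msg, new(apistatus.ObjectNotFound))
	require.ErrorIs(t, err, handler.ErrObjectNotFound)
	require.Contains(t, err.Error(), msg)
})

t.Run("container not found", func(t *testing.T) {
	err := handleStorageError(msg, new(apistatus.ContainerNotFound))
	require.ErrorIs(t, err, handler.ErrContainerNotFound)
	require.Contains(t, err.Error(), msg)
})
```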

View file

@ -64,7 +64,7 @@ func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([
nodes, err := w.p.GetNodes(qostagging.ContextWithIOTag(ctx, clientIOTag), poolPrm)
if err != nil {
return nil, handleError(err)
return nil, handleTreeError(err)
}
res := make([]tree.NodeResponse, len(nodes))
@ -83,7 +83,7 @@ func getBearer(ctx context.Context) []byte {
return token.Marshal()
}
func handleError(err error) error {
func handleTreeError(err error) error {
if err == nil {
return nil
}
@ -123,7 +123,7 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo,
subTreeReader, err := w.p.GetSubTree(qostagging.ContextWithIOTag(ctx, clientIOTag), poolPrm)
if err != nil {
return nil, handleError(err)
return nil, handleTreeError(err)
}
var subtree []tree.NodeResponse
@ -134,7 +134,7 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo,
node, err = subTreeReader.Next()
}
if err != io.EOF {
return nil, handleError(err)
return nil, handleTreeError(err)
}
return subtree, nil

View file

@ -8,14 +8,18 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type (
Tree struct {
service ServiceClient
log *zap.Logger
}
// ServiceClient is a client to interact with tree service.
@ -73,8 +77,8 @@ const (
)
// NewTree creates instance of Tree using provided address and create grpc connection.
func NewTree(service ServiceClient) *Tree {
return &Tree{service: service}
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
return &Tree{service: service, log: log}
}
type Meta interface {
@ -257,6 +261,9 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name
if len(nodes) == 0 {
return nil, layer.ErrNodeNotFound
}
if len(nodes) != 1 {
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemTreeNodes, zap.String("name", name), logs.TagField(logs.TagExternalStorageTree))
}
return newMultiNode(nodes)
}
@ -296,7 +303,7 @@ func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
}
if targetIndexNode == -1 {
return nil, layer.ErrNodeNotFound
return nil, fmt.Errorf("latest version: %w", layer.ErrNodeNotFound)
}
return nodes[targetIndexNode], nil
@ -423,6 +430,10 @@ func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, tr
return intermediateNodes, nil
}
func (c *Tree) reqLogger(ctx context.Context) *zap.Logger {
return utils.GetReqLogOrDefault(ctx, c.log)
}
func GetFilename(node NodeResponse) string {
for _, kv := range node.GetMeta() {
if kv.GetKey() == FileNameKey {