[#191] Refactor error handling and logging #221

Open
dkirillov wants to merge 2 commits from dkirillov/frostfs-http-gw:bugfix/191-correct-status-codes into master
17 changed files with 393 additions and 374 deletions

View file

@ -606,7 +606,7 @@ func (a *app) Serve() {
close(a.webDone) close(a.webDone)
}() }()
handle := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool) handle := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool), a.log), workerPool)
// Configure router. // Configure router.
a.configureRouter(handle) a.configureRouter(handle)
@ -734,7 +734,7 @@ func (a *app) configureRouter(h *handler.Handler) {
r := router.New() r := router.New()
r.RedirectTrailingSlash = true r.RedirectTrailingSlash = true
r.NotFound = func(r *fasthttp.RequestCtx) { r.NotFound = func(r *fasthttp.RequestCtx) {
handler.ResponseError(r, "Not found", fasthttp.StatusNotFound) handler.ResponseError(r, "Route Not found", fasthttp.StatusNotFound)
} }
r.MethodNotAllowed = func(r *fasthttp.RequestCtx) { r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
handler.ResponseError(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed) handler.ResponseError(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)

View file

@ -20,9 +20,11 @@ import (
containerv2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container" containerv2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -94,6 +96,7 @@ func TestIntegration(t *testing.T) {
t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID) }) t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID) })
t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID) }) t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID) })
t.Run("test namespaces "+version, func(t *testing.T) { checkNamespaces(ctx, t, clientPool, ownerID, CID) }) t.Run("test namespaces "+version, func(t *testing.T) { checkNamespaces(ctx, t, clientPool, ownerID, CID) })
t.Run("test status codes "+version, func(t *testing.T) { checkStatusCodes(ctx, t, clientPool, ownerID, version) })
cancel() cancel()
server.Wait() server.Wait()
@ -260,7 +263,7 @@ func putWithDuplicateKeys(t *testing.T, CID cid.ID) {
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "key duplication error: "+attr+"\n", string(body)) require.Contains(t, string(body), "key duplication error: "+attr+"\n")
require.Equal(t, http.StatusBadRequest, resp.StatusCode) require.Equal(t, http.StatusBadRequest, resp.StatusCode)
} }
@ -429,7 +432,80 @@ func checkNamespaces(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
resp, err = http.DefaultClient.Do(req) resp, err = http.DefaultClient.Do(req)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, http.StatusNotFound, resp.StatusCode) require.Equal(t, http.StatusNotFound, resp.StatusCode)
}
func checkStatusCodes(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, version string) {
cli := http.Client{Timeout: 30 * time.Second}
t.Run("container not found by name", func(t *testing.T) {
resp, err := cli.Get(testHost + "/get/unknown/object")
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, resp.StatusCode)
requireBodyContains(t, resp, "container not found")
})
t.Run("container not found by cid", func(t *testing.T) {
cnrIDTest := cidtest.ID()
resp, err := cli.Get(testHost + "/get/" + cnrIDTest.EncodeToString() + "/object")
require.NoError(t, err)
requireBodyContains(t, resp, "container not found")
require.Equal(t, http.StatusNotFound, resp.StatusCode)
})
t.Run("object not found in storage", func(t *testing.T) {
resp, err := cli.Get(testHost + "/get_by_attribute/" + testContainerName + "/FilePath/object2")
require.NoError(t, err)
requireBodyContains(t, resp, "object not found")
require.Equal(t, http.StatusNotFound, resp.StatusCode)
})
t.Run("access denied", func(t *testing.T) {
basicACL := acl.Private
var recs []*eacl.Record
if version == "1.2.7" {
basicACL = acl.PublicRWExtended
rec := eacl.NewRecord()
rec.SetAction(eacl.ActionDeny)
rec.SetOperation(eacl.OperationGet)
recs = append(recs, rec)
}
cnrID, err := createContainerBase(ctx, t, clientPool, ownerID, basicACL, "")
require.NoError(t, err)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
jsonToken, _ := makeBearerTokens(t, key, ownerID, version, recs...)
t.Run("get", func(t *testing.T) {
request, err := http.NewRequest(http.MethodGet, testHost+"/get/"+cnrID.EncodeToString()+"/object", nil)
require.NoError(t, err)
request.Header.Set("Authorization", "Bearer "+jsonToken)
resp, err := cli.Do(request)
require.NoError(t, err)
requireBodyContains(t, resp, "access denied")
require.Equal(t, http.StatusForbidden, resp.StatusCode)
})
t.Run("upload", func(t *testing.T) {
request, _, _ := makePutRequest(t, testHost+"/upload/"+cnrID.EncodeToString())
request.Header.Set("Authorization", "Bearer "+jsonToken)
resp, err := cli.Do(request)
require.NoError(t, err)
requireBodyContains(t, resp, "access denied")
require.Equal(t, http.StatusForbidden, resp.StatusCode)
})
})
}
func requireBodyContains(t *testing.T, resp *http.Response, msg string) {
data, err := io.ReadAll(resp.Body)
require.NoError(t, err)
defer resp.Body.Close()
require.Contains(t, strings.ToLower(string(data)), strings.ToLower(msg))
} }
func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container { func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container {
@ -478,6 +554,10 @@ func getPool(ctx context.Context, t *testing.T, key *keys.PrivateKey) *pool.Pool
} }
func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID) (cid.ID, error) { func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID) (cid.ID, error) {
return createContainerBase(ctx, t, clientPool, ownerID, acl.PublicRWExtended, testContainerName)
}
func createContainerBase(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, basicACL acl.Basic, cnrName string) (cid.ID, error) {
var policy netmap.PlacementPolicy var policy netmap.PlacementPolicy
err := policy.DecodeString("REP 1") err := policy.DecodeString("REP 1")
require.NoError(t, err) require.NoError(t, err)
@ -485,24 +565,28 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
var cnr container.Container var cnr container.Container
cnr.Init() cnr.Init()
cnr.SetPlacementPolicy(policy) cnr.SetPlacementPolicy(policy)
cnr.SetBasicACL(acl.PublicRWExtended) cnr.SetBasicACL(basicACL)
cnr.SetOwner(ownerID) cnr.SetOwner(ownerID)
container.SetCreationTime(&cnr, time.Now()) container.SetCreationTime(&cnr, time.Now())
if cnrName != "" {
var domain container.Domain var domain container.Domain
domain.SetName(testContainerName) domain.SetName(cnrName)
cnr.SetAttribute(containerv2.SysAttributeName, domain.Name()) cnr.SetAttribute(containerv2.SysAttributeName, domain.Name())
cnr.SetAttribute(containerv2.SysAttributeZone, domain.Zone()) cnr.SetAttribute(containerv2.SysAttributeZone, domain.Zone())
}
var waitPrm pool.WaitParams prm := pool.PrmContainerPut{
waitPrm.SetTimeout(15 * time.Second) ClientParams: client.PrmContainerPut{
waitPrm.SetPollInterval(3 * time.Second) Container: &cnr,
},
var prm pool.PrmContainerPut WaitParams: &pool.WaitParams{
prm.SetContainer(cnr) Timeout: 15 * time.Second,
prm.SetWaitParams(waitPrm) PollInterval: 3 * time.Second,
},
}
CID, err := clientPool.PutContainer(ctx, prm) CID, err := clientPool.PutContainer(ctx, prm)
if err != nil { if err != nil {
@ -549,13 +633,18 @@ func registerUser(t *testing.T, ctx context.Context, aioContainer testcontainers
require.NoError(t, err) require.NoError(t, err)
} }
func makeBearerTokens(t *testing.T, key *keys.PrivateKey, ownerID user.ID, version string) (jsonTokenBase64, binaryTokenBase64 string) { func makeBearerTokens(t *testing.T, key *keys.PrivateKey, ownerID user.ID, version string, records ...*eacl.Record) (jsonTokenBase64, binaryTokenBase64 string) {
tkn := new(bearer.Token) tkn := new(bearer.Token)
tkn.ForUser(ownerID) tkn.ForUser(ownerID)
tkn.SetExp(10000) tkn.SetExp(10000)
if version == "1.2.7" { if version == "1.2.7" {
tkn.SetEACLTable(*eacl.NewTable()) table := eacl.NewTable()
for i := range records {
table.AddRecord(records[i])
}
tkn.SetEACLTable(*table)
} else { } else {
tkn.SetImpersonate(true) tkn.SetImpersonate(true)
} }

View file

@ -94,6 +94,8 @@ The `filename` field from the multipart form will be set as `FileName` attribute
|--------|----------------------------------------------| |--------|----------------------------------------------|
| 200 | Object created successfully. | | 200 | Object created successfully. |
| 400 | Some error occurred during object uploading. | | 400 | Some error occurred during object uploading. |
| 403 | Access denied. |
| 409 | Can not upload object due to quota reached. |
## Get object ## Get object
@ -141,6 +143,7 @@ Get an object (payload and attributes) by an address.
|--------|------------------------------------------------| |--------|------------------------------------------------|
| 200 | Object got successfully. | | 200 | Object got successfully. |
| 400 | Some error occurred during object downloading. | | 400 | Some error occurred during object downloading. |
| 403 | Access denied. |
| 404 | Container or object not found. | | 404 | Container or object not found. |
###### Body ###### Body
@ -183,6 +186,7 @@ Get an object attributes by an address.
|--------|---------------------------------------------------| |--------|---------------------------------------------------|
| 200 | Object head successfully. | | 200 | Object head successfully. |
| 400 | Some error occurred during object HEAD operation. | | 400 | Some error occurred during object HEAD operation. |
| 403 | Access denied. |
| 404 | Container or object not found. | | 404 | Container or object not found. |
## Search object ## Search object
@ -233,6 +237,7 @@ If more than one object is found, an arbitrary one will be returned.
|--------|------------------------------------------------| |--------|------------------------------------------------|
| 200 | Object got successfully. | | 200 | Object got successfully. |
| 400 | Some error occurred during object downloading. | | 400 | Some error occurred during object downloading. |
| 403 | Access denied. |
| 404 | Container or object not found. | | 404 | Container or object not found. |
#### HEAD #### HEAD
@ -269,6 +274,7 @@ If more than one object is found, an arbitrary one will be used to get attribute
|--------|---------------------------------------| |--------|---------------------------------------|
| 200 | Object head successfully. | | 200 | Object head successfully. |
| 400 | Some error occurred during operation. | | 400 | Some error occurred during operation. |
| 403 | Access denied. |
| 404 | Container or object not found. | | 404 | Container or object not found. |
## Download archive ## Download archive
@ -305,15 +311,16 @@ Archive can be compressed (see http-gw [configuration](gate-configuration.md#arc
###### Headers ###### Headers
| Header | Description | | Header | Description |
|-----------------------|-------------------------------------------------------------------------------------------------------------------| |-----------------------|---------------------------------------------------------------------------------------------|
| `Content-Disposition` | Indicate how to browsers should treat file (`attachment`). Set `filename` as `archive.zip`. | | `Content-Disposition` | Indicate how to browsers should treat file (`attachment`). Set `filename` as `archive.zip`. |
| `Content-Type` | Indicate content type of object. Set to `application/zip` | | `Content-Type` | Indicate content type of object. Set to `application/zip` |
###### Status codes ###### Status codes
| Status | Description | | Status | Description |
|--------|-----------------------------------------------------| |--------|------------------------------------------------|
| 200 | Object got successfully. | | 200 | Object got successfully. |
| 400 | Some error occurred during object downloading. | | 400 | Some error occurred during object downloading. |
| 403 | Access denied. |
| 404 | Container or objects not found. | | 404 | Container or objects not found. |
| 500 | Some inner error (e.g. error on streaming objects). | | 409 | Can not upload object due to quota reached. |
Review

We can't seem to get a status of 409 when downloading the archive.

We can't seem to get a status of 409 when downloading the archive.

View file

@ -273,7 +273,7 @@ func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs Re
}) })
if err != nil { if err != nil {
wg.Done() wg.Done()
log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err), logs.TagField(logs.TagDatapath)) log.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err), logs.TagField(logs.TagDatapath))
} }
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -328,20 +328,18 @@ type browseParams struct {
listObjects func(ctx context.Context, bucketName *data.BucketInfo, prefix string) (*GetObjectsResponse, error) listObjects func(ctx context.Context, bucketName *data.BucketInfo, prefix string) (*GetObjectsResponse, error)
} }
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) { func (h *Handler) browseObjects(ctx context.Context, req *fasthttp.RequestCtx, p browseParams) {
const S3Protocol = "s3" const S3Protocol = "s3"
const FrostfsProtocol = "frostfs" const FrostfsProtocol = "frostfs"
ctx := utils.GetContextFromRequest(c) ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
log := reqLog.With(
zap.String("bucket", p.bucketInfo.Name), zap.String("bucket", p.bucketInfo.Name),
zap.String("container", p.bucketInfo.CID.EncodeToString()), zap.String("container", p.bucketInfo.CID.EncodeToString()),
zap.String("prefix", p.prefix), zap.String("prefix", p.prefix),
) ))
resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix) resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) h.logAndSendError(ctx, req, logs.FailedToListObjects, err)
return return
} }
@ -360,7 +358,7 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
"parentDir": parentDir, "parentDir": parentDir,
}).Parse(h.config.IndexPageTemplate()) }).Parse(h.config.IndexPageTemplate())
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) h.logAndSendError(ctx, req, logs.FailedToParseTemplate, err)
return return
} }
bucketName := p.bucketInfo.Name bucketName := p.bucketInfo.Name
@ -369,14 +367,14 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
bucketName = p.bucketInfo.CID.EncodeToString() bucketName = p.bucketInfo.CID.EncodeToString()
protocol = FrostfsProtocol protocol = FrostfsProtocol
} }
if err = tmpl.Execute(c, &BrowsePageData{ if err = tmpl.Execute(req, &BrowsePageData{
Container: bucketName, Container: bucketName,
Prefix: p.prefix, Prefix: p.prefix,
Objects: objects, Objects: objects,
Protocol: protocol, Protocol: protocol,
HasErrors: resp.hasErrors, HasErrors: resp.hasErrors,
}); err != nil { }); err != nil {
logAndSendBucketError(c, log, err) h.logAndSendError(ctx, req, logs.FailedToExecuteTemplate, err)
return return
} }
} }

View file

@ -25,43 +25,38 @@ import (
) )
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format. // DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) { func (h *Handler) DownloadByAddressOrBucketName(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadByAddressOrBucketName") ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadByAddressOrBucketName")
defer span.End() defer span.End()
utils.SetContextToRequest(ctx, c)
cidParam := c.UserValue("cid").(string) cidParam := req.UserValue("cid").(string)
oidParam := c.UserValue("oid").(string) oidParam := req.UserValue("oid").(string)
downloadParam := c.QueryArgs().GetBool("download") downloadParam := req.QueryArgs().GetBool("download")
log := utils.GetReqLogOrDefault(ctx, h.log).With( ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
zap.String("cid", cidParam), zap.String("cid", cidParam),
zap.String("oid", oidParam), zap.String("oid", oidParam),
) ))
bktInfo, err := h.getBucketInfo(ctx, cidParam, log) bktInfo, err := h.getBucketInfo(ctx, cidParam)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return return
} }
checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo) checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) { if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
log.Error(logs.FailedToCheckIfSettingsNodeExist, zap.String("cid", bktInfo.CID.String()), h.logAndSendError(ctx, req, logs.FailedToCheckIfSettingsNodeExist, checkS3Err)
zap.Error(checkS3Err), logs.TagField(logs.TagExternalStorageTree))
logAndSendBucketError(c, log, checkS3Err)
return return
} }
req := newRequest(c, log)
var objID oid.ID var objID oid.ID
if checkS3Err == nil && shouldDownload(oidParam, downloadParam) { if checkS3Err == nil && shouldDownload(oidParam, downloadParam) {
h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.receiveFile) h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.receiveFile)
} else if err = objID.DecodeString(oidParam); err == nil { } else if err = objID.DecodeString(oidParam); err == nil {
h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.receiveFile) h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.receiveFile)
} else { } else {
h.browseIndex(c, checkS3Err != nil) h.browseIndex(ctx, req, cidParam, oidParam, checkS3Err != nil)
} }
} }
@ -70,12 +65,11 @@ func shouldDownload(oidParam string, downloadParam bool) bool {
} }
// DownloadByAttribute handles attribute-based download requests. // DownloadByAttribute handles attribute-based download requests.
func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) { func (h *Handler) DownloadByAttribute(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadByAttribute") ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadByAttribute")
defer span.End() defer span.End()
utils.SetContextToRequest(ctx, c)
h.byAttribute(c, h.receiveFile) h.byAttribute(ctx, req, h.receiveFile)
} }
func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) { func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
@ -95,31 +89,33 @@ func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op
} }
// DownloadZip handles zip by prefix requests. // DownloadZip handles zip by prefix requests.
func (h *Handler) DownloadZip(c *fasthttp.RequestCtx) { func (h *Handler) DownloadZip(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadZip") ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadZip")
defer span.End() defer span.End()
utils.SetContextToRequest(ctx, c)
scid, _ := c.UserValue("cid").(string) scid, _ := req.UserValue("cid").(string)
prefix, _ := req.UserValue("prefix").(string)
log := utils.GetReqLogOrDefault(ctx, h.log) ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", scid), zap.String("prefix", prefix)))
bktInfo, err := h.getBucketInfo(ctx, scid, log)
bktInfo, err := h.getBucketInfo(ctx, scid)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return return
} }
resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
resSearch, err := h.searchObjectsByPrefix(ctx, bktInfo.CID, prefix)
if err != nil { if err != nil {
return return
} }
c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip") req.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"") req.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
c.SetBodyStreamWriter(h.getZipResponseWriter(ctx, log, resSearch, bktInfo)) req.SetBodyStreamWriter(h.getZipResponseWriter(ctx, resSearch, bktInfo))
} }
func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) { func (h *Handler) getZipResponseWriter(ctx context.Context, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
return func(w *bufio.Writer) { return func(w *bufio.Writer) {
defer resSearch.Close() defer resSearch.Close()
@ -127,20 +123,20 @@ func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, res
zipWriter := zip.NewWriter(w) zipWriter := zip.NewWriter(w)
var objectsWritten int var objectsWritten int
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf, errIter := resSearch.Iterate(h.putObjectToArchive(ctx, bktInfo.CID, buf,
func(obj *object.Object) (io.Writer, error) { func(obj *object.Object) (io.Writer, error) {
objectsWritten++ objectsWritten++
return h.createZipFile(zipWriter, obj) return h.createZipFile(zipWriter, obj)
}), }),
) )
if errIter != nil { if errIter != nil {
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath)) h.reqLogger(ctx).Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
return return
} else if objectsWritten == 0 { } else if objectsWritten == 0 {
log.Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath)) h.reqLogger(ctx).Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
} }
if err := zipWriter.Close(); err != nil { if err := zipWriter.Close(); err != nil {
log.Error(logs.CloseZipWriter, zap.Error(err), logs.TagField(logs.TagDatapath)) h.reqLogger(ctx).Error(logs.CloseZipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
} }
} }
} }
@ -164,31 +160,33 @@ func (h *Handler) createZipFile(zw *zip.Writer, obj *object.Object) (io.Writer,
} }
// DownloadTar forms tar.gz from objects by prefix. // DownloadTar forms tar.gz from objects by prefix.
func (h *Handler) DownloadTar(c *fasthttp.RequestCtx) { func (h *Handler) DownloadTar(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadTar") ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadTar")
defer span.End() defer span.End()
utils.SetContextToRequest(ctx, c)
scid, _ := c.UserValue("cid").(string) scid, _ := req.UserValue("cid").(string)
prefix, _ := req.UserValue("prefix").(string)
log := utils.GetReqLogOrDefault(ctx, h.log) ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", scid), zap.String("prefix", prefix)))
bktInfo, err := h.getBucketInfo(ctx, scid, log)
bktInfo, err := h.getBucketInfo(ctx, scid)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return return
} }
resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
resSearch, err := h.searchObjectsByPrefix(ctx, bktInfo.CID, prefix)
if err != nil { if err != nil {
return return
} }
c.Response.Header.Set(fasthttp.HeaderContentType, "application/gzip") req.Response.Header.Set(fasthttp.HeaderContentType, "application/gzip")
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.tar.gz\"") req.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.tar.gz\"")
c.SetBodyStreamWriter(h.getTarResponseWriter(ctx, log, resSearch, bktInfo)) req.SetBodyStreamWriter(h.getTarResponseWriter(ctx, resSearch, bktInfo))
} }
func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) { func (h *Handler) getTarResponseWriter(ctx context.Context, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
return func(w *bufio.Writer) { return func(w *bufio.Writer) {
defer resSearch.Close() defer resSearch.Close()
@ -203,26 +201,26 @@ func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, res
defer func() { defer func() {
if err := tarWriter.Close(); err != nil { if err := tarWriter.Close(); err != nil {
log.Error(logs.CloseTarWriter, zap.Error(err), logs.TagField(logs.TagDatapath)) h.reqLogger(ctx).Error(logs.CloseTarWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
} }
if err := gzipWriter.Close(); err != nil { if err := gzipWriter.Close(); err != nil {
log.Error(logs.CloseGzipWriter, zap.Error(err), logs.TagField(logs.TagDatapath)) h.reqLogger(ctx).Error(logs.CloseGzipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
} }
}() }()
var objectsWritten int var objectsWritten int
buf := make([]byte, 3<<20) // the same as for upload buf := make([]byte, 3<<20) // the same as for upload
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf, errIter := resSearch.Iterate(h.putObjectToArchive(ctx, bktInfo.CID, buf,
func(obj *object.Object) (io.Writer, error) { func(obj *object.Object) (io.Writer, error) {
objectsWritten++ objectsWritten++
return h.createTarFile(tarWriter, obj) return h.createTarFile(tarWriter, obj)
}), }),
) )
if errIter != nil { if errIter != nil {
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath)) h.reqLogger(ctx).Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
} else if objectsWritten == 0 { } else if objectsWritten == 0 {
log.Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath)) h.reqLogger(ctx).Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
} }
} }
} }
@ -240,9 +238,9 @@ func (h *Handler) createTarFile(tw *tar.Writer, obj *object.Object) (io.Writer,
}) })
} }
func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID cid.ID, buf []byte, createArchiveHeader func(obj *object.Object) (io.Writer, error)) func(id oid.ID) bool { func (h *Handler) putObjectToArchive(ctx context.Context, cnrID cid.ID, buf []byte, createArchiveHeader func(obj *object.Object) (io.Writer, error)) func(id oid.ID) bool {
return func(id oid.ID) bool { return func(id oid.ID) bool {
log = log.With(zap.String("oid", id.EncodeToString())) logger := h.reqLogger(ctx).With(zap.String("oid", id.EncodeToString()))
prm := PrmObjectGet{ prm := PrmObjectGet{
PrmAuth: PrmAuth{ PrmAuth: PrmAuth{
@ -253,18 +251,18 @@ func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID
resGet, err := h.frostfs.GetObject(ctx, prm) resGet, err := h.frostfs.GetObject(ctx, prm)
if err != nil { if err != nil {
log.Error(logs.FailedToGetObject, zap.Error(err), logs.TagField(logs.TagExternalStorage)) logger.Error(logs.FailedToGetObject, zap.Error(err), logs.TagField(logs.TagExternalStorage))
return false return false
} }
fileWriter, err := createArchiveHeader(&resGet.Header) fileWriter, err := createArchiveHeader(&resGet.Header)
if err != nil { if err != nil {
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath)) logger.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
return false return false
} }
if err = writeToArchive(resGet, fileWriter, buf); err != nil { if err = writeToArchive(resGet, fileWriter, buf); err != nil {
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath)) logger.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
return false return false
} }
@ -272,28 +270,17 @@ func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID
} }
} }
func (h *Handler) searchObjectsByPrefix(c *fasthttp.RequestCtx, log *zap.Logger, cnrID cid.ID) (ResObjectSearch, error) { func (h *Handler) searchObjectsByPrefix(ctx context.Context, cnrID cid.ID, prefix string) (ResObjectSearch, error) {
scid, _ := c.UserValue("cid").(string)
prefix, _ := c.UserValue("prefix").(string)
ctx := utils.GetContextFromRequest(c)
prefix, err := url.QueryUnescape(prefix) prefix, err := url.QueryUnescape(prefix)
if err != nil { if err != nil {
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), return nil, fmt.Errorf("unescape prefix: %w", err)
zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
return nil, err
} }
log = log.With(zap.String("cid", scid), zap.String("prefix", prefix))
resSearch, err := h.search(ctx, cnrID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) resSearch, err := h.search(ctx, cnrID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
if err != nil { if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage)) return nil, fmt.Errorf("search objects by prefix: %w", err)
ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
return nil, err
} }
return resSearch, nil return resSearch, nil
} }

View file

@ -16,7 +16,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -142,6 +141,10 @@ var (
ErrGatewayTimeout = errors.New("gateway timeout") ErrGatewayTimeout = errors.New("gateway timeout")
// ErrQuotaLimitReached is returned from FrostFS in case of quota exceeded. // ErrQuotaLimitReached is returned from FrostFS in case of quota exceeded.
ErrQuotaLimitReached = errors.New("quota limit reached") ErrQuotaLimitReached = errors.New("quota limit reached")
// ErrContainerNotFound is returned from FrostFS in case of container was not found.
ErrContainerNotFound = errors.New("container not found")
// ErrObjectNotFound is returned from FrostFS in case of object was not found.
ErrObjectNotFound = errors.New("object not found")
) )
// FrostFS represents virtual connection to FrostFS network. // FrostFS represents virtual connection to FrostFS network.
@ -195,7 +198,7 @@ func New(params *AppParams, config Config, tree layer.TreeService, workerPool *a
// byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that // byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it. // prepares request and object address to it.
func (h *Handler) byNativeAddress(ctx context.Context, req request, cnrID cid.ID, objID oid.ID, handler func(context.Context, request, oid.Address)) { func (h *Handler) byNativeAddress(ctx context.Context, req *fasthttp.RequestCtx, cnrID cid.ID, objID oid.ID, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
ctx, span := tracing.StartSpanFromContext(ctx, "handler.byNativeAddress") ctx, span := tracing.StartSpanFromContext(ctx, "handler.byNativeAddress")
defer span.End() defer span.End()
@ -205,72 +208,59 @@ func (h *Handler) byNativeAddress(ctx context.Context, req request, cnrID cid.ID
// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that // byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// resolves object address from S3-like path <bucket name>/<object key>. // resolves object address from S3-like path <bucket name>/<object key>.
func (h *Handler) byS3Path(ctx context.Context, req request, cnrID cid.ID, path string, handler func(context.Context, request, oid.Address)) { func (h *Handler) byS3Path(ctx context.Context, req *fasthttp.RequestCtx, cnrID cid.ID, path string, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
ctx, span := tracing.StartSpanFromContext(ctx, "handler.byS3Path") ctx, span := tracing.StartSpanFromContext(ctx, "handler.byS3Path")
defer span.End() defer span.End()
c, log := req.RequestCtx, req.log
foundOID, err := h.tree.GetLatestVersion(ctx, &cnrID, path) foundOID, err := h.tree.GetLatestVersion(ctx, &cnrID, path)
if err != nil { if err != nil {
log.Error(logs.FailedToGetLatestVersionOfObject, zap.Error(err), zap.String("cid", cnrID.String()), h.logAndSendError(ctx, req, logs.FailedToGetLatestVersionOfObject, err, zap.String("path", path))
zap.String("path", path), logs.TagField(logs.TagExternalStorageTree))
logAndSendBucketError(c, log, err)
return return
} }
if foundOID.IsDeleteMarker { if foundOID.IsDeleteMarker {
log.Error(logs.ObjectWasDeleted, logs.TagField(logs.TagExternalStorageTree)) h.logAndSendError(ctx, req, logs.ObjectWasDeleted, ErrObjectNotFound)
ResponseError(c, "object deleted", fasthttp.StatusNotFound)
return return
} }
addr := newAddress(cnrID, foundOID.OID) addr := newAddress(cnrID, foundOID.OID)
handler(ctx, newRequest(c, log), addr) handler(ctx, req, addr)
} }
// byAttribute is a wrapper similar to byNativeAddress. // byAttribute is a wrapper similar to byNativeAddress.
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Context, request, oid.Address)) { func (h *Handler) byAttribute(ctx context.Context, req *fasthttp.RequestCtx, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
cidParam, _ := c.UserValue("cid").(string) cidParam, _ := req.UserValue("cid").(string)
key, _ := c.UserValue("attr_key").(string) key, _ := req.UserValue("attr_key").(string)
val, _ := c.UserValue("attr_val").(string) val, _ := req.UserValue("attr_val").(string)
ctx := utils.GetContextFromRequest(c)
log := utils.GetReqLogOrDefault(ctx, h.log)
key, err := url.QueryUnescape(key) key, err := url.QueryUnescape(key)
if err != nil { if err != nil {
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_key", key), h.logAndSendError(ctx, req, logs.FailedToUnescapeQuery, err, zap.String("cid", cidParam), zap.String("attr_key", key))
zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
val, err = url.QueryUnescape(val) val, err = url.QueryUnescape(val)
if err != nil { if err != nil {
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_val", val), h.logAndSendError(ctx, req, logs.FailedToUnescapeQuery, err, zap.String("cid", cidParam), zap.String("attr_val", key))
zap.Error(err), logs.TagField(logs.TagDatapath))
ResponseError(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
val = prepareAtribute(key, val) val = prepareAtribute(key, val)
log = log.With(zap.String("cid", cidParam), zap.String("attr_key", key), zap.String("attr_val", val)) ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", cidParam),
zap.String("attr_key", key), zap.String("attr_val", val)))
bktInfo, err := h.getBucketInfo(ctx, cidParam, log) bktInfo, err := h.getBucketInfo(ctx, cidParam)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return return
} }
objID, err := h.findObjectByAttribute(ctx, log, bktInfo.CID, key, val) objID, err := h.findObjectByAttribute(ctx, bktInfo.CID, key, val)
if err != nil { if err != nil {
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
ResponseError(c, err.Error(), fasthttp.StatusNotFound) err = fmt.Errorf("%w: %s", ErrObjectNotFound, err.Error())
return
} }
h.logAndSendError(ctx, req, logs.FailedToFindObjectByAttribute, err)
ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
return return
} }
@ -278,14 +268,13 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte
addr.SetContainer(bktInfo.CID) addr.SetContainer(bktInfo.CID)
addr.SetObject(objID) addr.SetObject(objID)
handler(ctx, newRequest(c, log), addr) handler(ctx, req, addr)
} }
func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) { func (h *Handler) findObjectByAttribute(ctx context.Context, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual) res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual)
if err != nil { if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage)) return oid.ID{}, fmt.Errorf("search objects: %w", err)
return oid.ID{}, fmt.Errorf("could not search for objects: %w", err)
} }
defer res.Close() defer res.Close()
@ -295,14 +284,14 @@ func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cn
if n == 0 { if n == 0 {
switch { switch {
case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal): case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal):
log.Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName, logs.TagField(logs.TagExternalStorage)) h.reqLogger(ctx).Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName, logs.TagField(logs.TagExternalStorage))
attrVal = prepareAtribute(attrFileName, attrVal) attrVal = prepareAtribute(attrFileName, attrVal)
return h.findObjectByAttribute(ctx, log, cnrID, attrFileName, attrVal) return h.findObjectByAttribute(ctx, cnrID, attrFileName, attrVal)
case errors.Is(err, io.EOF): case errors.Is(err, io.EOF):
log.Error(logs.ObjectNotFound, zap.Error(err), logs.TagField(logs.TagExternalStorage)) h.reqLogger(ctx).Error(logs.ObjectNotFound, zap.Error(err), logs.TagField(logs.TagExternalStorage))
return oid.ID{}, fmt.Errorf("object not found: %w", err) return oid.ID{}, fmt.Errorf("object not found: %w", err)
default: default:
log.Error(logs.ReadObjectListFailed, zap.Error(err), logs.TagField(logs.TagExternalStorage)) h.reqLogger(ctx).Error(logs.ReadObjectListFailed, zap.Error(err), logs.TagField(logs.TagExternalStorage))
return oid.ID{}, fmt.Errorf("read object list failed: %w", err) return oid.ID{}, fmt.Errorf("read object list failed: %w", err)
} }
} }
@ -354,13 +343,13 @@ func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*ci
if err != nil { if err != nil {
cnrID, err = h.containerResolver.Resolve(ctx, containerID) cnrID, err = h.containerResolver.Resolve(ctx, containerID)
if err != nil && strings.Contains(err.Error(), "not found") { if err != nil && strings.Contains(err.Error(), "not found") {
err = fmt.Errorf("%w: %s", new(apistatus.ContainerNotFound), err.Error()) err = fmt.Errorf("%w: %s", ErrContainerNotFound, err.Error())
} }
} }
return cnrID, err return cnrID, err
} }
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) { func (h *Handler) getBucketInfo(ctx context.Context, containerName string) (*data.BucketInfo, error) {
ns, err := middleware.GetNamespace(ctx) ns, err := middleware.GetNamespace(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -372,21 +361,16 @@ func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *
cnrID, err := h.resolveContainer(ctx, containerName) cnrID, err := h.resolveContainer(ctx, containerName)
if err != nil { if err != nil {
log.Error(logs.CouldNotResolveContainerID, zap.Error(err), zap.String("cnrName", containerName), return nil, fmt.Errorf("resolve container: %w", err)
logs.TagField(logs.TagDatapath))
return nil, err
} }
bktInfo, err := h.readContainer(ctx, *cnrID) bktInfo, err := h.readContainer(ctx, *cnrID)
if err != nil { if err != nil {
log.Error(logs.CouldNotGetContainerInfo, zap.Error(err), zap.String("cnrName", containerName), return nil, fmt.Errorf("read container: %w", err)
zap.String("cnrName", cnrID.String()),
logs.TagField(logs.TagExternalStorage))
return nil, err
} }
if err = h.cache.Put(bktInfo); err != nil { if err = h.cache.Put(bktInfo); err != nil {
log.Warn(logs.CouldntPutBucketIntoCache, h.reqLogger(ctx).Warn(logs.CouldntPutBucketIntoCache,
zap.String("bucket name", bktInfo.Name), zap.String("bucket name", bktInfo.Name),
zap.Stringer("bucket cid", bktInfo.CID), zap.Stringer("bucket cid", bktInfo.CID),
zap.Error(err), zap.Error(err),
@ -419,31 +403,24 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
return bktInfo, err return bktInfo, err
} }
func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) { func (h *Handler) browseIndex(ctx context.Context, req *fasthttp.RequestCtx, cidParam, oidParam string, isNativeList bool) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.browseIndex") ctx, span := tracing.StartSpanFromContext(ctx, "handler.browseIndex")
defer span.End() defer span.End()
utils.SetContextToRequest(ctx, c)
if !h.config.IndexPageEnabled() { if !h.config.IndexPageEnabled() {
c.SetStatusCode(fasthttp.StatusNotFound) req.SetStatusCode(fasthttp.StatusNotFound)
return return
} }
cidURLParam := c.UserValue("cid").(string) unescapedKey, err := url.QueryUnescape(oidParam)
oidURLParam := c.UserValue("oid").(string)
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
log := reqLog.With(zap.String("cid", cidURLParam), zap.String("oid", oidURLParam))
unescapedKey, err := url.QueryUnescape(oidURLParam)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) h.logAndSendError(ctx, req, logs.FailedToUnescapeOIDParam, err)
return return
} }
bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log) bktInfo, err := h.getBucketInfo(ctx, cidParam)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return return
} }
@ -453,7 +430,7 @@ func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) {
listFunc = h.getDirObjectsNative listFunc = h.getDirObjectsNative
} }
h.browseObjects(c, browseParams{ h.browseObjects(ctx, req, browseParams{
bucketInfo: bktInfo, bucketInfo: bktInfo,
prefix: unescapedKey, prefix: unescapedKey,
listObjects: listFunc, listObjects: listFunc,

View file

@ -374,7 +374,7 @@ func TestFindObjectByAttribute(t *testing.T) {
obj.SetAttributes(tc.firstAttr, tc.secondAttr) obj.SetAttributes(tc.firstAttr, tc.secondAttr)
hc.cfg.additionalSearch = tc.additionalSearch hc.cfg.additionalSearch = tc.additionalSearch
objID, err := hc.Handler().findObjectByAttribute(ctx, hc.Handler().log, cnrID, tc.reqAttrKey, tc.reqAttrValue) objID, err := hc.Handler().findObjectByAttribute(ctx, cnrID, tc.reqAttrKey, tc.reqAttrValue)
if tc.err != "" { if tc.err != "" {
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), tc.err) require.Contains(t, err.Error(), tc.err)

View file

@ -27,7 +27,7 @@ const (
hdrContainerID = "X-Container-Id" hdrContainerID = "X-Container-Id"
) )
func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) { func (h *Handler) headObject(ctx context.Context, req *fasthttp.RequestCtx, objectAddress oid.Address) {
var start = time.Now() var start = time.Now()
btoken := bearerToken(ctx) btoken := bearerToken(ctx)
@ -41,7 +41,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
obj, err := h.frostfs.HeadObject(ctx, prm) obj, err := h.frostfs.HeadObject(ctx, prm)
if err != nil { if err != nil {
req.handleFrostFSErr(err, start) h.logAndSendError(ctx, req, logs.FailedToHeadObject, err, zap.Stringer("elapsed", time.Since(start)))
return return
} }
@ -65,7 +65,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
case object.AttributeTimestamp: case object.AttributeTimestamp:
value, err := strconv.ParseInt(val, 10, 64) value, err := strconv.ParseInt(val, 10, 64)
if err != nil { if err != nil {
req.log.Info(logs.CouldntParseCreationDate, h.reqLogger(ctx).Info(logs.CouldntParseCreationDate,
zap.String("key", key), zap.String("key", key),
zap.String("val", val), zap.String("val", val),
zap.Error(err), zap.Error(err),
@ -100,7 +100,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
return h.frostfs.RangeObject(ctx, prmRange) return h.frostfs.RangeObject(ctx, prmRange)
}, filename) }, filename)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
req.handleFrostFSErr(err, start) h.logAndSendError(ctx, req, logs.FailedToDetectContentTypeFromPayload, err, zap.Stringer("elapsed", time.Since(start)))
return return
} }
} }
@ -116,40 +116,37 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
} }
// HeadByAddressOrBucketName handles head requests using simple cid/oid or bucketname/key format. // HeadByAddressOrBucketName handles head requests using simple cid/oid or bucketname/key format.
func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) { func (h *Handler) HeadByAddressOrBucketName(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.HeadByAddressOrBucketName") ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.HeadByAddressOrBucketName")
defer span.End() defer span.End()
cidParam, _ := c.UserValue("cid").(string) cidParam, _ := req.UserValue("cid").(string)
oidParam, _ := c.UserValue("oid").(string) oidParam, _ := req.UserValue("oid").(string)
log := utils.GetReqLogOrDefault(ctx, h.log).With( ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
zap.String("cid", cidParam), zap.String("cid", cidParam),
zap.String("oid", oidParam), zap.String("oid", oidParam),
) ))
bktInfo, err := h.getBucketInfo(ctx, cidParam, log) bktInfo, err := h.getBucketInfo(ctx, cidParam)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return return
} }
checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo) checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) { if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
log.Error(logs.FailedToCheckIfSettingsNodeExist, zap.String("cid", bktInfo.CID.String()), h.logAndSendError(ctx, req, logs.FailedToCheckIfSettingsNodeExist, checkS3Err)
zap.Error(checkS3Err), logs.TagField(logs.TagExternalStorageTree))
logAndSendBucketError(c, log, checkS3Err)
return return
} }
req := newRequest(c, log)
var objID oid.ID var objID oid.ID
if checkS3Err == nil { if checkS3Err == nil {
h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.headObject) h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.headObject)
} else if err = objID.DecodeString(oidParam); err == nil { } else if err = objID.DecodeString(oidParam); err == nil {
h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.headObject) h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.headObject)
} else { } else {
logAndSendBucketError(c, log, checkS3Err) h.logAndSendError(ctx, req, logs.InvalidOIDParam, err)
} }
} }
@ -157,7 +154,6 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
func (h *Handler) HeadByAttribute(c *fasthttp.RequestCtx) { func (h *Handler) HeadByAttribute(c *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.HeadByAttribute") ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.HeadByAttribute")
defer span.End() defer span.End()
utils.SetContextToRequest(ctx, c)
h.byAttribute(c, h.headObject) h.byAttribute(ctx, c, h.headObject)
} }

View file

@ -1,6 +1,7 @@
package handler package handler
import ( import (
"context"
"errors" "errors"
"io" "io"
"strconv" "strconv"
@ -53,7 +54,7 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
} }
// getPayload returns initial payload if object is not multipart else composes new reader with parts data. // getPayload returns initial payload if object is not multipart else composes new reader with parts data.
func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) { func (h *Handler) getPayload(ctx context.Context, p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) {
cid, ok := p.obj.Header.ContainerID() cid, ok := p.obj.Header.ContainerID()
if !ok { if !ok {
return nil, 0, errors.New("no container id set") return nil, 0, errors.New("no container id set")
@ -66,7 +67,6 @@ func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64,
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
ctx := p.req.RequestCtx
params := PrmInitMultiObjectReader{ params := PrmInitMultiObjectReader{
Addr: newAddress(cid, oid), Addr: newAddress(cid, oid),
Bearer: bearerToken(ctx), Bearer: bearerToken(ctx),

View file

@ -63,11 +63,10 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error), file
type getMultiobjectBodyParams struct { type getMultiobjectBodyParams struct {
obj *Object obj *Object
req request
strSize string strSize string
} }
func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address) { func (h *Handler) receiveFile(ctx context.Context, req *fasthttp.RequestCtx, objAddress oid.Address) {
var ( var (
shouldDownload = req.QueryArgs().GetBool("download") shouldDownload = req.QueryArgs().GetBool("download")
start = time.Now() start = time.Now()
@ -85,12 +84,12 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
rObj, err := h.frostfs.GetObject(ctx, prm) rObj, err := h.frostfs.GetObject(ctx, prm)
if err != nil { if err != nil {
req.handleFrostFSErr(err, start) h.logAndSendError(ctx, req, logs.FailedToGetObject, err, zap.Stringer("elapsed", time.Since(start)))
return return
} }
// we can't close reader in this function, so how to do it? // we can't close reader in this function, so how to do it?
req.setIDs(rObj.Header) setIDs(req, rObj.Header)
payload := rObj.Payload payload := rObj.Payload
payloadSize := rObj.Header.PayloadSize() payloadSize := rObj.Header.PayloadSize()
for _, attr := range rObj.Header.Attributes() { for _, attr := range rObj.Header.Attributes() {
@ -107,8 +106,8 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
case object.AttributeFileName: case object.AttributeFileName:
filename = val filename = val
case object.AttributeTimestamp: case object.AttributeTimestamp:
if err = req.setTimestamp(val); err != nil { if err = setTimestamp(req, val); err != nil {
req.log.Error(logs.CouldntParseCreationDate, h.reqLogger(ctx).Error(logs.CouldntParseCreationDate,
zap.String("val", val), zap.String("val", val),
zap.Error(err), zap.Error(err),
logs.TagField(logs.TagDatapath)) logs.TagField(logs.TagDatapath))
@ -118,13 +117,12 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
case object.AttributeFilePath: case object.AttributeFilePath:
filepath = val filepath = val
case attributeMultipartObjectSize: case attributeMultipartObjectSize:
payload, payloadSize, err = h.getPayload(getMultiobjectBodyParams{ payload, payloadSize, err = h.getPayload(ctx, getMultiobjectBodyParams{
obj: rObj, obj: rObj,
req: req,
strSize: val, strSize: val,
}) })
if err != nil { if err != nil {
req.handleFrostFSErr(err, start) h.logAndSendError(ctx, req, logs.FailedToGetObjectPayload, err, zap.Stringer("elapsed", time.Since(start)))
return return
} }
} }
@ -133,7 +131,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
filename = filepath filename = filepath
} }
req.setDisposition(shouldDownload, filename) setDisposition(req, shouldDownload, filename)
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10)) req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
@ -145,8 +143,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
return payload, nil return payload, nil
}, filename) }, filename)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err), logs.TagField(logs.TagDatapath)) h.logAndSendError(ctx, req, logs.FailedToDetectContentTypeFromPayload, err, zap.Stringer("elapsed", time.Since(start)))
ResponseError(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
@ -165,7 +162,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
req.Response.SetBodyStream(payload, int(payloadSize)) req.Response.SetBodyStream(payload, int(payloadSize))
} }
func (r *request) setIDs(obj object.Object) { func setIDs(r *fasthttp.RequestCtx, obj object.Object) {
objID, _ := obj.ID() objID, _ := obj.ID()
cnrID, _ := obj.ContainerID() cnrID, _ := obj.ContainerID()
r.Response.Header.Set(hdrObjectID, objID.String()) r.Response.Header.Set(hdrObjectID, objID.String())
@ -173,7 +170,7 @@ func (r *request) setIDs(obj object.Object) {
r.Response.Header.Set(hdrContainerID, cnrID.String()) r.Response.Header.Set(hdrContainerID, cnrID.String())
} }
func (r *request) setDisposition(shouldDownload bool, filename string) { func setDisposition(r *fasthttp.RequestCtx, shouldDownload bool, filename string) {
const ( const (
inlineDisposition = "inline" inlineDisposition = "inline"
attachmentDisposition = "attachment" attachmentDisposition = "attachment"
@ -187,7 +184,7 @@ func (r *request) setDisposition(shouldDownload bool, filename string) {
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
} }
func (r *request) setTimestamp(timestamp string) error { func setTimestamp(r *fasthttp.RequestCtx, timestamp string) error {
value, err := strconv.ParseInt(timestamp, 10, 64) value, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil { if err != nil {
return err return err

View file

@ -50,44 +50,41 @@ func (pr *putResponse) encode(w io.Writer) error {
} }
// Upload handles multipart upload request. // Upload handles multipart upload request.
func (h *Handler) Upload(c *fasthttp.RequestCtx) { func (h *Handler) Upload(req *fasthttp.RequestCtx) {
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.Upload") ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.Upload")
defer span.End() defer span.End()
utils.SetContextToRequest(ctx, c)
var file MultipartFile var file MultipartFile
scid, _ := c.UserValue("cid").(string) scid, _ := req.UserValue("cid").(string)
bodyStream := c.RequestBodyStream() bodyStream := req.RequestBodyStream()
drainBuf := make([]byte, drainBufSize) drainBuf := make([]byte, drainBufSize)
reqLog := utils.GetReqLogOrDefault(ctx, h.log) log := h.reqLogger(ctx)
log := reqLog.With(zap.String("cid", scid)) ctx = utils.SetReqLog(ctx, log.With(zap.String("cid", scid)))
bktInfo, err := h.getBucketInfo(ctx, scid, log) bktInfo, err := h.getBucketInfo(ctx, scid)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
return return
} }
boundary := string(c.Request.Header.MultipartFormBoundary()) boundary := string(req.Request.Header.MultipartFormBoundary())
if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil { if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err), logs.TagField(logs.TagDatapath)) h.logAndSendError(ctx, req, logs.CouldNotReceiveMultipartForm, err)
ResponseError(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
filtered, err := filterHeaders(log, &c.Request.Header) filtered, err := filterHeaders(log, &req.Request.Header)
if err != nil { if err != nil {
log.Error(logs.FailedToFilterHeaders, zap.Error(err), logs.TagField(logs.TagDatapath)) h.logAndSendError(ctx, req, logs.FailedToFilterHeaders, err)
ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
return return
} }
if c.Request.Header.Peek(explodeArchiveHeader) != nil { if req.Request.Header.Peek(explodeArchiveHeader) != nil {
h.explodeArchive(request{c, log}, bktInfo, file, filtered) h.explodeArchive(ctx, req, bktInfo, file, filtered)
} else { } else {
h.uploadSingleObject(request{c, log}, bktInfo, file, filtered) h.uploadSingleObject(ctx, req, bktInfo, file, filtered)
} }
// Multipart is multipart and thus can contain more than one part which // Multipart is multipart and thus can contain more than one part which
@ -104,46 +101,39 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
} }
} }
func (h *Handler) uploadSingleObject(req request, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) { func (h *Handler) uploadSingleObject(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
c, log := req.RequestCtx, req.log ctx, span := tracing.StartSpanFromContext(ctx, "handler.uploadSingleObject")
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.uploadSingleObject")
defer span.End() defer span.End()
utils.SetContextToRequest(ctx, c)
setIfNotExist(filtered, object.AttributeFileName, file.FileName()) setIfNotExist(filtered, object.AttributeFileName, file.FileName())
attributes, err := h.extractAttributes(c, log, filtered) attributes, err := h.extractAttributes(ctx, req, filtered)
if err != nil { if err != nil {
log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath)) h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
idObj, err := h.uploadObject(c, bkt, attributes, file) idObj, err := h.uploadObject(ctx, bkt, attributes, file)
if err != nil { if err != nil {
h.handlePutFrostFSErr(c, err, log) h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
return return
} }
log.Debug(logs.ObjectUploaded, h.reqLogger(ctx).Debug(logs.ObjectUploaded,
zap.String("oid", idObj.EncodeToString()), zap.String("oid", idObj.EncodeToString()),
zap.String("FileName", file.FileName()), zap.String("FileName", file.FileName()),
logs.TagField(logs.TagExternalStorage), logs.TagField(logs.TagExternalStorage),
) )
addr := newAddress(bkt.CID, idObj) addr := newAddress(bkt.CID, idObj)
c.Response.Header.SetContentType(jsonHeader) req.Response.Header.SetContentType(jsonHeader)
// Try to return the response, otherwise, if something went wrong, throw an error. // Try to return the response, otherwise, if something went wrong, throw an error.
if err = newPutResponse(addr).encode(c); err != nil { if err = newPutResponse(addr).encode(req); err != nil {
log.Error(logs.CouldNotEncodeResponse, zap.Error(err), logs.TagField(logs.TagDatapath)) h.logAndSendError(ctx, req, logs.CouldNotEncodeResponse, err)
ResponseError(c, "could not encode response", fasthttp.StatusBadRequest)
return return
} }
} }
func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) { func (h *Handler) uploadObject(ctx context.Context, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
ctx := utils.GetContextFromRequest(c)
obj := object.New() obj := object.New()
obj.SetContainerID(bkt.CID) obj.SetContainerID(bkt.CID)
obj.SetOwnerID(*h.ownerID) obj.SetOwnerID(*h.ownerID)
@ -168,19 +158,18 @@ func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, att
return idObj, nil return idObj, nil
} }
func (h *Handler) extractAttributes(c *fasthttp.RequestCtx, log *zap.Logger, filtered map[string]string) ([]object.Attribute, error) { func (h *Handler) extractAttributes(ctx context.Context, req *fasthttp.RequestCtx, filtered map[string]string) ([]object.Attribute, error) {
ctx := utils.GetContextFromRequest(c)
now := time.Now() now := time.Now()
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil { if rawHeader := req.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil { if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err), h.reqLogger(ctx).Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err),
logs.TagField(logs.TagDatapath)) logs.TagField(logs.TagDatapath))
} else { } else {
now = parsed now = parsed
} }
} }
if err := utils.PrepareExpirationHeader(ctx, h.frostfs, filtered, now); err != nil { if err := utils.PrepareExpirationHeader(ctx, h.frostfs, filtered, now); err != nil {
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath)) h.reqLogger(ctx).Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath))
return nil, err return nil, err
} }
attributes := make([]object.Attribute, 0, len(filtered)) attributes := make([]object.Attribute, 0, len(filtered))
@ -207,38 +196,33 @@ func newAttribute(key string, val string) object.Attribute {
// explodeArchive read files from archive and creates objects for each of them. // explodeArchive read files from archive and creates objects for each of them.
// Sets FilePath attribute with name from tar.Header. // Sets FilePath attribute with name from tar.Header.
func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) { func (h *Handler) explodeArchive(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
c, log := req.RequestCtx, req.log ctx, span := tracing.StartSpanFromContext(ctx, "handler.explodeArchive")
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.explodeArchive")
defer span.End() defer span.End()
utils.SetContextToRequest(ctx, c)
// remove user attributes which vary for each file in archive // remove user attributes which vary for each file in archive
// to guarantee that they won't appear twice // to guarantee that they won't appear twice
delete(filtered, object.AttributeFileName) delete(filtered, object.AttributeFileName)
delete(filtered, object.AttributeFilePath) delete(filtered, object.AttributeFilePath)
commonAttributes, err := h.extractAttributes(c, log, filtered) commonAttributes, err := h.extractAttributes(ctx, req, filtered)
if err != nil { if err != nil {
log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath)) h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
attributes := commonAttributes attributes := commonAttributes
reader := file reader := file
if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) { if bytes.EqualFold(req.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
log.Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath)) h.reqLogger(ctx).Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath))
gzipReader, err := gzip.NewReader(file) gzipReader, err := gzip.NewReader(file)
if err != nil { if err != nil {
log.Error(logs.FailedToCreateGzipReader, zap.Error(err), logs.TagField(logs.TagDatapath)) h.logAndSendError(ctx, req, logs.FailedToCreateGzipReader, err)
ResponseError(c, "could read gzip file: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
defer func() { defer func() {
if err := gzipReader.Close(); err != nil { if err := gzipReader.Close(); err != nil {
log.Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath)) h.reqLogger(ctx).Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
} }
}() }()
reader = gzipReader reader = gzipReader
@ -250,8 +234,7 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
break break
} else if err != nil { } else if err != nil {
log.Error(logs.FailedToReadFileFromTar, zap.Error(err), logs.TagField(logs.TagDatapath)) h.logAndSendError(ctx, req, logs.FailedToReadFileFromTar, err)
ResponseError(c, "could not get next entry: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
@ -265,13 +248,13 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
attributes = append(attributes, newAttribute(object.AttributeFilePath, obj.Name)) attributes = append(attributes, newAttribute(object.AttributeFilePath, obj.Name))
attributes = append(attributes, newAttribute(object.AttributeFileName, fileName)) attributes = append(attributes, newAttribute(object.AttributeFileName, fileName))
idObj, err := h.uploadObject(c, bkt, attributes, tarReader) idObj, err := h.uploadObject(ctx, bkt, attributes, tarReader)
if err != nil { if err != nil {
h.handlePutFrostFSErr(c, err, log) h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
return return
} }
log.Debug(logs.ObjectUploaded, h.reqLogger(ctx).Debug(logs.ObjectUploaded,
zap.String("oid", idObj.EncodeToString()), zap.String("oid", idObj.EncodeToString()),
zap.String("FileName", fileName), zap.String("FileName", fileName),
logs.TagField(logs.TagExternalStorage), logs.TagField(logs.TagExternalStorage),
@ -279,14 +262,6 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
} }
} }
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {
statusCode, msg, additionalFields := formErrorResponse("could not store file in frostfs", err)
logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)
log.Error(logs.CouldNotStoreFileInFrostfs, append(logFields, logs.TagField(logs.TagExternalStorage))...)
ResponseError(r, msg, statusCode)
}
func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token { func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token {
if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil { if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
return tkn return tkn

View file

@ -5,13 +5,12 @@ import (
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
sdkstatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -19,30 +18,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
type request struct {
*fasthttp.RequestCtx
log *zap.Logger
}
func newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) request {
return request{
RequestCtx: ctx,
log: log,
}
}
func (r *request) handleFrostFSErr(err error, start time.Time) {
logFields := []zap.Field{
zap.Stringer("elapsed", time.Since(start)),
zap.Error(err),
}
statusCode, msg, additionalFields := formErrorResponse("could not receive object", err)
logFields = append(logFields, additionalFields...)
r.log.Error(logs.CouldNotReceiveObject, append(logFields, logs.TagField(logs.TagExternalStorage))...)
ResponseError(r.RequestCtx, msg, statusCode)
}
func bearerToken(ctx context.Context) *bearer.Token { func bearerToken(ctx context.Context) *bearer.Token {
if tkn, err := tokens.LoadBearerToken(ctx); err == nil { if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
return tkn return tkn
@ -84,14 +59,16 @@ func isValidValue(s string) bool {
return true return true
} }
func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) { func (h *Handler) reqLogger(ctx context.Context) *zap.Logger {
Review

question: is this function intentionally not used here and here? It's just that if within the handler package we wrap the utils.GetReqLogOrDefault(ctx, h.log) function in reqLogger(ctx context.Context), don't we want to use it wherever possible?

question: is this function intentionally not used [here](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/75328dd9340d5c1e4320ce95dc118b00f3a5b8c1/internal/handler/browse.go#L226) and [here](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/75328dd9340d5c1e4320ce95dc118b00f3a5b8c1/internal/handler/browse.go#L261)? It's just that if within the handler package we wrap the `utils.GetReqLogOrDefault(ctx, h.log)` function in `reqLogger(ctx context.Context)`, don't we want to use it wherever possible?
log.Error(logs.CouldNotGetBucket, zap.Error(err), logs.TagField(logs.TagDatapath)) return utils.GetReqLogOrDefault(ctx, h.log)
if client.IsErrContainerNotFound(err) {
ResponseError(c, "Not Found", fasthttp.StatusNotFound)
return
} }
ResponseError(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
func (h *Handler) logAndSendError(ctx context.Context, c *fasthttp.RequestCtx, msg string, err error, additional ...zap.Field) {
utils.GetReqLogOrDefault(ctx, h.log).Error(msg,
append([]zap.Field{zap.Error(err), logs.TagField(logs.TagDatapath)}, additional...)...)
msg, code := formErrorResponse(err)
ResponseError(c, msg, code)
} }
func newAddress(cnr cid.ID, obj oid.ID) oid.Address { func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
@ -112,31 +89,23 @@ func ResponseError(r *fasthttp.RequestCtx, msg string, code int) {
r.Error(msg+"\n", code) r.Error(msg+"\n", code)
} }
func formErrorResponse(message string, err error) (int, string, []zap.Field) { func formErrorResponse(err error) (string, int) {
var (
msg string
statusCode int
logFields []zap.Field
)
st := new(sdkstatus.ObjectAccessDenied)
switch { switch {
case errors.As(err, &st): case errors.Is(err, ErrAccessDenied):
statusCode = fasthttp.StatusForbidden return fmt.Sprintf("Storage Access Denied:\n%v", err), fasthttp.StatusForbidden
reason := st.Reason() case errors.Is(err, layer.ErrNodeAccessDenied):
msg = fmt.Sprintf("%s: %v: %s", message, err, reason) return fmt.Sprintf("Tree Access Denied:\n%v", err), fasthttp.StatusForbidden
logFields = append(logFields, zap.String("error_detail", reason))
case errors.Is(err, ErrQuotaLimitReached): case errors.Is(err, ErrQuotaLimitReached):
statusCode = fasthttp.StatusConflict return fmt.Sprintf("Quota Reached:\n%v", err), fasthttp.StatusConflict
msg = fmt.Sprintf("%s: %v", message, err) case errors.Is(err, ErrContainerNotFound):
case client.IsErrObjectNotFound(err) || client.IsErrContainerNotFound(err): return fmt.Sprintf("Container Not Found:\n%v", err), fasthttp.StatusNotFound
statusCode = fasthttp.StatusNotFound case errors.Is(err, ErrObjectNotFound):
msg = "Not Found" return fmt.Sprintf("Object Not Found:\n%v", err), fasthttp.StatusNotFound
case errors.Is(err, layer.ErrNodeNotFound):
return fmt.Sprintf("Tree Node Not Found:\n%v", err), fasthttp.StatusNotFound
case errors.Is(err, ErrGatewayTimeout):
return fmt.Sprintf("Gateway Timeout:\n%v", err), fasthttp.StatusGatewayTimeout
Review

Should we also note in the API documentation the possibility of returning a 504 GatewayTimeout?

Should we also note in the API documentation the possibility of returning a 504 GatewayTimeout?
default: default:
statusCode = fasthttp.StatusBadRequest return fmt.Sprintf("Bad Request:\n%v", err), fasthttp.StatusBadRequest
msg = fmt.Sprintf("%s: %v", message, err)
} }
return statusCode, msg, logFields
} }

View file

@ -77,7 +77,7 @@ const (
// Log messages with the "datapath" tag. // Log messages with the "datapath" tag.
const ( const (
CouldntParseCreationDate = "couldn't parse creation date" CouldntParseCreationDate = "couldn't parse creation date"
CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload" FailedToDetectContentTypeFromPayload = "failed to detect Content-Type from payload"
FailedToAddObjectToArchive = "failed to add object to archive" FailedToAddObjectToArchive = "failed to add object to archive"
CloseZipWriter = "close zip writer" CloseZipWriter = "close zip writer"
IgnorePartEmptyFormName = "ignore part, empty form name" IgnorePartEmptyFormName = "ignore part, empty form name"
@ -104,28 +104,32 @@ const (
CouldNotReceiveMultipartForm = "could not receive multipart/form" CouldNotReceiveMultipartForm = "could not receive multipart/form"
ObjectsNotFound = "objects not found" ObjectsNotFound = "objects not found"
IteratingOverSelectedObjectsFailed = "iterating over selected objects failed" IteratingOverSelectedObjectsFailed = "iterating over selected objects failed"
CouldNotGetBucket = "could not get bucket" FailedToGetBucketInfo = "could not get bucket info"
CouldNotResolveContainerID = "could not resolve container id" FailedToSubmitTaskToPool = "failed to submit task to pool"
FailedToSumbitTaskToPool = "failed to submit task to pool" ObjectWasDeleted = "object was deleted"
FailedToGetLatestVersionOfObject = "failed to get latest version of object"
FailedToCheckIfSettingsNodeExist = "failed to check if settings node exists"
FailedToListObjects = "failed to list objects"
FailedToParseTemplate = "failed to parse template"
FailedToExecuteTemplate = "failed to execute template"
FailedToUploadObject = "failed to upload object"
FailedToHeadObject = "failed to head object"
FailedToGetObject = "failed to get object"
FailedToGetObjectPayload = "failed to get object payload"
FailedToFindObjectByAttribute = "failed to get find object by attribute"
FailedToUnescapeOIDParam = "failed to unescape oid param"
InvalidOIDParam = "invalid oid param"
) )
// Log messages with the "external_storage" tag. // Log messages with the "external_storage" tag.
const ( const (
CouldNotReceiveObject = "could not receive object"
CouldNotSearchForObjects = "could not search for objects"
ObjectNotFound = "object not found" ObjectNotFound = "object not found"
ReadObjectListFailed = "read object list failed" ReadObjectListFailed = "read object list failed"
CouldNotStoreFileInFrostfs = "could not store file in frostfs"
FailedToHeadObject = "failed to head object"
ObjectNotFoundByFilePathTrySearchByFileName = "object not found by filePath attribute, try search by fileName" ObjectNotFoundByFilePathTrySearchByFileName = "object not found by filePath attribute, try search by fileName"
FailedToGetObject = "failed to get object"
ObjectUploaded = "object uploaded" ObjectUploaded = "object uploaded"
CouldNotGetContainerInfo = "could not get container info"
) )
// Log messages with the "external_storage_tree" tag. // Log messages with the "external_storage_tree" tag.
const ( const (
ObjectWasDeleted = "object was deleted" FoundSeveralSystemTreeNodes = "found several system tree nodes"
FailedToGetLatestVersionOfObject = "failed to get latest version of object"
FailedToCheckIfSettingsNodeExist = "Failed to check if settings node exists"
) )

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
qostagging "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" qostagging "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -48,7 +49,7 @@ func (x *FrostFS) Container(ctx context.Context, containerPrm handler.PrmContain
res, err := x.pool.GetContainer(ctx, prm) res, err := x.pool.GetContainer(ctx, prm)
if err != nil { if err != nil {
return nil, handleObjectError("read container via connection pool", err) return nil, handleStorageError("read container via connection pool", err)
} }
return &res, nil return &res, nil
@ -72,7 +73,7 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm handler.PrmObjectCreate)
idObj, err := x.pool.PutObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmPut) idObj, err := x.pool.PutObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmPut)
if err != nil { if err != nil {
return oid.ID{}, handleObjectError("save object via connection pool", err) return oid.ID{}, handleStorageError("save object via connection pool", err)
} }
return idObj.ObjectID, nil return idObj.ObjectID, nil
} }
@ -88,7 +89,7 @@ func (x payloadReader) Read(p []byte) (int, error) {
if err != nil && errors.Is(err, io.EOF) { if err != nil && errors.Is(err, io.EOF) {
return n, err return n, err
} }
return n, handleObjectError("read payload", err) return n, handleStorageError("read payload", err)
} }
// HeadObject implements frostfs.FrostFS interface method. // HeadObject implements frostfs.FrostFS interface method.
@ -105,7 +106,7 @@ func (x *FrostFS) HeadObject(ctx context.Context, prm handler.PrmObjectHead) (*o
res, err := x.pool.HeadObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmHead) res, err := x.pool.HeadObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmHead)
if err != nil { if err != nil {
return nil, handleObjectError("read object header via connection pool", err) return nil, handleStorageError("read object header via connection pool", err)
} }
return &res, nil return &res, nil
@ -125,7 +126,7 @@ func (x *FrostFS) GetObject(ctx context.Context, prm handler.PrmObjectGet) (*han
res, err := x.pool.GetObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmGet) res, err := x.pool.GetObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmGet)
if err != nil { if err != nil {
return nil, handleObjectError("init full object reading via connection pool", err) return nil, handleStorageError("init full object reading via connection pool", err)
} }
return &handler.Object{ return &handler.Object{
@ -150,7 +151,7 @@ func (x *FrostFS) RangeObject(ctx context.Context, prm handler.PrmObjectRange) (
res, err := x.pool.ObjectRange(qostagging.ContextWithIOTag(ctx, clientIOTag), prmRange) res, err := x.pool.ObjectRange(qostagging.ContextWithIOTag(ctx, clientIOTag), prmRange)
if err != nil { if err != nil {
return nil, handleObjectError("init payload range reading via connection pool", err) return nil, handleStorageError("init payload range reading via connection pool", err)
} }
return payloadReader{&res}, nil return payloadReader{&res}, nil
@ -171,7 +172,7 @@ func (x *FrostFS) SearchObjects(ctx context.Context, prm handler.PrmObjectSearch
res, err := x.pool.SearchObjects(qostagging.ContextWithIOTag(ctx, clientIOTag), prmSearch) res, err := x.pool.SearchObjects(qostagging.ContextWithIOTag(ctx, clientIOTag), prmSearch)
if err != nil { if err != nil {
return nil, handleObjectError("init object search via connection pool", err) return nil, handleStorageError("init object search via connection pool", err)
} }
return &res, nil return &res, nil
@ -205,7 +206,7 @@ func (x *FrostFS) NetmapSnapshot(ctx context.Context) (netmap.NetMap, error) {
netmapSnapshot, err := x.pool.NetMapSnapshot(ctx) netmapSnapshot, err := x.pool.NetMapSnapshot(ctx)
if err != nil { if err != nil {
return netmapSnapshot, handleObjectError("get netmap via connection pool", err) return netmapSnapshot, handleStorageError("get netmap via connection pool", err)
} }
return netmapSnapshot, nil return netmapSnapshot, nil
@ -229,7 +230,7 @@ func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
networkInfo, err := x.pool.NetworkInfo(ctx) networkInfo, err := x.pool.NetworkInfo(ctx)
if err != nil { if err != nil {
return "", handleObjectError("read network info via client", err) return "", handleStorageError("read network info via client", err)
} }
domain := networkInfo.RawNetworkParameter("SystemDNS") domain := networkInfo.RawNetworkParameter("SystemDNS")
@ -240,7 +241,7 @@ func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
return string(domain), nil return string(domain), nil
} }
func handleObjectError(msg string, err error) error { func handleStorageError(msg string, err error) error {
if err == nil { if err == nil {
return nil return nil
} }
@ -253,6 +254,14 @@ func handleObjectError(msg string, err error) error {
return fmt.Errorf("%s: %w: %s", msg, handler.ErrAccessDenied, reason) return fmt.Errorf("%s: %w: %s", msg, handler.ErrAccessDenied, reason)
} }
if client.IsErrContainerNotFound(err) {
return fmt.Errorf("%s: %w: %s", msg, handler.ErrContainerNotFound, err.Error())
}
if client.IsErrObjectNotFound(err) {
return fmt.Errorf("%s: %w: %s", msg, handler.ErrObjectNotFound, err.Error())
}
if IsTimeoutError(err) { if IsTimeoutError(err) {
return fmt.Errorf("%s: %w: %s", msg, handler.ErrGatewayTimeout, err.Error()) return fmt.Errorf("%s: %w: %s", msg, handler.ErrGatewayTimeout, err.Error())
} }

View file

@ -18,7 +18,7 @@ func TestHandleObjectError(t *testing.T) {
msg := "some msg" msg := "some msg"
t.Run("nil error", func(t *testing.T) { t.Run("nil error", func(t *testing.T) {
err := handleObjectError(msg, nil) err := handleStorageError(msg, nil)
require.Nil(t, err) require.Nil(t, err)
}) })
@ -27,7 +27,7 @@ func TestHandleObjectError(t *testing.T) {
inputErr := new(apistatus.ObjectAccessDenied) inputErr := new(apistatus.ObjectAccessDenied)
inputErr.WriteReason(reason) inputErr.WriteReason(reason)
err := handleObjectError(msg, inputErr) err := handleStorageError(msg, inputErr)
require.ErrorIs(t, err, handler.ErrAccessDenied) require.ErrorIs(t, err, handler.ErrAccessDenied)
require.Contains(t, err.Error(), reason) require.Contains(t, err.Error(), reason)
require.Contains(t, err.Error(), msg) require.Contains(t, err.Error(), msg)
@ -38,7 +38,7 @@ func TestHandleObjectError(t *testing.T) {
inputErr := new(apistatus.ObjectAccessDenied) inputErr := new(apistatus.ObjectAccessDenied)
inputErr.WriteReason(reason) inputErr.WriteReason(reason)
err := handleObjectError(msg, inputErr) err := handleStorageError(msg, inputErr)
require.ErrorIs(t, err, handler.ErrQuotaLimitReached) require.ErrorIs(t, err, handler.ErrQuotaLimitReached)
require.Contains(t, err.Error(), reason) require.Contains(t, err.Error(), reason)
require.Contains(t, err.Error(), msg) require.Contains(t, err.Error(), msg)
@ -47,7 +47,7 @@ func TestHandleObjectError(t *testing.T) {
t.Run("simple timeout", func(t *testing.T) { t.Run("simple timeout", func(t *testing.T) {
inputErr := errors.New("timeout") inputErr := errors.New("timeout")
err := handleObjectError(msg, inputErr) err := handleStorageError(msg, inputErr)
require.ErrorIs(t, err, handler.ErrGatewayTimeout) require.ErrorIs(t, err, handler.ErrGatewayTimeout)
require.Contains(t, err.Error(), inputErr.Error()) require.Contains(t, err.Error(), inputErr.Error())
require.Contains(t, err.Error(), msg) require.Contains(t, err.Error(), msg)
@ -58,7 +58,7 @@ func TestHandleObjectError(t *testing.T) {
defer cancel() defer cancel()
<-ctx.Done() <-ctx.Done()
err := handleObjectError(msg, ctx.Err()) err := handleStorageError(msg, ctx.Err())
require.ErrorIs(t, err, handler.ErrGatewayTimeout) require.ErrorIs(t, err, handler.ErrGatewayTimeout)
require.Contains(t, err.Error(), ctx.Err().Error()) require.Contains(t, err.Error(), ctx.Err().Error())
require.Contains(t, err.Error(), msg) require.Contains(t, err.Error(), msg)
@ -67,7 +67,7 @@ func TestHandleObjectError(t *testing.T) {
t.Run("grpc deadline exceeded", func(t *testing.T) { t.Run("grpc deadline exceeded", func(t *testing.T) {
inputErr := fmt.Errorf("wrap grpc error: %w", status.Error(codes.DeadlineExceeded, "error")) inputErr := fmt.Errorf("wrap grpc error: %w", status.Error(codes.DeadlineExceeded, "error"))
err := handleObjectError(msg, inputErr) err := handleStorageError(msg, inputErr)
require.ErrorIs(t, err, handler.ErrGatewayTimeout) require.ErrorIs(t, err, handler.ErrGatewayTimeout)
require.Contains(t, err.Error(), inputErr.Error()) require.Contains(t, err.Error(), inputErr.Error())
require.Contains(t, err.Error(), msg) require.Contains(t, err.Error(), msg)
@ -76,7 +76,7 @@ func TestHandleObjectError(t *testing.T) {
t.Run("unknown error", func(t *testing.T) { t.Run("unknown error", func(t *testing.T) {
inputErr := errors.New("unknown error") inputErr := errors.New("unknown error")
err := handleObjectError(msg, inputErr) err := handleStorageError(msg, inputErr)
require.ErrorIs(t, err, inputErr) require.ErrorIs(t, err, inputErr)
require.Contains(t, err.Error(), msg) require.Contains(t, err.Error(), msg)
}) })

View file

@ -64,7 +64,7 @@ func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([
nodes, err := w.p.GetNodes(qostagging.ContextWithIOTag(ctx, clientIOTag), poolPrm) nodes, err := w.p.GetNodes(qostagging.ContextWithIOTag(ctx, clientIOTag), poolPrm)
if err != nil { if err != nil {
return nil, handleError(err) return nil, handleTreeError(err)
} }
res := make([]tree.NodeResponse, len(nodes)) res := make([]tree.NodeResponse, len(nodes))
@ -83,7 +83,7 @@ func getBearer(ctx context.Context) []byte {
return token.Marshal() return token.Marshal()
} }
func handleError(err error) error { func handleTreeError(err error) error {
if err == nil { if err == nil {
return nil return nil
} }
@ -123,7 +123,7 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo,
subTreeReader, err := w.p.GetSubTree(qostagging.ContextWithIOTag(ctx, clientIOTag), poolPrm) subTreeReader, err := w.p.GetSubTree(qostagging.ContextWithIOTag(ctx, clientIOTag), poolPrm)
if err != nil { if err != nil {
return nil, handleError(err) return nil, handleTreeError(err)
} }
var subtree []tree.NodeResponse var subtree []tree.NodeResponse
@ -134,7 +134,7 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo,
node, err = subTreeReader.Next() node, err = subTreeReader.Next()
} }
if err != io.EOF { if err != io.EOF {
return nil, handleError(err) return nil, handleTreeError(err)
} }
return subtree, nil return subtree, nil

View file

@ -8,14 +8,18 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
) )
type ( type (
Tree struct { Tree struct {
service ServiceClient service ServiceClient
log *zap.Logger
} }
// ServiceClient is a client to interact with tree service. // ServiceClient is a client to interact with tree service.
@ -73,8 +77,8 @@ const (
) )
// NewTree creates instance of Tree using provided address and create grpc connection. // NewTree creates instance of Tree using provided address and create grpc connection.
func NewTree(service ServiceClient) *Tree { func NewTree(service ServiceClient, log *zap.Logger) *Tree {
return &Tree{service: service} return &Tree{service: service, log: log}
} }
type Meta interface { type Meta interface {
@ -257,6 +261,9 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name
if len(nodes) == 0 { if len(nodes) == 0 {
return nil, layer.ErrNodeNotFound return nil, layer.ErrNodeNotFound
} }
if len(nodes) != 1 {
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemTreeNodes, zap.String("name", name), logs.TagField(logs.TagExternalStorageTree))
}
return newMultiNode(nodes) return newMultiNode(nodes)
} }
@ -296,7 +303,7 @@ func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
} }
if targetIndexNode == -1 { if targetIndexNode == -1 {
return nil, layer.ErrNodeNotFound return nil, fmt.Errorf("latest version: %w", layer.ErrNodeNotFound)
} }
return nodes[targetIndexNode], nil return nodes[targetIndexNode], nil
@ -423,6 +430,10 @@ func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, tr
return intermediateNodes, nil return intermediateNodes, nil
} }
func (c *Tree) reqLogger(ctx context.Context) *zap.Logger {
return utils.GetReqLogOrDefault(ctx, c.log)
}
func GetFilename(node NodeResponse) string { func GetFilename(node NodeResponse) string {
for _, kv := range node.GetMeta() { for _, kv := range node.GetMeta() {
if kv.GetKey() == FileNameKey { if kv.GetKey() == FileNameKey {