[#191] Refactor error handling and logging #221
18 changed files with 431 additions and 413 deletions
|
@ -724,12 +724,12 @@ func (a *app) stopServices() {
|
|||
}
|
||||
|
||||
func (a *app) configureRouter(workerPool *ants.Pool) {
|
||||
a.handle = handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool)
|
||||
a.handle = handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool), a.log), workerPool)
|
||||
|
||||
r := router.New()
|
||||
r.RedirectTrailingSlash = true
|
||||
r.NotFound = func(r *fasthttp.RequestCtx) {
|
||||
handler.ResponseError(r, "Not found", fasthttp.StatusNotFound)
|
||||
handler.ResponseError(r, "Route Not found", fasthttp.StatusNotFound)
|
||||
}
|
||||
r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
|
||||
handler.ResponseError(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
||||
|
|
|
@ -20,9 +20,11 @@ import (
|
|||
|
||||
containerv2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -99,6 +101,7 @@ func TestIntegration(t *testing.T) {
|
|||
t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID) })
|
||||
t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID) })
|
||||
t.Run("test namespaces "+version, func(t *testing.T) { checkNamespaces(ctx, t, clientPool, ownerID, CID) })
|
||||
t.Run("test status codes "+version, func(t *testing.T) { checkStatusCodes(ctx, t, clientPool, ownerID, version) })
|
||||
|
||||
cancel()
|
||||
server.Wait()
|
||||
|
@ -267,7 +270,7 @@ func putWithDuplicateKeys(t *testing.T, CID cid.ID) {
|
|||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "key duplication error: "+attr+"\n", string(body))
|
||||
require.Contains(t, string(body), "key duplication error: "+attr+"\n")
|
||||
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
||||
}
|
||||
|
||||
|
@ -436,7 +439,80 @@ func checkNamespaces(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
|
|||
resp, err = http.DefaultClient.Do(req)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, http.StatusNotFound, resp.StatusCode)
|
||||
}
|
||||
|
||||
func checkStatusCodes(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, version string) {
|
||||
cli := http.Client{Timeout: 30 * time.Second}
|
||||
|
||||
t.Run("container not found by name", func(t *testing.T) {
|
||||
resp, err := cli.Get(testHost + "/get/unknown/object")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, http.StatusNotFound, resp.StatusCode)
|
||||
requireBodyContains(t, resp, "container not found")
|
||||
})
|
||||
|
||||
t.Run("container not found by cid", func(t *testing.T) {
|
||||
cnrIDTest := cidtest.ID()
|
||||
resp, err := cli.Get(testHost + "/get/" + cnrIDTest.EncodeToString() + "/object")
|
||||
require.NoError(t, err)
|
||||
requireBodyContains(t, resp, "container not found")
|
||||
require.Equal(t, http.StatusNotFound, resp.StatusCode)
|
||||
})
|
||||
|
||||
t.Run("object not found in storage", func(t *testing.T) {
|
||||
resp, err := cli.Get(testHost + "/get_by_attribute/" + testContainerName + "/FilePath/object2")
|
||||
require.NoError(t, err)
|
||||
requireBodyContains(t, resp, "object not found")
|
||||
require.Equal(t, http.StatusNotFound, resp.StatusCode)
|
||||
})
|
||||
|
||||
t.Run("access denied", func(t *testing.T) {
|
||||
basicACL := acl.Private
|
||||
var recs []*eacl.Record
|
||||
if version == "1.2.7" {
|
||||
basicACL = acl.PublicRWExtended
|
||||
rec := eacl.NewRecord()
|
||||
rec.SetAction(eacl.ActionDeny)
|
||||
rec.SetOperation(eacl.OperationGet)
|
||||
recs = append(recs, rec)
|
||||
}
|
||||
|
||||
cnrID, err := createContainerBase(ctx, t, clientPool, ownerID, basicACL, "")
|
||||
require.NoError(t, err)
|
||||
|
||||
key, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
jsonToken, _ := makeBearerTokens(t, key, ownerID, version, recs...)
|
||||
|
||||
t.Run("get", func(t *testing.T) {
|
||||
request, err := http.NewRequest(http.MethodGet, testHost+"/get/"+cnrID.EncodeToString()+"/object", nil)
|
||||
require.NoError(t, err)
|
||||
request.Header.Set("Authorization", "Bearer "+jsonToken)
|
||||
|
||||
resp, err := cli.Do(request)
|
||||
require.NoError(t, err)
|
||||
requireBodyContains(t, resp, "access denied")
|
||||
require.Equal(t, http.StatusForbidden, resp.StatusCode)
|
||||
})
|
||||
|
||||
t.Run("upload", func(t *testing.T) {
|
||||
request, _, _ := makePutRequest(t, testHost+"/upload/"+cnrID.EncodeToString())
|
||||
request.Header.Set("Authorization", "Bearer "+jsonToken)
|
||||
|
||||
resp, err := cli.Do(request)
|
||||
require.NoError(t, err)
|
||||
requireBodyContains(t, resp, "access denied")
|
||||
require.Equal(t, http.StatusForbidden, resp.StatusCode)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func requireBodyContains(t *testing.T, resp *http.Response, msg string) {
|
||||
data, err := io.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
defer resp.Body.Close()
|
||||
|
||||
require.Contains(t, strings.ToLower(string(data)), strings.ToLower(msg))
|
||||
}
|
||||
|
||||
func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container {
|
||||
|
@ -485,6 +561,10 @@ func getPool(ctx context.Context, t *testing.T, key *keys.PrivateKey) *pool.Pool
|
|||
}
|
||||
|
||||
func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, name string) (cid.ID, error) {
|
||||
return createContainerBase(ctx, t, clientPool, ownerID, acl.PublicRWExtended, name)
|
||||
}
|
||||
|
||||
func createContainerBase(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, basicACL acl.Basic, name string) (cid.ID, error) {
|
||||
var policy netmap.PlacementPolicy
|
||||
err := policy.DecodeString("REP 1")
|
||||
require.NoError(t, err)
|
||||
|
@ -492,24 +572,28 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
|
|||
var cnr container.Container
|
||||
cnr.Init()
|
||||
cnr.SetPlacementPolicy(policy)
|
||||
cnr.SetBasicACL(acl.PublicRWExtended)
|
||||
cnr.SetBasicACL(basicACL)
|
||||
cnr.SetOwner(ownerID)
|
||||
|
||||
container.SetCreationTime(&cnr, time.Now())
|
||||
|
||||
var domain container.Domain
|
||||
domain.SetName(name)
|
||||
if name != "" {
|
||||
var domain container.Domain
|
||||
domain.SetName(name)
|
||||
|
||||
cnr.SetAttribute(containerv2.SysAttributeName, domain.Name())
|
||||
cnr.SetAttribute(containerv2.SysAttributeZone, domain.Zone())
|
||||
cnr.SetAttribute(containerv2.SysAttributeName, domain.Name())
|
||||
cnr.SetAttribute(containerv2.SysAttributeZone, domain.Zone())
|
||||
}
|
||||
|
||||
var waitPrm pool.WaitParams
|
||||
waitPrm.SetTimeout(15 * time.Second)
|
||||
waitPrm.SetPollInterval(3 * time.Second)
|
||||
|
||||
var prm pool.PrmContainerPut
|
||||
prm.SetContainer(cnr)
|
||||
prm.SetWaitParams(waitPrm)
|
||||
prm := pool.PrmContainerPut{
|
||||
ClientParams: client.PrmContainerPut{
|
||||
Container: &cnr,
|
||||
},
|
||||
WaitParams: &pool.WaitParams{
|
||||
Timeout: 15 * time.Second,
|
||||
PollInterval: 3 * time.Second,
|
||||
},
|
||||
}
|
||||
|
||||
CID, err := clientPool.PutContainer(ctx, prm)
|
||||
if err != nil {
|
||||
|
@ -556,13 +640,18 @@ func registerUser(t *testing.T, ctx context.Context, aioContainer testcontainers
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func makeBearerTokens(t *testing.T, key *keys.PrivateKey, ownerID user.ID, version string) (jsonTokenBase64, binaryTokenBase64 string) {
|
||||
func makeBearerTokens(t *testing.T, key *keys.PrivateKey, ownerID user.ID, version string, records ...*eacl.Record) (jsonTokenBase64, binaryTokenBase64 string) {
|
||||
tkn := new(bearer.Token)
|
||||
tkn.ForUser(ownerID)
|
||||
tkn.SetExp(10000)
|
||||
|
||||
if version == "1.2.7" {
|
||||
tkn.SetEACLTable(*eacl.NewTable())
|
||||
table := eacl.NewTable()
|
||||
for i := range records {
|
||||
table.AddRecord(records[i])
|
||||
}
|
||||
|
||||
tkn.SetEACLTable(*table)
|
||||
} else {
|
||||
tkn.SetImpersonate(true)
|
||||
}
|
||||
|
|
26
docs/api.md
26
docs/api.md
|
@ -94,6 +94,8 @@ The `filename` field from the multipart form will be set as `FileName` attribute
|
|||
|--------|----------------------------------------------|
|
||||
| 200 | Object created successfully. |
|
||||
| 400 | Some error occurred during object uploading. |
|
||||
| 403 | Access denied. |
|
||||
| 409 | Can not upload object due to quota reached. |
|
||||
|
||||
## Get object
|
||||
|
||||
|
@ -141,6 +143,7 @@ Get an object (payload and attributes) by an address.
|
|||
|--------|------------------------------------------------|
|
||||
| 200 | Object got successfully. |
|
||||
| 400 | Some error occurred during object downloading. |
|
||||
| 403 | Access denied. |
|
||||
| 404 | Container or object not found. |
|
||||
|
||||
###### Body
|
||||
|
@ -183,6 +186,7 @@ Get an object attributes by an address.
|
|||
|--------|---------------------------------------------------|
|
||||
| 200 | Object head successfully. |
|
||||
| 400 | Some error occurred during object HEAD operation. |
|
||||
| 403 | Access denied. |
|
||||
| 404 | Container or object not found. |
|
||||
|
||||
## Search object
|
||||
|
@ -233,6 +237,7 @@ If more than one object is found, an arbitrary one will be returned.
|
|||
|--------|------------------------------------------------|
|
||||
| 200 | Object got successfully. |
|
||||
| 400 | Some error occurred during object downloading. |
|
||||
| 403 | Access denied. |
|
||||
| 404 | Container or object not found. |
|
||||
|
||||
#### HEAD
|
||||
|
@ -269,6 +274,7 @@ If more than one object is found, an arbitrary one will be used to get attribute
|
|||
|--------|---------------------------------------|
|
||||
| 200 | Object head successfully. |
|
||||
| 400 | Some error occurred during operation. |
|
||||
| 403 | Access denied. |
|
||||
| 404 | Container or object not found. |
|
||||
|
||||
## Download archive
|
||||
|
@ -304,16 +310,16 @@ Archive can be compressed (see http-gw [configuration](gate-configuration.md#arc
|
|||
|
||||
###### Headers
|
||||
|
||||
| Header | Description |
|
||||
|-----------------------|-------------------------------------------------------------------------------------------------------------------|
|
||||
| `Content-Disposition` | Indicate how to browsers should treat file (`attachment`). Set `filename` as `archive.zip`. |
|
||||
| `Content-Type` | Indicate content type of object. Set to `application/zip` |
|
||||
| Header | Description |
|
||||
|-----------------------|---------------------------------------------------------------------------------------------|
|
||||
| `Content-Disposition` | Indicate how to browsers should treat file (`attachment`). Set `filename` as `archive.zip`. |
|
||||
| `Content-Type` | Indicate content type of object. Set to `application/zip` |
|
||||
|
||||
###### Status codes
|
||||
|
||||
| Status | Description |
|
||||
|--------|-----------------------------------------------------|
|
||||
| 200 | Object got successfully. |
|
||||
| 400 | Some error occurred during object downloading. |
|
||||
| 404 | Container or objects not found. |
|
||||
| 500 | Some inner error (e.g. error on streaming objects). |
|
||||
| Status | Description |
|
||||
|--------|------------------------------------------------|
|
||||
| 200 | Object got successfully. |
|
||||
| 400 | Some error occurred during object downloading. |
|
||||
| 403 | Access denied. |
|
||||
| 404 | Container or objects not found. |
|
||||
|
|
|
@ -223,7 +223,7 @@ func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.Buck
|
|||
return nil, err
|
||||
}
|
||||
|
||||
log := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
log := h.reqLogger(ctx)
|
||||
dirs := make(map[string]struct{})
|
||||
result := &GetObjectsResponse{
|
||||
objects: make([]ResponseObject, 0, 100),
|
||||
|
@ -258,7 +258,7 @@ func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs Re
|
|||
|
||||
go func() {
|
||||
defer close(res)
|
||||
log := utils.GetReqLogOrDefault(ctx, h.log).With(
|
||||
log := h.reqLogger(ctx).With(
|
||||
zap.String("cid", cnrID.EncodeToString()),
|
||||
zap.String("path", basePath),
|
||||
)
|
||||
|
@ -273,7 +273,7 @@ func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs Re
|
|||
})
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
log.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -328,20 +328,18 @@ type browseParams struct {
|
|||
listObjects func(ctx context.Context, bucketName *data.BucketInfo, prefix string) (*GetObjectsResponse, error)
|
||||
}
|
||||
|
||||
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
|
||||
func (h *Handler) browseObjects(ctx context.Context, req *fasthttp.RequestCtx, p browseParams) {
|
||||
const S3Protocol = "s3"
|
||||
const FrostfsProtocol = "frostfs"
|
||||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
log := reqLog.With(
|
||||
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
|
||||
zap.String("bucket", p.bucketInfo.Name),
|
||||
zap.String("container", p.bucketInfo.CID.EncodeToString()),
|
||||
zap.String("prefix", p.prefix),
|
||||
)
|
||||
))
|
||||
resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToListObjects, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -360,7 +358,7 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
|
|||
"parentDir": parentDir,
|
||||
}).Parse(h.config.IndexPageTemplate())
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToParseTemplate, err)
|
||||
return
|
||||
}
|
||||
bucketName := p.bucketInfo.Name
|
||||
|
@ -369,14 +367,14 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
|
|||
bucketName = p.bucketInfo.CID.EncodeToString()
|
||||
protocol = FrostfsProtocol
|
||||
}
|
||||
if err = tmpl.Execute(c, &BrowsePageData{
|
||||
if err = tmpl.Execute(req, &BrowsePageData{
|
||||
Container: bucketName,
|
||||
Prefix: p.prefix,
|
||||
Objects: objects,
|
||||
Protocol: protocol,
|
||||
HasErrors: resp.hasErrors,
|
||||
}); err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToExecuteTemplate, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,32 +30,32 @@ const (
|
|||
|
||||
var errNoCORS = errors.New("no CORS objects found")
|
||||
|
||||
func (h *Handler) Preflight(c *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.Preflight")
|
||||
func (h *Handler) Preflight(req *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.Preflight")
|
||||
defer span.End()
|
||||
|
||||
ctx = qostagging.ContextWithIOTag(ctx, internalIOTag)
|
||||
cidParam, _ := c.UserValue("cid").(string)
|
||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
cidParam, _ := req.UserValue("cid").(string)
|
||||
reqLog := h.reqLogger(ctx)
|
||||
log := reqLog.With(zap.String("cid", cidParam))
|
||||
|
||||
origin := c.Request.Header.Peek(fasthttp.HeaderOrigin)
|
||||
origin := req.Request.Header.Peek(fasthttp.HeaderOrigin)
|
||||
if len(origin) == 0 {
|
||||
log.Error(logs.EmptyOriginRequestHeader, logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "Origin request header needed", fasthttp.StatusBadRequest)
|
||||
ResponseError(req, "Origin request header needed", fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
method := c.Request.Header.Peek(fasthttp.HeaderAccessControlRequestMethod)
|
||||
method := req.Request.Header.Peek(fasthttp.HeaderAccessControlRequestMethod)
|
||||
if len(method) == 0 {
|
||||
log.Error(logs.EmptyAccessControlRequestMethodHeader, logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "Access-Control-Request-Method request header needed", fasthttp.StatusBadRequest)
|
||||
ResponseError(req, "Access-Control-Request-Method request header needed", fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
corsRule := h.config.CORS()
|
||||
if corsRule != nil {
|
||||
setCORSHeadersFromRule(c, corsRule)
|
||||
setCORSHeadersFromRule(req, corsRule)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -66,12 +66,12 @@ func (h *Handler) Preflight(c *fasthttp.RequestCtx) {
|
|||
if errors.Is(err, errNoCORS) {
|
||||
status = fasthttp.StatusNotFound
|
||||
}
|
||||
ResponseError(c, "could not get CORS configuration: "+err.Error(), status)
|
||||
ResponseError(req, "could not get CORS configuration: "+err.Error(), status)
|
||||
return
|
||||
}
|
||||
|
||||
var headers []string
|
||||
requestHeaders := c.Request.Header.Peek(fasthttp.HeaderAccessControlRequestHeaders)
|
||||
requestHeaders := req.Request.Header.Peek(fasthttp.HeaderAccessControlRequestHeaders)
|
||||
if len(requestHeaders) > 0 {
|
||||
headers = strings.Split(string(requestHeaders), ", ")
|
||||
}
|
||||
|
@ -84,19 +84,19 @@ func (h *Handler) Preflight(c *fasthttp.RequestCtx) {
|
|||
if !checkSubslice(rule.AllowedHeaders, headers) {
|
||||
continue
|
||||
}
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowOrigin, string(origin))
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowMethods, strings.Join(rule.AllowedMethods, ", "))
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlAllowOrigin, string(origin))
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlAllowMethods, strings.Join(rule.AllowedMethods, ", "))
|
||||
if headers != nil {
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowHeaders, string(requestHeaders))
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlAllowHeaders, string(requestHeaders))
|
||||
}
|
||||
if rule.ExposeHeaders != nil {
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlExposeHeaders, strings.Join(rule.ExposeHeaders, ", "))
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlExposeHeaders, strings.Join(rule.ExposeHeaders, ", "))
|
||||
}
|
||||
if rule.MaxAgeSeconds > 0 || rule.MaxAgeSeconds == -1 {
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlMaxAge, strconv.Itoa(rule.MaxAgeSeconds))
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlMaxAge, strconv.Itoa(rule.MaxAgeSeconds))
|
||||
}
|
||||
if o != wildcard {
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowCredentials, "true")
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlAllowCredentials, "true")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -105,26 +105,26 @@ func (h *Handler) Preflight(c *fasthttp.RequestCtx) {
|
|||
}
|
||||
}
|
||||
log.Error(logs.CORSRuleWasNotMatched, logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "Forbidden", fasthttp.StatusForbidden)
|
||||
ResponseError(req, "Forbidden", fasthttp.StatusForbidden)
|
||||
}
|
||||
|
||||
func (h *Handler) SetCORSHeaders(c *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.SetCORSHeaders")
|
||||
func (h *Handler) SetCORSHeaders(req *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.SetCORSHeaders")
|
||||
defer span.End()
|
||||
|
||||
origin := c.Request.Header.Peek(fasthttp.HeaderOrigin)
|
||||
origin := req.Request.Header.Peek(fasthttp.HeaderOrigin)
|
||||
if len(origin) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
ctx = qostagging.ContextWithIOTag(ctx, internalIOTag)
|
||||
cidParam, _ := c.UserValue("cid").(string)
|
||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
cidParam, _ := req.UserValue("cid").(string)
|
||||
reqLog := h.reqLogger(ctx)
|
||||
log := reqLog.With(zap.String("cid", cidParam))
|
||||
|
||||
corsRule := h.config.CORS()
|
||||
if corsRule != nil {
|
||||
setCORSHeadersFromRule(c, corsRule)
|
||||
setCORSHeadersFromRule(req, corsRule)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -143,26 +143,26 @@ func (h *Handler) SetCORSHeaders(c *fasthttp.RequestCtx) {
|
|||
for _, o := range rule.AllowedOrigins {
|
||||
if o == string(origin) {
|
||||
for _, m := range rule.AllowedMethods {
|
||||
if m == string(c.Method()) {
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowOrigin, string(origin))
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowMethods, strings.Join(rule.AllowedMethods, ", "))
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowCredentials, "true")
|
||||
c.Response.Header.Set(fasthttp.HeaderVary, fasthttp.HeaderOrigin)
|
||||
if m == string(req.Method()) {
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlAllowOrigin, string(origin))
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlAllowMethods, strings.Join(rule.AllowedMethods, ", "))
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlAllowCredentials, "true")
|
||||
req.Response.Header.Set(fasthttp.HeaderVary, fasthttp.HeaderOrigin)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if o == wildcard {
|
||||
for _, m := range rule.AllowedMethods {
|
||||
if m == string(c.Method()) {
|
||||
if m == string(req.Method()) {
|
||||
if withCredentials {
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowOrigin, string(origin))
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowCredentials, "true")
|
||||
c.Response.Header.Set(fasthttp.HeaderVary, fasthttp.HeaderOrigin)
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlAllowOrigin, string(origin))
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlAllowCredentials, "true")
|
||||
req.Response.Header.Set(fasthttp.HeaderVary, fasthttp.HeaderOrigin)
|
||||
} else {
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowOrigin, o)
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlAllowOrigin, o)
|
||||
}
|
||||
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowMethods, strings.Join(rule.AllowedMethods, ", "))
|
||||
req.Response.Header.Set(fasthttp.HeaderAccessControlAllowMethods, strings.Join(rule.AllowedMethods, ", "))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,43 +25,38 @@ import (
|
|||
)
|
||||
|
||||
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
|
||||
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadByAddressOrBucketName")
|
||||
func (h *Handler) DownloadByAddressOrBucketName(req *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadByAddressOrBucketName")
|
||||
defer span.End()
|
||||
utils.SetContextToRequest(ctx, c)
|
||||
|
||||
cidParam := c.UserValue("cid").(string)
|
||||
oidParam := c.UserValue("oid").(string)
|
||||
downloadParam := c.QueryArgs().GetBool("download")
|
||||
cidParam := req.UserValue("cid").(string)
|
||||
oidParam := req.UserValue("oid").(string)
|
||||
downloadParam := req.QueryArgs().GetBool("download")
|
||||
|
||||
log := utils.GetReqLogOrDefault(ctx, h.log).With(
|
||||
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
|
||||
zap.String("cid", cidParam),
|
||||
zap.String("oid", oidParam),
|
||||
)
|
||||
))
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
||||
bktInfo, err := h.getBucketInfo(ctx, cidParam)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
|
||||
if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
|
||||
log.Error(logs.FailedToCheckIfSettingsNodeExist, zap.String("cid", bktInfo.CID.String()),
|
||||
zap.Error(checkS3Err), logs.TagField(logs.TagExternalStorageTree))
|
||||
logAndSendBucketError(c, log, checkS3Err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToCheckIfSettingsNodeExist, checkS3Err)
|
||||
return
|
||||
}
|
||||
|
||||
req := newRequest(c, log)
|
||||
|
||||
var objID oid.ID
|
||||
if checkS3Err == nil && shouldDownload(oidParam, downloadParam) {
|
||||
h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.receiveFile)
|
||||
} else if err = objID.DecodeString(oidParam); err == nil {
|
||||
h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.receiveFile)
|
||||
} else {
|
||||
h.browseIndex(c, checkS3Err != nil)
|
||||
h.browseIndex(ctx, req, cidParam, oidParam, checkS3Err != nil)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -70,12 +65,11 @@ func shouldDownload(oidParam string, downloadParam bool) bool {
|
|||
}
|
||||
|
||||
// DownloadByAttribute handles attribute-based download requests.
|
||||
func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadByAttribute")
|
||||
func (h *Handler) DownloadByAttribute(req *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadByAttribute")
|
||||
defer span.End()
|
||||
utils.SetContextToRequest(ctx, c)
|
||||
|
||||
h.byAttribute(c, h.receiveFile)
|
||||
h.byAttribute(ctx, req, h.receiveFile)
|
||||
}
|
||||
|
||||
func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
|
||||
|
@ -95,31 +89,33 @@ func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op
|
|||
}
|
||||
|
||||
// DownloadZip handles zip by prefix requests.
|
||||
func (h *Handler) DownloadZip(c *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadZip")
|
||||
func (h *Handler) DownloadZip(req *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadZip")
|
||||
defer span.End()
|
||||
utils.SetContextToRequest(ctx, c)
|
||||
|
||||
scid, _ := c.UserValue("cid").(string)
|
||||
scid, _ := req.UserValue("cid").(string)
|
||||
prefix, _ := req.UserValue("prefix").(string)
|
||||
|
||||
log := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", scid), zap.String("prefix", prefix)))
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
|
||||
return
|
||||
}
|
||||
resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
|
||||
|
||||
resSearch, err := h.searchObjectsByPrefix(ctx, bktInfo.CID, prefix)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
|
||||
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
|
||||
req.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
|
||||
req.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
|
||||
|
||||
c.SetBodyStreamWriter(h.getZipResponseWriter(ctx, log, resSearch, bktInfo))
|
||||
req.SetBodyStreamWriter(h.getZipResponseWriter(ctx, resSearch, bktInfo))
|
||||
}
|
||||
|
||||
func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
|
||||
func (h *Handler) getZipResponseWriter(ctx context.Context, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
|
||||
return func(w *bufio.Writer) {
|
||||
defer resSearch.Close()
|
||||
|
||||
|
@ -127,20 +123,20 @@ func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, res
|
|||
zipWriter := zip.NewWriter(w)
|
||||
var objectsWritten int
|
||||
|
||||
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
|
||||
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, bktInfo.CID, buf,
|
||||
func(obj *object.Object) (io.Writer, error) {
|
||||
objectsWritten++
|
||||
return h.createZipFile(zipWriter, obj)
|
||||
}),
|
||||
)
|
||||
if errIter != nil {
|
||||
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
|
||||
h.reqLogger(ctx).Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
|
||||
return
|
||||
} else if objectsWritten == 0 {
|
||||
log.Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
|
||||
h.reqLogger(ctx).Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
|
||||
}
|
||||
if err := zipWriter.Close(); err != nil {
|
||||
log.Error(logs.CloseZipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
h.reqLogger(ctx).Error(logs.CloseZipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -164,31 +160,33 @@ func (h *Handler) createZipFile(zw *zip.Writer, obj *object.Object) (io.Writer,
|
|||
}
|
||||
|
||||
// DownloadTar forms tar.gz from objects by prefix.
|
||||
func (h *Handler) DownloadTar(c *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadTar")
|
||||
func (h *Handler) DownloadTar(req *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadTar")
|
||||
defer span.End()
|
||||
utils.SetContextToRequest(ctx, c)
|
||||
|
||||
scid, _ := c.UserValue("cid").(string)
|
||||
scid, _ := req.UserValue("cid").(string)
|
||||
prefix, _ := req.UserValue("prefix").(string)
|
||||
|
||||
log := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", scid), zap.String("prefix", prefix)))
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
|
||||
return
|
||||
}
|
||||
resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
|
||||
|
||||
resSearch, err := h.searchObjectsByPrefix(ctx, bktInfo.CID, prefix)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.Response.Header.Set(fasthttp.HeaderContentType, "application/gzip")
|
||||
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.tar.gz\"")
|
||||
req.Response.Header.Set(fasthttp.HeaderContentType, "application/gzip")
|
||||
req.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.tar.gz\"")
|
||||
|
||||
c.SetBodyStreamWriter(h.getTarResponseWriter(ctx, log, resSearch, bktInfo))
|
||||
req.SetBodyStreamWriter(h.getTarResponseWriter(ctx, resSearch, bktInfo))
|
||||
}
|
||||
|
||||
func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
|
||||
func (h *Handler) getTarResponseWriter(ctx context.Context, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
|
||||
return func(w *bufio.Writer) {
|
||||
defer resSearch.Close()
|
||||
|
||||
|
@ -203,26 +201,26 @@ func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, res
|
|||
|
||||
defer func() {
|
||||
if err := tarWriter.Close(); err != nil {
|
||||
log.Error(logs.CloseTarWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
h.reqLogger(ctx).Error(logs.CloseTarWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
}
|
||||
if err := gzipWriter.Close(); err != nil {
|
||||
log.Error(logs.CloseGzipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
h.reqLogger(ctx).Error(logs.CloseGzipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
}
|
||||
}()
|
||||
|
||||
var objectsWritten int
|
||||
buf := make([]byte, 3<<20) // the same as for upload
|
||||
|
||||
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
|
||||
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, bktInfo.CID, buf,
|
||||
func(obj *object.Object) (io.Writer, error) {
|
||||
objectsWritten++
|
||||
return h.createTarFile(tarWriter, obj)
|
||||
}),
|
||||
)
|
||||
if errIter != nil {
|
||||
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
|
||||
h.reqLogger(ctx).Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
|
||||
} else if objectsWritten == 0 {
|
||||
log.Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
|
||||
h.reqLogger(ctx).Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -240,9 +238,9 @@ func (h *Handler) createTarFile(tw *tar.Writer, obj *object.Object) (io.Writer,
|
|||
})
|
||||
}
|
||||
|
||||
func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID cid.ID, buf []byte, createArchiveHeader func(obj *object.Object) (io.Writer, error)) func(id oid.ID) bool {
|
||||
func (h *Handler) putObjectToArchive(ctx context.Context, cnrID cid.ID, buf []byte, createArchiveHeader func(obj *object.Object) (io.Writer, error)) func(id oid.ID) bool {
|
||||
return func(id oid.ID) bool {
|
||||
log = log.With(zap.String("oid", id.EncodeToString()))
|
||||
logger := h.reqLogger(ctx).With(zap.String("oid", id.EncodeToString()))
|
||||
|
||||
prm := PrmObjectGet{
|
||||
PrmAuth: PrmAuth{
|
||||
|
@ -253,18 +251,18 @@ func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID
|
|||
|
||||
resGet, err := h.frostfs.GetObject(ctx, prm)
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToGetObject, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
logger.Error(logs.FailedToGetObject, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
return false
|
||||
}
|
||||
|
||||
fileWriter, err := createArchiveHeader(&resGet.Header)
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
logger.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
return false
|
||||
}
|
||||
|
||||
if err = writeToArchive(resGet, fileWriter, buf); err != nil {
|
||||
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
logger.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -272,28 +270,17 @@ func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID
|
|||
}
|
||||
}
|
||||
|
||||
func (h *Handler) searchObjectsByPrefix(c *fasthttp.RequestCtx, log *zap.Logger, cnrID cid.ID) (ResObjectSearch, error) {
|
||||
scid, _ := c.UserValue("cid").(string)
|
||||
prefix, _ := c.UserValue("prefix").(string)
|
||||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
|
||||
func (h *Handler) searchObjectsByPrefix(ctx context.Context, cnrID cid.ID, prefix string) (ResObjectSearch, error) {
|
||||
prefix, err := url.QueryUnescape(prefix)
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix),
|
||||
zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("unescape prefix: %w", err)
|
||||
}
|
||||
|
||||
log = log.With(zap.String("cid", scid), zap.String("prefix", prefix))
|
||||
|
||||
resSearch, err := h.search(ctx, cnrID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||
if err != nil {
|
||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("search objects by prefix: %w", err)
|
||||
}
|
||||
|
||||
return resSearch, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -144,6 +143,10 @@ var (
|
|||
ErrGatewayTimeout = errors.New("gateway timeout")
|
||||
// ErrQuotaLimitReached is returned from FrostFS in case of quota exceeded.
|
||||
ErrQuotaLimitReached = errors.New("quota limit reached")
|
||||
// ErrContainerNotFound is returned from FrostFS in case of container was not found.
|
||||
ErrContainerNotFound = errors.New("container not found")
|
||||
// ErrObjectNotFound is returned from FrostFS in case of object was not found.
|
||||
ErrObjectNotFound = errors.New("object not found")
|
||||
)
|
||||
|
||||
// FrostFS represents virtual connection to FrostFS network.
|
||||
|
@ -203,7 +206,7 @@ func New(params *AppParams, config Config, tree layer.TreeService, workerPool *a
|
|||
|
||||
// byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (h *Handler) byNativeAddress(ctx context.Context, req request, cnrID cid.ID, objID oid.ID, handler func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byNativeAddress(ctx context.Context, req *fasthttp.RequestCtx, cnrID cid.ID, objID oid.ID, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "handler.byNativeAddress")
|
||||
defer span.End()
|
||||
|
||||
|
@ -213,72 +216,59 @@ func (h *Handler) byNativeAddress(ctx context.Context, req request, cnrID cid.ID
|
|||
|
||||
// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// resolves object address from S3-like path <bucket name>/<object key>.
|
||||
func (h *Handler) byS3Path(ctx context.Context, req request, cnrID cid.ID, path string, handler func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byS3Path(ctx context.Context, req *fasthttp.RequestCtx, cnrID cid.ID, path string, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "handler.byS3Path")
|
||||
defer span.End()
|
||||
|
||||
c, log := req.RequestCtx, req.log
|
||||
|
||||
foundOID, err := h.tree.GetLatestVersion(ctx, &cnrID, path)
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToGetLatestVersionOfObject, zap.Error(err), zap.String("cid", cnrID.String()),
|
||||
zap.String("path", path), logs.TagField(logs.TagExternalStorageTree))
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetLatestVersionOfObject, err, zap.String("path", path))
|
||||
return
|
||||
}
|
||||
if foundOID.IsDeleteMarker {
|
||||
log.Error(logs.ObjectWasDeleted, logs.TagField(logs.TagExternalStorageTree))
|
||||
ResponseError(c, "object deleted", fasthttp.StatusNotFound)
|
||||
h.logAndSendError(ctx, req, logs.ObjectWasDeleted, ErrObjectNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
addr := newAddress(cnrID, foundOID.OID)
|
||||
handler(ctx, newRequest(c, log), addr)
|
||||
handler(ctx, req, addr)
|
||||
}
|
||||
|
||||
// byAttribute is a wrapper similar to byNativeAddress.
|
||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Context, request, oid.Address)) {
|
||||
cidParam, _ := c.UserValue("cid").(string)
|
||||
key, _ := c.UserValue("attr_key").(string)
|
||||
val, _ := c.UserValue("attr_val").(string)
|
||||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
log := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
func (h *Handler) byAttribute(ctx context.Context, req *fasthttp.RequestCtx, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
|
||||
cidParam, _ := req.UserValue("cid").(string)
|
||||
key, _ := req.UserValue("attr_key").(string)
|
||||
val, _ := req.UserValue("attr_val").(string)
|
||||
|
||||
key, err := url.QueryUnescape(key)
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_key", key),
|
||||
zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
h.logAndSendError(ctx, req, logs.FailedToUnescapeQuery, err, zap.String("cid", cidParam), zap.String("attr_key", key))
|
||||
return
|
||||
}
|
||||
|
||||
val, err = url.QueryUnescape(val)
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_val", val),
|
||||
zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
h.logAndSendError(ctx, req, logs.FailedToUnescapeQuery, err, zap.String("cid", cidParam), zap.String("attr_val", key))
|
||||
return
|
||||
}
|
||||
|
||||
val = prepareAtribute(key, val)
|
||||
|
||||
log = log.With(zap.String("cid", cidParam), zap.String("attr_key", key), zap.String("attr_val", val))
|
||||
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", cidParam),
|
||||
zap.String("attr_key", key), zap.String("attr_val", val)))
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
||||
bktInfo, err := h.getBucketInfo(ctx, cidParam)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
objID, err := h.findObjectByAttribute(ctx, log, bktInfo.CID, key, val)
|
||||
objID, err := h.findObjectByAttribute(ctx, bktInfo.CID, key, val)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
ResponseError(c, err.Error(), fasthttp.StatusNotFound)
|
||||
return
|
||||
err = fmt.Errorf("%w: %s", ErrObjectNotFound, err.Error())
|
||||
}
|
||||
|
||||
ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
|
||||
h.logAndSendError(ctx, req, logs.FailedToFindObjectByAttribute, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -286,14 +276,13 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte
|
|||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(objID)
|
||||
|
||||
handler(ctx, newRequest(c, log), addr)
|
||||
handler(ctx, req, addr)
|
||||
}
|
||||
|
||||
func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
|
||||
func (h *Handler) findObjectByAttribute(ctx context.Context, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
|
||||
res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual)
|
||||
if err != nil {
|
||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
return oid.ID{}, fmt.Errorf("could not search for objects: %w", err)
|
||||
return oid.ID{}, fmt.Errorf("search objects: %w", err)
|
||||
}
|
||||
defer res.Close()
|
||||
|
||||
|
@ -303,14 +292,14 @@ func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cn
|
|||
if n == 0 {
|
||||
switch {
|
||||
case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal):
|
||||
log.Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName, logs.TagField(logs.TagExternalStorage))
|
||||
h.reqLogger(ctx).Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName, logs.TagField(logs.TagExternalStorage))
|
||||
attrVal = prepareAtribute(attrFileName, attrVal)
|
||||
return h.findObjectByAttribute(ctx, log, cnrID, attrFileName, attrVal)
|
||||
return h.findObjectByAttribute(ctx, cnrID, attrFileName, attrVal)
|
||||
case errors.Is(err, io.EOF):
|
||||
log.Error(logs.ObjectNotFound, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
h.reqLogger(ctx).Error(logs.ObjectNotFound, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
return oid.ID{}, fmt.Errorf("object not found: %w", err)
|
||||
default:
|
||||
log.Error(logs.ReadObjectListFailed, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
h.reqLogger(ctx).Error(logs.ReadObjectListFailed, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
return oid.ID{}, fmt.Errorf("read object list failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
@ -369,13 +358,13 @@ func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*ci
|
|||
zone := h.config.FormContainerZone(namespace)
|
||||
cnrID, err = h.containerResolver.Resolve(ctx, zone, containerID)
|
||||
if err != nil && strings.Contains(err.Error(), "not found") {
|
||||
err = fmt.Errorf("%w: %s", new(apistatus.ContainerNotFound), err.Error())
|
||||
err = fmt.Errorf("%w: %s", ErrContainerNotFound, err.Error())
|
||||
}
|
||||
}
|
||||
return cnrID, err
|
||||
}
|
||||
|
||||
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
|
||||
func (h *Handler) getBucketInfo(ctx context.Context, containerName string) (*data.BucketInfo, error) {
|
||||
ns, err := middleware.GetNamespace(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -387,21 +376,16 @@ func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *
|
|||
|
||||
cnrID, err := h.resolveContainer(ctx, containerName)
|
||||
if err != nil {
|
||||
log.Error(logs.CouldNotResolveContainerID, zap.Error(err), zap.String("cnrName", containerName),
|
||||
logs.TagField(logs.TagDatapath))
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("resolve container: %w", err)
|
||||
}
|
||||
|
||||
bktInfo, err := h.readContainer(ctx, *cnrID)
|
||||
if err != nil {
|
||||
log.Error(logs.CouldNotGetContainerInfo, zap.Error(err), zap.String("cnrName", containerName),
|
||||
zap.String("cnrName", cnrID.String()),
|
||||
logs.TagField(logs.TagExternalStorage))
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("read container: %w", err)
|
||||
}
|
||||
|
||||
if err = h.cache.Put(bktInfo); err != nil {
|
||||
log.Warn(logs.CouldntPutBucketIntoCache,
|
||||
h.reqLogger(ctx).Warn(logs.CouldntPutBucketIntoCache,
|
||||
zap.String("bucket name", bktInfo.Name),
|
||||
zap.Stringer("bucket cid", bktInfo.CID),
|
||||
zap.Error(err),
|
||||
|
@ -434,31 +418,24 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
|
|||
return bktInfo, err
|
||||
}
|
||||
|
||||
func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.browseIndex")
|
||||
func (h *Handler) browseIndex(ctx context.Context, req *fasthttp.RequestCtx, cidParam, oidParam string, isNativeList bool) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "handler.browseIndex")
|
||||
defer span.End()
|
||||
utils.SetContextToRequest(ctx, c)
|
||||
|
||||
if !h.config.IndexPageEnabled() {
|
||||
c.SetStatusCode(fasthttp.StatusNotFound)
|
||||
req.SetStatusCode(fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
cidURLParam := c.UserValue("cid").(string)
|
||||
oidURLParam := c.UserValue("oid").(string)
|
||||
|
||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
log := reqLog.With(zap.String("cid", cidURLParam), zap.String("oid", oidURLParam))
|
||||
|
||||
unescapedKey, err := url.QueryUnescape(oidURLParam)
|
||||
unescapedKey, err := url.QueryUnescape(oidParam)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToUnescapeOIDParam, err)
|
||||
return
|
||||
}
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log)
|
||||
bktInfo, err := h.getBucketInfo(ctx, cidParam)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -468,7 +445,7 @@ func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) {
|
|||
listFunc = h.getDirObjectsNative
|
||||
}
|
||||
|
||||
h.browseObjects(c, browseParams{
|
||||
h.browseObjects(ctx, req, browseParams{
|
||||
bucketInfo: bktInfo,
|
||||
prefix: unescapedKey,
|
||||
listObjects: listFunc,
|
||||
|
|
|
@ -409,7 +409,7 @@ func TestFindObjectByAttribute(t *testing.T) {
|
|||
obj.SetAttributes(tc.firstAttr, tc.secondAttr)
|
||||
hc.cfg.additionalSearch = tc.additionalSearch
|
||||
|
||||
objID, err := hc.Handler().findObjectByAttribute(ctx, hc.Handler().log, cnrID, tc.reqAttrKey, tc.reqAttrValue)
|
||||
objID, err := hc.Handler().findObjectByAttribute(ctx, cnrID, tc.reqAttrKey, tc.reqAttrValue)
|
||||
if tc.err != "" {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), tc.err)
|
||||
|
|
|
@ -27,7 +27,7 @@ const (
|
|||
hdrContainerID = "X-Container-Id"
|
||||
)
|
||||
|
||||
func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) {
|
||||
func (h *Handler) headObject(ctx context.Context, req *fasthttp.RequestCtx, objectAddress oid.Address) {
|
||||
var start = time.Now()
|
||||
|
||||
btoken := bearerToken(ctx)
|
||||
|
@ -41,7 +41,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
|
|||
|
||||
obj, err := h.frostfs.HeadObject(ctx, prm)
|
||||
if err != nil {
|
||||
req.handleFrostFSErr(err, start)
|
||||
h.logAndSendError(ctx, req, logs.FailedToHeadObject, err, zap.Stringer("elapsed", time.Since(start)))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -65,7 +65,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
|
|||
case object.AttributeTimestamp:
|
||||
value, err := strconv.ParseInt(val, 10, 64)
|
||||
if err != nil {
|
||||
req.log.Info(logs.CouldntParseCreationDate,
|
||||
h.reqLogger(ctx).Info(logs.CouldntParseCreationDate,
|
||||
zap.String("key", key),
|
||||
zap.String("val", val),
|
||||
zap.Error(err),
|
||||
|
@ -100,7 +100,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
|
|||
return h.frostfs.RangeObject(ctx, prmRange)
|
||||
}, filename)
|
||||
if err != nil && err != io.EOF {
|
||||
req.handleFrostFSErr(err, start)
|
||||
h.logAndSendError(ctx, req, logs.FailedToDetectContentTypeFromPayload, err, zap.Stringer("elapsed", time.Since(start)))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -116,48 +116,44 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
|
|||
}
|
||||
|
||||
// HeadByAddressOrBucketName handles head requests using simple cid/oid or bucketname/key format.
|
||||
func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.HeadByAddressOrBucketName")
|
||||
func (h *Handler) HeadByAddressOrBucketName(req *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.HeadByAddressOrBucketName")
|
||||
defer span.End()
|
||||
|
||||
cidParam, _ := c.UserValue("cid").(string)
|
||||
oidParam, _ := c.UserValue("oid").(string)
|
||||
cidParam, _ := req.UserValue("cid").(string)
|
||||
oidParam, _ := req.UserValue("oid").(string)
|
||||
|
||||
log := utils.GetReqLogOrDefault(ctx, h.log).With(
|
||||
ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
|
||||
zap.String("cid", cidParam),
|
||||
zap.String("oid", oidParam),
|
||||
)
|
||||
))
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
||||
bktInfo, err := h.getBucketInfo(ctx, cidParam)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
|
||||
if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
|
||||
log.Error(logs.FailedToCheckIfSettingsNodeExist, zap.String("cid", bktInfo.CID.String()),
|
||||
zap.Error(checkS3Err), logs.TagField(logs.TagExternalStorageTree))
|
||||
logAndSendBucketError(c, log, checkS3Err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToCheckIfSettingsNodeExist, checkS3Err)
|
||||
return
|
||||
}
|
||||
|
||||
req := newRequest(c, log)
|
||||
|
||||
var objID oid.ID
|
||||
if checkS3Err == nil {
|
||||
h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.headObject)
|
||||
} else if err = objID.DecodeString(oidParam); err == nil {
|
||||
h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.headObject)
|
||||
} else {
|
||||
logAndSendBucketError(c, log, checkS3Err)
|
||||
h.logAndSendError(ctx, req, logs.InvalidOIDParam, err)
|
||||
}
|
||||
}
|
||||
|
||||
// HeadByAttribute handles attribute-based head requests.
|
||||
func (h *Handler) HeadByAttribute(c *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.HeadByAttribute")
|
||||
func (h *Handler) HeadByAttribute(req *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.HeadByAttribute")
|
||||
defer span.End()
|
||||
utils.SetContextToRequest(ctx, c)
|
||||
|
||||
h.byAttribute(c, h.headObject)
|
||||
h.byAttribute(ctx, req, h.headObject)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"strconv"
|
||||
|
@ -53,7 +54,7 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
|
|||
}
|
||||
|
||||
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
|
||||
func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) {
|
||||
func (h *Handler) getPayload(ctx context.Context, p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) {
|
||||
cid, ok := p.obj.Header.ContainerID()
|
||||
if !ok {
|
||||
return nil, 0, errors.New("no container id set")
|
||||
|
@ -66,7 +67,6 @@ func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64,
|
|||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
ctx := p.req.RequestCtx
|
||||
params := PrmInitMultiObjectReader{
|
||||
Addr: newAddress(cid, oid),
|
||||
Bearer: bearerToken(ctx),
|
||||
|
|
|
@ -63,11 +63,10 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error), file
|
|||
|
||||
type getMultiobjectBodyParams struct {
|
||||
obj *Object
|
||||
req request
|
||||
strSize string
|
||||
}
|
||||
|
||||
func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address) {
|
||||
func (h *Handler) receiveFile(ctx context.Context, req *fasthttp.RequestCtx, objAddress oid.Address) {
|
||||
var (
|
||||
shouldDownload = req.QueryArgs().GetBool("download")
|
||||
start = time.Now()
|
||||
|
@ -85,12 +84,12 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
|
|||
|
||||
rObj, err := h.frostfs.GetObject(ctx, prm)
|
||||
if err != nil {
|
||||
req.handleFrostFSErr(err, start)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetObject, err, zap.Stringer("elapsed", time.Since(start)))
|
||||
return
|
||||
}
|
||||
|
||||
// we can't close reader in this function, so how to do it?
|
||||
req.setIDs(rObj.Header)
|
||||
setIDs(req, rObj.Header)
|
||||
payload := rObj.Payload
|
||||
payloadSize := rObj.Header.PayloadSize()
|
||||
for _, attr := range rObj.Header.Attributes() {
|
||||
|
@ -107,8 +106,8 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
|
|||
case object.AttributeFileName:
|
||||
filename = val
|
||||
case object.AttributeTimestamp:
|
||||
if err = req.setTimestamp(val); err != nil {
|
||||
req.log.Error(logs.CouldntParseCreationDate,
|
||||
if err = setTimestamp(req, val); err != nil {
|
||||
h.reqLogger(ctx).Error(logs.CouldntParseCreationDate,
|
||||
zap.String("val", val),
|
||||
zap.Error(err),
|
||||
logs.TagField(logs.TagDatapath))
|
||||
|
@ -118,13 +117,12 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
|
|||
case object.AttributeFilePath:
|
||||
filepath = val
|
||||
case attributeMultipartObjectSize:
|
||||
payload, payloadSize, err = h.getPayload(getMultiobjectBodyParams{
|
||||
payload, payloadSize, err = h.getPayload(ctx, getMultiobjectBodyParams{
|
||||
obj: rObj,
|
||||
req: req,
|
||||
strSize: val,
|
||||
})
|
||||
if err != nil {
|
||||
req.handleFrostFSErr(err, start)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetObjectPayload, err, zap.Stringer("elapsed", time.Since(start)))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -133,7 +131,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
|
|||
filename = filepath
|
||||
}
|
||||
|
||||
req.setDisposition(shouldDownload, filename)
|
||||
setDisposition(req, shouldDownload, filename)
|
||||
|
||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||
|
||||
|
@ -145,8 +143,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
|
|||
return payload, nil
|
||||
}, filename)
|
||||
if err != nil && err != io.EOF {
|
||||
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
ResponseError(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
h.logAndSendError(ctx, req, logs.FailedToDetectContentTypeFromPayload, err, zap.Stringer("elapsed", time.Since(start)))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -165,7 +162,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
|
|||
req.Response.SetBodyStream(payload, int(payloadSize))
|
||||
}
|
||||
|
||||
func (r *request) setIDs(obj object.Object) {
|
||||
func setIDs(r *fasthttp.RequestCtx, obj object.Object) {
|
||||
objID, _ := obj.ID()
|
||||
cnrID, _ := obj.ContainerID()
|
||||
r.Response.Header.Set(hdrObjectID, objID.String())
|
||||
|
@ -173,7 +170,7 @@ func (r *request) setIDs(obj object.Object) {
|
|||
r.Response.Header.Set(hdrContainerID, cnrID.String())
|
||||
}
|
||||
|
||||
func (r *request) setDisposition(shouldDownload bool, filename string) {
|
||||
func setDisposition(r *fasthttp.RequestCtx, shouldDownload bool, filename string) {
|
||||
const (
|
||||
inlineDisposition = "inline"
|
||||
attachmentDisposition = "attachment"
|
||||
|
@ -187,7 +184,7 @@ func (r *request) setDisposition(shouldDownload bool, filename string) {
|
|||
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||
}
|
||||
|
||||
func (r *request) setTimestamp(timestamp string) error {
|
||||
func setTimestamp(r *fasthttp.RequestCtx, timestamp string) error {
|
||||
value, err := strconv.ParseInt(timestamp, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -50,44 +50,41 @@ func (pr *putResponse) encode(w io.Writer) error {
|
|||
}
|
||||
|
||||
// Upload handles multipart upload request.
|
||||
func (h *Handler) Upload(c *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.Upload")
|
||||
func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.Upload")
|
||||
defer span.End()
|
||||
utils.SetContextToRequest(ctx, c)
|
||||
|
||||
var file MultipartFile
|
||||
|
||||
scid, _ := c.UserValue("cid").(string)
|
||||
bodyStream := c.RequestBodyStream()
|
||||
scid, _ := req.UserValue("cid").(string)
|
||||
bodyStream := req.RequestBodyStream()
|
||||
drainBuf := make([]byte, drainBufSize)
|
||||
|
||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
log := reqLog.With(zap.String("cid", scid))
|
||||
log := h.reqLogger(ctx)
|
||||
ctx = utils.SetReqLog(ctx, log.With(zap.String("cid", scid)))
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
boundary := string(c.Request.Header.MultipartFormBoundary())
|
||||
boundary := string(req.Request.Header.MultipartFormBoundary())
|
||||
if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
|
||||
log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
h.logAndSendError(ctx, req, logs.CouldNotReceiveMultipartForm, err)
|
||||
return
|
||||
}
|
||||
|
||||
filtered, err := filterHeaders(log, &c.Request.Header)
|
||||
filtered, err := filterHeaders(log, &req.Request.Header)
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToFilterHeaders, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
|
||||
h.logAndSendError(ctx, req, logs.FailedToFilterHeaders, err)
|
||||
return
|
||||
}
|
||||
|
||||
if c.Request.Header.Peek(explodeArchiveHeader) != nil {
|
||||
h.explodeArchive(request{c, log}, bktInfo, file, filtered)
|
||||
if req.Request.Header.Peek(explodeArchiveHeader) != nil {
|
||||
h.explodeArchive(ctx, req, bktInfo, file, filtered)
|
||||
} else {
|
||||
h.uploadSingleObject(request{c, log}, bktInfo, file, filtered)
|
||||
h.uploadSingleObject(ctx, req, bktInfo, file, filtered)
|
||||
}
|
||||
|
||||
// Multipart is multipart and thus can contain more than one part which
|
||||
|
@ -104,46 +101,39 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
func (h *Handler) uploadSingleObject(req request, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
|
||||
c, log := req.RequestCtx, req.log
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.uploadSingleObject")
|
||||
func (h *Handler) uploadSingleObject(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "handler.uploadSingleObject")
|
||||
defer span.End()
|
||||
utils.SetContextToRequest(ctx, c)
|
||||
|
||||
setIfNotExist(filtered, object.AttributeFileName, file.FileName())
|
||||
|
||||
attributes, err := h.extractAttributes(c, log, filtered)
|
||||
attributes, err := h.extractAttributes(ctx, req, filtered)
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
|
||||
return
|
||||
}
|
||||
|
||||
idObj, err := h.uploadObject(c, bkt, attributes, file)
|
||||
idObj, err := h.uploadObject(ctx, bkt, attributes, file)
|
||||
if err != nil {
|
||||
h.handlePutFrostFSErr(c, err, log)
|
||||
h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
|
||||
return
|
||||
}
|
||||
log.Debug(logs.ObjectUploaded,
|
||||
h.reqLogger(ctx).Debug(logs.ObjectUploaded,
|
||||
zap.String("oid", idObj.EncodeToString()),
|
||||
zap.String("FileName", file.FileName()),
|
||||
logs.TagField(logs.TagExternalStorage),
|
||||
)
|
||||
|
||||
addr := newAddress(bkt.CID, idObj)
|
||||
c.Response.Header.SetContentType(jsonHeader)
|
||||
req.Response.Header.SetContentType(jsonHeader)
|
||||
// Try to return the response, otherwise, if something went wrong, throw an error.
|
||||
if err = newPutResponse(addr).encode(c); err != nil {
|
||||
log.Error(logs.CouldNotEncodeResponse, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "could not encode response", fasthttp.StatusBadRequest)
|
||||
if err = newPutResponse(addr).encode(req); err != nil {
|
||||
h.logAndSendError(ctx, req, logs.CouldNotEncodeResponse, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
|
||||
func (h *Handler) uploadObject(ctx context.Context, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
|
||||
obj := object.New()
|
||||
obj.SetContainerID(bkt.CID)
|
||||
obj.SetOwnerID(*h.ownerID)
|
||||
|
@ -168,19 +158,18 @@ func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, att
|
|||
return idObj, nil
|
||||
}
|
||||
|
||||
func (h *Handler) extractAttributes(c *fasthttp.RequestCtx, log *zap.Logger, filtered map[string]string) ([]object.Attribute, error) {
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
func (h *Handler) extractAttributes(ctx context.Context, req *fasthttp.RequestCtx, filtered map[string]string) ([]object.Attribute, error) {
|
||||
now := time.Now()
|
||||
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
|
||||
if rawHeader := req.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
|
||||
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
|
||||
log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err),
|
||||
h.reqLogger(ctx).Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err),
|
||||
logs.TagField(logs.TagDatapath))
|
||||
} else {
|
||||
now = parsed
|
||||
}
|
||||
}
|
||||
if err := utils.PrepareExpirationHeader(ctx, h.frostfs, filtered, now); err != nil {
|
||||
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
h.reqLogger(ctx).Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
return nil, err
|
||||
}
|
||||
attributes := make([]object.Attribute, 0, len(filtered))
|
||||
|
@ -207,38 +196,33 @@ func newAttribute(key string, val string) object.Attribute {
|
|||
|
||||
// explodeArchive read files from archive and creates objects for each of them.
|
||||
// Sets FilePath attribute with name from tar.Header.
|
||||
func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
|
||||
c, log := req.RequestCtx, req.log
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.explodeArchive")
|
||||
func (h *Handler) explodeArchive(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "handler.explodeArchive")
|
||||
defer span.End()
|
||||
utils.SetContextToRequest(ctx, c)
|
||||
|
||||
// remove user attributes which vary for each file in archive
|
||||
// to guarantee that they won't appear twice
|
||||
delete(filtered, object.AttributeFileName)
|
||||
delete(filtered, object.AttributeFilePath)
|
||||
|
||||
commonAttributes, err := h.extractAttributes(c, log, filtered)
|
||||
commonAttributes, err := h.extractAttributes(ctx, req, filtered)
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
|
||||
return
|
||||
}
|
||||
attributes := commonAttributes
|
||||
|
||||
reader := file
|
||||
if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
|
||||
log.Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath))
|
||||
if bytes.EqualFold(req.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
|
||||
h.reqLogger(ctx).Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath))
|
||||
gzipReader, err := gzip.NewReader(file)
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToCreateGzipReader, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "could read gzip file: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
h.logAndSendError(ctx, req, logs.FailedToCreateGzipReader, err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err := gzipReader.Close(); err != nil {
|
||||
log.Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
h.reqLogger(ctx).Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
}
|
||||
}()
|
||||
reader = gzipReader
|
||||
|
@ -250,8 +234,7 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
|
|||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
} else if err != nil {
|
||||
log.Error(logs.FailedToReadFileFromTar, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
ResponseError(c, "could not get next entry: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
h.logAndSendError(ctx, req, logs.FailedToReadFileFromTar, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -265,13 +248,13 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
|
|||
attributes = append(attributes, newAttribute(object.AttributeFilePath, obj.Name))
|
||||
attributes = append(attributes, newAttribute(object.AttributeFileName, fileName))
|
||||
|
||||
idObj, err := h.uploadObject(c, bkt, attributes, tarReader)
|
||||
idObj, err := h.uploadObject(ctx, bkt, attributes, tarReader)
|
||||
if err != nil {
|
||||
h.handlePutFrostFSErr(c, err, log)
|
||||
h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug(logs.ObjectUploaded,
|
||||
h.reqLogger(ctx).Debug(logs.ObjectUploaded,
|
||||
zap.String("oid", idObj.EncodeToString()),
|
||||
zap.String("FileName", fileName),
|
||||
logs.TagField(logs.TagExternalStorage),
|
||||
|
@ -279,14 +262,6 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
|
|||
}
|
||||
}
|
||||
|
||||
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {
|
||||
statusCode, msg, additionalFields := formErrorResponse("could not store file in frostfs", err)
|
||||
logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)
|
||||
|
||||
log.Error(logs.CouldNotStoreFileInFrostfs, append(logFields, logs.TagField(logs.TagExternalStorage))...)
|
||||
ResponseError(r, msg, statusCode)
|
||||
}
|
||||
|
||||
func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token {
|
||||
if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
|
||||
return tkn
|
||||
|
|
|
@ -5,13 +5,12 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
sdkstatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -19,30 +18,6 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type request struct {
|
||||
*fasthttp.RequestCtx
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
func newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) request {
|
||||
return request{
|
||||
RequestCtx: ctx,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *request) handleFrostFSErr(err error, start time.Time) {
|
||||
logFields := []zap.Field{
|
||||
zap.Stringer("elapsed", time.Since(start)),
|
||||
zap.Error(err),
|
||||
}
|
||||
statusCode, msg, additionalFields := formErrorResponse("could not receive object", err)
|
||||
logFields = append(logFields, additionalFields...)
|
||||
|
||||
r.log.Error(logs.CouldNotReceiveObject, append(logFields, logs.TagField(logs.TagExternalStorage))...)
|
||||
ResponseError(r.RequestCtx, msg, statusCode)
|
||||
}
|
||||
|
||||
func bearerToken(ctx context.Context) *bearer.Token {
|
||||
if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
|
||||
return tkn
|
||||
|
@ -84,14 +59,16 @@ func isValidValue(s string) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) {
|
||||
log.Error(logs.CouldNotGetBucket, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
func (h *Handler) reqLogger(ctx context.Context) *zap.Logger {
|
||||
r.loginov marked this conversation as resolved
Outdated
|
||||
return utils.GetReqLogOrDefault(ctx, h.log)
|
||||
}
|
||||
|
||||
if client.IsErrContainerNotFound(err) {
|
||||
ResponseError(c, "Not Found", fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
ResponseError(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
func (h *Handler) logAndSendError(ctx context.Context, c *fasthttp.RequestCtx, msg string, err error, additional ...zap.Field) {
|
||||
utils.GetReqLogOrDefault(ctx, h.log).Error(msg,
|
||||
append([]zap.Field{zap.Error(err), logs.TagField(logs.TagDatapath)}, additional...)...)
|
||||
|
||||
msg, code := formErrorResponse(err)
|
||||
ResponseError(c, msg, code)
|
||||
}
|
||||
|
||||
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
|
||||
|
@ -112,31 +89,23 @@ func ResponseError(r *fasthttp.RequestCtx, msg string, code int) {
|
|||
r.Error(msg+"\n", code)
|
||||
}
|
||||
|
||||
func formErrorResponse(message string, err error) (int, string, []zap.Field) {
|
||||
var (
|
||||
msg string
|
||||
statusCode int
|
||||
logFields []zap.Field
|
||||
)
|
||||
|
||||
st := new(sdkstatus.ObjectAccessDenied)
|
||||
|
||||
func formErrorResponse(err error) (string, int) {
|
||||
switch {
|
||||
case errors.As(err, &st):
|
||||
statusCode = fasthttp.StatusForbidden
|
||||
reason := st.Reason()
|
||||
msg = fmt.Sprintf("%s: %v: %s", message, err, reason)
|
||||
logFields = append(logFields, zap.String("error_detail", reason))
|
||||
case errors.Is(err, ErrAccessDenied):
|
||||
return fmt.Sprintf("Storage Access Denied:\n%v", err), fasthttp.StatusForbidden
|
||||
case errors.Is(err, layer.ErrNodeAccessDenied):
|
||||
return fmt.Sprintf("Tree Access Denied:\n%v", err), fasthttp.StatusForbidden
|
||||
case errors.Is(err, ErrQuotaLimitReached):
|
||||
statusCode = fasthttp.StatusConflict
|
||||
msg = fmt.Sprintf("%s: %v", message, err)
|
||||
case client.IsErrObjectNotFound(err) || client.IsErrContainerNotFound(err):
|
||||
statusCode = fasthttp.StatusNotFound
|
||||
msg = "Not Found"
|
||||
return fmt.Sprintf("Quota Reached:\n%v", err), fasthttp.StatusConflict
|
||||
case errors.Is(err, ErrContainerNotFound):
|
||||
return fmt.Sprintf("Container Not Found:\n%v", err), fasthttp.StatusNotFound
|
||||
case errors.Is(err, ErrObjectNotFound):
|
||||
return fmt.Sprintf("Object Not Found:\n%v", err), fasthttp.StatusNotFound
|
||||
case errors.Is(err, layer.ErrNodeNotFound):
|
||||
return fmt.Sprintf("Tree Node Not Found:\n%v", err), fasthttp.StatusNotFound
|
||||
case errors.Is(err, ErrGatewayTimeout):
|
||||
return fmt.Sprintf("Gateway Timeout:\n%v", err), fasthttp.StatusGatewayTimeout
|
||||
r.loginov marked this conversation as resolved
Outdated
r.loginov
commented
Should we also note in the API documentation the possibility of returning a 504 GatewayTimeout? Should we also note in the API documentation the possibility of returning a 504 GatewayTimeout?
dkirillov
commented
Then we probably should mention all codes (even this) for any endpoint 🤔 Then we probably should mention all codes (even [this](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/pulls/221#issuecomment-70101)) for any endpoint 🤔
r.loginov
commented
Regarding this - I was considering it from the point of view that we can catch quota overrun only when loading an object. And it seems that we can catch 504 in many places, but it is not mentioned in the documentation. But I understand the idea, so let's leave it as it is now. Regarding [this](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/pulls/221#issuecomment-70101) - I was considering it from the point of view that we can catch quota overrun only when loading an object. And it seems that we can catch 504 in many places, but it is not mentioned in the documentation.
But I understand the idea, so let's leave it as it is now.
|
||||
default:
|
||||
statusCode = fasthttp.StatusBadRequest
|
||||
msg = fmt.Sprintf("%s: %v", message, err)
|
||||
return fmt.Sprintf("Bad Request:\n%v", err), fasthttp.StatusBadRequest
|
||||
}
|
||||
|
||||
return statusCode, msg, logFields
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ const (
|
|||
// Log messages with the "datapath" tag.
|
||||
const (
|
||||
CouldntParseCreationDate = "couldn't parse creation date"
|
||||
CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload"
|
||||
FailedToDetectContentTypeFromPayload = "failed to detect Content-Type from payload"
|
||||
FailedToAddObjectToArchive = "failed to add object to archive"
|
||||
CloseZipWriter = "close zip writer"
|
||||
IgnorePartEmptyFormName = "ignore part, empty form name"
|
||||
|
@ -105,9 +105,21 @@ const (
|
|||
CouldNotReceiveMultipartForm = "could not receive multipart/form"
|
||||
ObjectsNotFound = "objects not found"
|
||||
IteratingOverSelectedObjectsFailed = "iterating over selected objects failed"
|
||||
CouldNotGetBucket = "could not get bucket"
|
||||
CouldNotResolveContainerID = "could not resolve container id"
|
||||
FailedToSumbitTaskToPool = "failed to submit task to pool"
|
||||
FailedToGetBucketInfo = "could not get bucket info"
|
||||
FailedToSubmitTaskToPool = "failed to submit task to pool"
|
||||
ObjectWasDeleted = "object was deleted"
|
||||
FailedToGetLatestVersionOfObject = "failed to get latest version of object"
|
||||
FailedToCheckIfSettingsNodeExist = "failed to check if settings node exists"
|
||||
FailedToListObjects = "failed to list objects"
|
||||
FailedToParseTemplate = "failed to parse template"
|
||||
FailedToExecuteTemplate = "failed to execute template"
|
||||
FailedToUploadObject = "failed to upload object"
|
||||
FailedToHeadObject = "failed to head object"
|
||||
FailedToGetObject = "failed to get object"
|
||||
FailedToGetObjectPayload = "failed to get object payload"
|
||||
FailedToFindObjectByAttribute = "failed to get find object by attribute"
|
||||
FailedToUnescapeOIDParam = "failed to unescape oid param"
|
||||
InvalidOIDParam = "invalid oid param"
|
||||
CouldNotGetCORSConfiguration = "could not get cors configuration"
|
||||
EmptyOriginRequestHeader = "empty Origin request header"
|
||||
EmptyAccessControlRequestMethodHeader = "empty Access-Control-Request-Method request header"
|
||||
|
@ -117,21 +129,13 @@ const (
|
|||
|
||||
// Log messages with the "external_storage" tag.
|
||||
const (
|
||||
CouldNotReceiveObject = "could not receive object"
|
||||
CouldNotSearchForObjects = "could not search for objects"
|
||||
ObjectNotFound = "object not found"
|
||||
ReadObjectListFailed = "read object list failed"
|
||||
CouldNotStoreFileInFrostfs = "could not store file in frostfs"
|
||||
FailedToHeadObject = "failed to head object"
|
||||
alexvanin
commented
You moved it to datapath section because those logs are produced in You moved it to datapath section because those logs are produced in `handler` package?
dkirillov
commented
Because it's error that I want to see if datapath request failed. Otherwise we don't understand much from logs Because it's error that I want to see if datapath request failed. Otherwise we don't understand much from logs
|
||||
ObjectNotFoundByFilePathTrySearchByFileName = "object not found by filePath attribute, try search by fileName"
|
||||
FailedToGetObject = "failed to get object"
|
||||
ObjectUploaded = "object uploaded"
|
||||
CouldNotGetContainerInfo = "could not get container info"
|
||||
)
|
||||
|
||||
// Log messages with the "external_storage_tree" tag.
|
||||
const (
|
||||
ObjectWasDeleted = "object was deleted"
|
||||
FailedToGetLatestVersionOfObject = "failed to get latest version of object"
|
||||
FailedToCheckIfSettingsNodeExist = "Failed to check if settings node exists"
|
||||
FoundSeveralSystemTreeNodes = "found several system tree nodes"
|
||||
)
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
|
@ -45,7 +46,7 @@ func (x *FrostFS) Container(ctx context.Context, containerPrm handler.PrmContain
|
|||
|
||||
res, err := x.pool.GetContainer(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("read container via connection pool", err)
|
||||
return nil, handleStorageError("read container via connection pool", err)
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
|
@ -69,7 +70,7 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm handler.PrmObjectCreate)
|
|||
|
||||
idObj, err := x.pool.PutObject(ctx, prmPut)
|
||||
if err != nil {
|
||||
return oid.ID{}, handleObjectError("save object via connection pool", err)
|
||||
return oid.ID{}, handleStorageError("save object via connection pool", err)
|
||||
}
|
||||
return idObj.ObjectID, nil
|
||||
}
|
||||
|
@ -85,7 +86,7 @@ func (x payloadReader) Read(p []byte) (int, error) {
|
|||
if err != nil && errors.Is(err, io.EOF) {
|
||||
return n, err
|
||||
}
|
||||
return n, handleObjectError("read payload", err)
|
||||
return n, handleStorageError("read payload", err)
|
||||
}
|
||||
|
||||
// HeadObject implements frostfs.FrostFS interface method.
|
||||
|
@ -102,7 +103,7 @@ func (x *FrostFS) HeadObject(ctx context.Context, prm handler.PrmObjectHead) (*o
|
|||
|
||||
res, err := x.pool.HeadObject(ctx, prmHead)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("read object header via connection pool", err)
|
||||
return nil, handleStorageError("read object header via connection pool", err)
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
|
@ -122,7 +123,7 @@ func (x *FrostFS) GetObject(ctx context.Context, prm handler.PrmObjectGet) (*han
|
|||
|
||||
res, err := x.pool.GetObject(ctx, prmGet)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init full object reading via connection pool", err)
|
||||
return nil, handleStorageError("init full object reading via connection pool", err)
|
||||
}
|
||||
|
||||
return &handler.Object{
|
||||
|
@ -147,7 +148,7 @@ func (x *FrostFS) RangeObject(ctx context.Context, prm handler.PrmObjectRange) (
|
|||
|
||||
res, err := x.pool.ObjectRange(ctx, prmRange)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init payload range reading via connection pool", err)
|
||||
return nil, handleStorageError("init payload range reading via connection pool", err)
|
||||
}
|
||||
|
||||
return payloadReader{&res}, nil
|
||||
|
@ -168,7 +169,7 @@ func (x *FrostFS) SearchObjects(ctx context.Context, prm handler.PrmObjectSearch
|
|||
|
||||
res, err := x.pool.SearchObjects(ctx, prmSearch)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init object search via connection pool", err)
|
||||
return nil, handleStorageError("init object search via connection pool", err)
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
|
@ -202,7 +203,7 @@ func (x *FrostFS) NetmapSnapshot(ctx context.Context) (netmap.NetMap, error) {
|
|||
|
||||
netmapSnapshot, err := x.pool.NetMapSnapshot(ctx)
|
||||
if err != nil {
|
||||
return netmapSnapshot, handleObjectError("get netmap via connection pool", err)
|
||||
return netmapSnapshot, handleStorageError("get netmap via connection pool", err)
|
||||
}
|
||||
|
||||
return netmapSnapshot, nil
|
||||
|
@ -226,7 +227,7 @@ func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
|
|||
|
||||
networkInfo, err := x.pool.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return "", handleObjectError("read network info via client", err)
|
||||
return "", handleStorageError("read network info via client", err)
|
||||
}
|
||||
|
||||
domain := networkInfo.RawNetworkParameter("SystemDNS")
|
||||
|
@ -237,7 +238,7 @@ func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
|
|||
return string(domain), nil
|
||||
}
|
||||
|
||||
func handleObjectError(msg string, err error) error {
|
||||
func handleStorageError(msg string, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -250,6 +251,14 @@ func handleObjectError(msg string, err error) error {
|
|||
return fmt.Errorf("%s: %w: %s", msg, handler.ErrAccessDenied, reason)
|
||||
}
|
||||
|
||||
if client.IsErrContainerNotFound(err) {
|
||||
return fmt.Errorf("%s: %w: %s", msg, handler.ErrContainerNotFound, err.Error())
|
||||
}
|
||||
|
||||
if client.IsErrObjectNotFound(err) {
|
||||
return fmt.Errorf("%s: %w: %s", msg, handler.ErrObjectNotFound, err.Error())
|
||||
}
|
||||
|
||||
if IsTimeoutError(err) {
|
||||
return fmt.Errorf("%s: %w: %s", msg, handler.ErrGatewayTimeout, err.Error())
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ func TestHandleObjectError(t *testing.T) {
|
|||
msg := "some msg"
|
||||
|
||||
t.Run("nil error", func(t *testing.T) {
|
||||
err := handleObjectError(msg, nil)
|
||||
err := handleStorageError(msg, nil)
|
||||
require.Nil(t, err)
|
||||
})
|
||||
|
||||
|
@ -27,7 +27,7 @@ func TestHandleObjectError(t *testing.T) {
|
|||
inputErr := new(apistatus.ObjectAccessDenied)
|
||||
inputErr.WriteReason(reason)
|
||||
|
||||
err := handleObjectError(msg, inputErr)
|
||||
err := handleStorageError(msg, inputErr)
|
||||
require.ErrorIs(t, err, handler.ErrAccessDenied)
|
||||
require.Contains(t, err.Error(), reason)
|
||||
require.Contains(t, err.Error(), msg)
|
||||
|
@ -38,7 +38,7 @@ func TestHandleObjectError(t *testing.T) {
|
|||
inputErr := new(apistatus.ObjectAccessDenied)
|
||||
inputErr.WriteReason(reason)
|
||||
|
||||
err := handleObjectError(msg, inputErr)
|
||||
err := handleStorageError(msg, inputErr)
|
||||
require.ErrorIs(t, err, handler.ErrQuotaLimitReached)
|
||||
require.Contains(t, err.Error(), reason)
|
||||
require.Contains(t, err.Error(), msg)
|
||||
|
@ -47,7 +47,7 @@ func TestHandleObjectError(t *testing.T) {
|
|||
t.Run("simple timeout", func(t *testing.T) {
|
||||
inputErr := errors.New("timeout")
|
||||
|
||||
err := handleObjectError(msg, inputErr)
|
||||
err := handleStorageError(msg, inputErr)
|
||||
require.ErrorIs(t, err, handler.ErrGatewayTimeout)
|
||||
require.Contains(t, err.Error(), inputErr.Error())
|
||||
require.Contains(t, err.Error(), msg)
|
||||
|
@ -58,7 +58,7 @@ func TestHandleObjectError(t *testing.T) {
|
|||
defer cancel()
|
||||
<-ctx.Done()
|
||||
|
||||
err := handleObjectError(msg, ctx.Err())
|
||||
err := handleStorageError(msg, ctx.Err())
|
||||
require.ErrorIs(t, err, handler.ErrGatewayTimeout)
|
||||
require.Contains(t, err.Error(), ctx.Err().Error())
|
||||
require.Contains(t, err.Error(), msg)
|
||||
|
@ -67,7 +67,7 @@ func TestHandleObjectError(t *testing.T) {
|
|||
t.Run("grpc deadline exceeded", func(t *testing.T) {
|
||||
inputErr := fmt.Errorf("wrap grpc error: %w", status.Error(codes.DeadlineExceeded, "error"))
|
||||
|
||||
err := handleObjectError(msg, inputErr)
|
||||
err := handleStorageError(msg, inputErr)
|
||||
require.ErrorIs(t, err, handler.ErrGatewayTimeout)
|
||||
require.Contains(t, err.Error(), inputErr.Error())
|
||||
require.Contains(t, err.Error(), msg)
|
||||
|
@ -76,7 +76,7 @@ func TestHandleObjectError(t *testing.T) {
|
|||
t.Run("unknown error", func(t *testing.T) {
|
||||
inputErr := errors.New("unknown error")
|
||||
|
||||
err := handleObjectError(msg, inputErr)
|
||||
err := handleStorageError(msg, inputErr)
|
||||
require.ErrorIs(t, err, inputErr)
|
||||
require.Contains(t, err.Error(), msg)
|
||||
})
|
||||
|
|
|
@ -63,7 +63,7 @@ func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([
|
|||
|
||||
nodes, err := w.p.GetNodes(ctx, poolPrm)
|
||||
if err != nil {
|
||||
return nil, handleError(err)
|
||||
return nil, handleTreeError(err)
|
||||
}
|
||||
|
||||
res := make([]tree.NodeResponse, len(nodes))
|
||||
|
@ -82,7 +82,7 @@ func getBearer(ctx context.Context) []byte {
|
|||
return token.Marshal()
|
||||
}
|
||||
|
||||
func handleError(err error) error {
|
||||
func handleTreeError(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -122,7 +122,7 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo,
|
|||
|
||||
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
|
||||
if err != nil {
|
||||
return nil, handleError(err)
|
||||
return nil, handleTreeError(err)
|
||||
}
|
||||
|
||||
var subtree []tree.NodeResponse
|
||||
|
@ -133,7 +133,7 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo,
|
|||
node, err = subTreeReader.Next()
|
||||
}
|
||||
if err != io.EOF {
|
||||
return nil, handleError(err)
|
||||
return nil, handleTreeError(err)
|
||||
}
|
||||
|
||||
return subtree, nil
|
||||
|
|
17
tree/tree.go
17
tree/tree.go
|
@ -8,14 +8,18 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
Tree struct {
|
||||
service ServiceClient
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
// ServiceClient is a client to interact with tree service.
|
||||
|
@ -73,8 +77,8 @@ const (
|
|||
)
|
||||
|
||||
// NewTree creates instance of Tree using provided address and create grpc connection.
|
||||
func NewTree(service ServiceClient) *Tree {
|
||||
return &Tree{service: service}
|
||||
func NewTree(service ServiceClient, log *zap.Logger) *Tree {
|
||||
return &Tree{service: service, log: log}
|
||||
}
|
||||
|
||||
type Meta interface {
|
||||
|
@ -257,6 +261,9 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name
|
|||
if len(nodes) == 0 {
|
||||
return nil, layer.ErrNodeNotFound
|
||||
}
|
||||
if len(nodes) != 1 {
|
||||
c.reqLogger(ctx).Warn(logs.FoundSeveralSystemTreeNodes, zap.String("name", name), logs.TagField(logs.TagExternalStorageTree))
|
||||
}
|
||||
|
||||
return newMultiNode(nodes)
|
||||
}
|
||||
|
@ -296,7 +303,7 @@ func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
|
|||
}
|
||||
|
||||
if targetIndexNode == -1 {
|
||||
return nil, layer.ErrNodeNotFound
|
||||
return nil, fmt.Errorf("latest version: %w", layer.ErrNodeNotFound)
|
||||
}
|
||||
|
||||
return nodes[targetIndexNode], nil
|
||||
|
@ -423,6 +430,10 @@ func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, tr
|
|||
return intermediateNodes, nil
|
||||
}
|
||||
|
||||
func (c *Tree) reqLogger(ctx context.Context) *zap.Logger {
|
||||
return utils.GetReqLogOrDefault(ctx, c.log)
|
||||
}
|
||||
|
||||
func GetFilename(node NodeResponse) string {
|
||||
for _, kv := range node.GetMeta() {
|
||||
if kv.GetKey() == FileNameKey {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue
question: is this function intentionally not used here and here? It's just that if within the handler package we wrap the
utils.GetReqLogOrDefault(ctx, h.log)
function inreqLogger(ctx context.Context)
, don't we want to use it wherever possible?Just missed that