[#191] Refactor error handling and logging #221
17 changed files with 393 additions and 374 deletions
@@ -606,7 +606,7 @@ func (a *app) Serve() {
 		close(a.webDone)
 	}()

-	handle := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool)
+	handle := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool), a.log), workerPool)

 	// Configure router.
 	a.configureRouter(handle)

@@ -734,7 +734,7 @@ func (a *app) configureRouter(h *handler.Handler) {
 	r := router.New()
 	r.RedirectTrailingSlash = true
 	r.NotFound = func(r *fasthttp.RequestCtx) {
-		handler.ResponseError(r, "Not found", fasthttp.StatusNotFound)
+		handler.ResponseError(r, "Route Not found", fasthttp.StatusNotFound)
 	}
 	r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
 		handler.ResponseError(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
@@ -20,9 +20,11 @@ import (
 	containerv2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"

@@ -94,6 +96,7 @@ func TestIntegration(t *testing.T) {
 		t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID) })
 		t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID) })
 		t.Run("test namespaces "+version, func(t *testing.T) { checkNamespaces(ctx, t, clientPool, ownerID, CID) })
+		t.Run("test status codes "+version, func(t *testing.T) { checkStatusCodes(ctx, t, clientPool, ownerID, version) })

 		cancel()
 		server.Wait()

@@ -260,7 +263,7 @@ func putWithDuplicateKeys(t *testing.T, CID cid.ID) {

 	body, err := io.ReadAll(resp.Body)
 	require.NoError(t, err)
-	require.Equal(t, "key duplication error: "+attr+"\n", string(body))
+	require.Contains(t, string(body), "key duplication error: "+attr+"\n")
 	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
 }

@@ -429,7 +432,80 @@ func checkNamespaces(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
 	resp, err = http.DefaultClient.Do(req)
 	require.NoError(t, err)
 	require.Equal(t, http.StatusNotFound, resp.StatusCode)
+}
+
+func checkStatusCodes(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, version string) {
+	cli := http.Client{Timeout: 30 * time.Second}
+
+	t.Run("container not found by name", func(t *testing.T) {
+		resp, err := cli.Get(testHost + "/get/unknown/object")
+		require.NoError(t, err)
+		require.Equal(t, http.StatusNotFound, resp.StatusCode)
+		requireBodyContains(t, resp, "container not found")
+	})
+
+	t.Run("container not found by cid", func(t *testing.T) {
+		cnrIDTest := cidtest.ID()
+		resp, err := cli.Get(testHost + "/get/" + cnrIDTest.EncodeToString() + "/object")
+		require.NoError(t, err)
+		requireBodyContains(t, resp, "container not found")
+		require.Equal(t, http.StatusNotFound, resp.StatusCode)
+	})
+
+	t.Run("object not found in storage", func(t *testing.T) {
+		resp, err := cli.Get(testHost + "/get_by_attribute/" + testContainerName + "/FilePath/object2")
+		require.NoError(t, err)
+		requireBodyContains(t, resp, "object not found")
+		require.Equal(t, http.StatusNotFound, resp.StatusCode)
+	})
+
+	t.Run("access denied", func(t *testing.T) {
+		basicACL := acl.Private
+		var recs []*eacl.Record
+		if version == "1.2.7" {
+			basicACL = acl.PublicRWExtended
+			rec := eacl.NewRecord()
+			rec.SetAction(eacl.ActionDeny)
+			rec.SetOperation(eacl.OperationGet)
+			recs = append(recs, rec)
+		}
+
+		cnrID, err := createContainerBase(ctx, t, clientPool, ownerID, basicACL, "")
+		require.NoError(t, err)
+
+		key, err := keys.NewPrivateKey()
+		require.NoError(t, err)
+		jsonToken, _ := makeBearerTokens(t, key, ownerID, version, recs...)
+
+		t.Run("get", func(t *testing.T) {
+			request, err := http.NewRequest(http.MethodGet, testHost+"/get/"+cnrID.EncodeToString()+"/object", nil)
+			require.NoError(t, err)
+			request.Header.Set("Authorization", "Bearer "+jsonToken)
+
+			resp, err := cli.Do(request)
+			require.NoError(t, err)
+			requireBodyContains(t, resp, "access denied")
+			require.Equal(t, http.StatusForbidden, resp.StatusCode)
+		})
+
+		t.Run("upload", func(t *testing.T) {
+			request, _, _ := makePutRequest(t, testHost+"/upload/"+cnrID.EncodeToString())
+			request.Header.Set("Authorization", "Bearer "+jsonToken)
+
+			resp, err := cli.Do(request)
+			require.NoError(t, err)
+			requireBodyContains(t, resp, "access denied")
+			require.Equal(t, http.StatusForbidden, resp.StatusCode)
+		})
+	})
+}
+
+func requireBodyContains(t *testing.T, resp *http.Response, msg string) {
+	data, err := io.ReadAll(resp.Body)
+	require.NoError(t, err)
+	defer resp.Body.Close()
+
+	require.Contains(t, strings.ToLower(string(data)), strings.ToLower(msg))
 }

 func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container {

@@ -478,6 +554,10 @@ func getPool(ctx context.Context, t *testing.T, key *keys.PrivateKey) *pool.Pool
 }

 func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID) (cid.ID, error) {
+	return createContainerBase(ctx, t, clientPool, ownerID, acl.PublicRWExtended, testContainerName)
+}
+
+func createContainerBase(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, basicACL acl.Basic, cnrName string) (cid.ID, error) {
 	var policy netmap.PlacementPolicy
 	err := policy.DecodeString("REP 1")
 	require.NoError(t, err)

@@ -485,24 +565,28 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
 	var cnr container.Container
 	cnr.Init()
 	cnr.SetPlacementPolicy(policy)
-	cnr.SetBasicACL(acl.PublicRWExtended)
+	cnr.SetBasicACL(basicACL)
 	cnr.SetOwner(ownerID)

 	container.SetCreationTime(&cnr, time.Now())

+	if cnrName != "" {
 		var domain container.Domain
-		domain.SetName(testContainerName)
+		domain.SetName(cnrName)

 		cnr.SetAttribute(containerv2.SysAttributeName, domain.Name())
 		cnr.SetAttribute(containerv2.SysAttributeZone, domain.Zone())
+	}

-	var waitPrm pool.WaitParams
-	waitPrm.SetTimeout(15 * time.Second)
-	waitPrm.SetPollInterval(3 * time.Second)
-
-	var prm pool.PrmContainerPut
-	prm.SetContainer(cnr)
-	prm.SetWaitParams(waitPrm)
+	prm := pool.PrmContainerPut{
+		ClientParams: client.PrmContainerPut{
+			Container: &cnr,
+		},
+		WaitParams: &pool.WaitParams{
+			Timeout:      15 * time.Second,
+			PollInterval: 3 * time.Second,
+		},
+	}

 	CID, err := clientPool.PutContainer(ctx, prm)
 	if err != nil {

@@ -549,13 +633,18 @@ func registerUser(t *testing.T, ctx context.Context, aioContainer testcontainers
 	require.NoError(t, err)
 }

-func makeBearerTokens(t *testing.T, key *keys.PrivateKey, ownerID user.ID, version string) (jsonTokenBase64, binaryTokenBase64 string) {
+func makeBearerTokens(t *testing.T, key *keys.PrivateKey, ownerID user.ID, version string, records ...*eacl.Record) (jsonTokenBase64, binaryTokenBase64 string) {
 	tkn := new(bearer.Token)
 	tkn.ForUser(ownerID)
 	tkn.SetExp(10000)

 	if version == "1.2.7" {
-		tkn.SetEACLTable(*eacl.NewTable())
+		table := eacl.NewTable()
+		for i := range records {
+			table.AddRecord(records[i])
+		}
+
+		tkn.SetEACLTable(*table)
 	} else {
 		tkn.SetImpersonate(true)
 	}
docs/api.md (13 changes)

@@ -94,6 +94,8 @@ The `filename` field from the multipart form will be set as `FileName` attribute
 |--------|----------------------------------------------|
 | 200 | Object created successfully. |
 | 400 | Some error occurred during object uploading. |
+| 403 | Access denied. |
+| 409 | Can not upload object due to quota reached. |

 ## Get object

@@ -141,6 +143,7 @@ Get an object (payload and attributes) by an address.
 |--------|------------------------------------------------|
 | 200 | Object got successfully. |
 | 400 | Some error occurred during object downloading. |
+| 403 | Access denied. |
 | 404 | Container or object not found. |

 ###### Body

@@ -183,6 +186,7 @@ Get an object attributes by an address.
 |--------|---------------------------------------------------|
 | 200 | Object head successfully. |
 | 400 | Some error occurred during object HEAD operation. |
+| 403 | Access denied. |
 | 404 | Container or object not found. |

 ## Search object

@@ -233,6 +237,7 @@ If more than one object is found, an arbitrary one will be returned.
 |--------|------------------------------------------------|
 | 200 | Object got successfully. |
 | 400 | Some error occurred during object downloading. |
+| 403 | Access denied. |
 | 404 | Container or object not found. |

 #### HEAD

@@ -269,6 +274,7 @@ If more than one object is found, an arbitrary one will be used to get attribute
 |--------|---------------------------------------|
 | 200 | Object head successfully. |
 | 400 | Some error occurred during operation. |
+| 403 | Access denied. |
 | 404 | Container or object not found. |

 ## Download archive

@@ -305,15 +311,16 @@ Archive can be compressed (see http-gw [configuration](gate-configuration.md#arc
 ###### Headers

 | Header | Description |
-|-----------------------|-------------------------------------------------------------------------------------------------------------------|
+|-----------------------|---------------------------------------------------------------------------------------------|
 | `Content-Disposition` | Indicate how to browsers should treat file (`attachment`). Set `filename` as `archive.zip`. |
 | `Content-Type` | Indicate content type of object. Set to `application/zip` |

 ###### Status codes

 | Status | Description |
-|--------|-----------------------------------------------------|
+|--------|------------------------------------------------|
 | 200 | Object got successfully. |
 | 400 | Some error occurred during object downloading. |
+| 403 | Access denied. |
 | 404 | Container or objects not found. |
-| 500 | Some inner error (e.g. error on streaming objects). |
+| 409 | Can not upload object due to quota reached. |
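The new 403 rows (and the 409 row on upload) describe responses a client of the gateway can now receive. As a minimal, hedged illustration of handling the documented codes on the `/get/{cid}/{oid}` endpoint, assuming a gateway reachable at `http://localhost:8080` and placeholder IDs (none of these values come from the PR itself):

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Placeholder URL: substitute a real gateway address, container ID and object ID.
	resp, err := http.Get("http://localhost:8080/get/<cid>/<oid>")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		fmt.Println("object downloaded")
	case http.StatusForbidden: // new 403 row: access denied
		fmt.Println("access denied")
	case http.StatusNotFound: // 404: container or object not found
		fmt.Println("container or object not found")
	default: // 400 and anything else
		fmt.Printf("unexpected status: %d\n", resp.StatusCode)
	}
}
```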
@@ -273,7 +273,7 @@ func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs Re
 		})
 		if err != nil {
 			wg.Done()
-			log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err), logs.TagField(logs.TagDatapath))
+			log.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err), logs.TagField(logs.TagDatapath))
 		}
 		select {
 		case <-ctx.Done():

@@ -328,20 +328,18 @@ type browseParams struct {
 	listObjects func(ctx context.Context, bucketName *data.BucketInfo, prefix string) (*GetObjectsResponse, error)
 }

-func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
+func (h *Handler) browseObjects(ctx context.Context, req *fasthttp.RequestCtx, p browseParams) {
 	const S3Protocol = "s3"
 	const FrostfsProtocol = "frostfs"

-	ctx := utils.GetContextFromRequest(c)
-	reqLog := utils.GetReqLogOrDefault(ctx, h.log)
-	log := reqLog.With(
+	ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
 		zap.String("bucket", p.bucketInfo.Name),
 		zap.String("container", p.bucketInfo.CID.EncodeToString()),
 		zap.String("prefix", p.prefix),
-	)
+	))
 	resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix)
 	if err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToListObjects, err)
 		return
 	}

@@ -360,7 +358,7 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
 		"parentDir": parentDir,
 	}).Parse(h.config.IndexPageTemplate())
 	if err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToParseTemplate, err)
 		return
 	}
 	bucketName := p.bucketInfo.Name

@@ -369,14 +367,14 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
 		bucketName = p.bucketInfo.CID.EncodeToString()
 		protocol = FrostfsProtocol
 	}
-	if err = tmpl.Execute(c, &BrowsePageData{
+	if err = tmpl.Execute(req, &BrowsePageData{
 		Container: bucketName,
 		Prefix:    p.prefix,
 		Objects:   objects,
 		Protocol:  protocol,
 		HasErrors: resp.hasErrors,
 	}); err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToExecuteTemplate, err)
 		return
 	}
 }
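browseObjects (and the handlers in the following files) now push request-scoped fields into the context with utils.SetReqLog and read them back through h.reqLogger, instead of threading a *zap.Logger through every call. Neither helper's body is part of this excerpt; the following is only a plausible sketch of the context-value pattern such helpers usually rely on, with the key type and the exact implementation assumed rather than taken from the PR:

```go
// Sketch of a context-scoped request logger; the real utils implementation may differ.
package utils

import (
	"context"

	"go.uber.org/zap"
)

type reqLogKey struct{}

// SetReqLog returns a child context carrying the given request logger.
func SetReqLog(ctx context.Context, log *zap.Logger) context.Context {
	return context.WithValue(ctx, reqLogKey{}, log)
}

// GetReqLogOrDefault returns the logger stored in ctx, or defaultLog when none is set.
func GetReqLogOrDefault(ctx context.Context, defaultLog *zap.Logger) *zap.Logger {
	if log, ok := ctx.Value(reqLogKey{}).(*zap.Logger); ok {
		return log
	}
	return defaultLog
}
```

Under that assumption, h.reqLogger(ctx) would simply be shorthand for utils.GetReqLogOrDefault(ctx, h.log), which is why the refactored call sites can drop their *zap.Logger parameters and recover the enriched logger from the context instead.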
@@ -25,43 +25,38 @@ import (
 )

 // DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
-func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadByAddressOrBucketName")
+func (h *Handler) DownloadByAddressOrBucketName(req *fasthttp.RequestCtx) {
+	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadByAddressOrBucketName")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)

-	cidParam := c.UserValue("cid").(string)
-	oidParam := c.UserValue("oid").(string)
-	downloadParam := c.QueryArgs().GetBool("download")
+	cidParam := req.UserValue("cid").(string)
+	oidParam := req.UserValue("oid").(string)
+	downloadParam := req.QueryArgs().GetBool("download")

-	log := utils.GetReqLogOrDefault(ctx, h.log).With(
+	ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
 		zap.String("cid", cidParam),
 		zap.String("oid", oidParam),
-	)
+	))

-	bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
+	bktInfo, err := h.getBucketInfo(ctx, cidParam)
 	if err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
 		return
 	}

 	checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
 	if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
-		log.Error(logs.FailedToCheckIfSettingsNodeExist, zap.String("cid", bktInfo.CID.String()),
-			zap.Error(checkS3Err), logs.TagField(logs.TagExternalStorageTree))
-		logAndSendBucketError(c, log, checkS3Err)
+		h.logAndSendError(ctx, req, logs.FailedToCheckIfSettingsNodeExist, checkS3Err)
 		return
 	}

-	req := newRequest(c, log)
-
 	var objID oid.ID
 	if checkS3Err == nil && shouldDownload(oidParam, downloadParam) {
 		h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.receiveFile)
 	} else if err = objID.DecodeString(oidParam); err == nil {
 		h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.receiveFile)
 	} else {
-		h.browseIndex(c, checkS3Err != nil)
+		h.browseIndex(ctx, req, cidParam, oidParam, checkS3Err != nil)
 	}
 }

@@ -70,12 +65,11 @@ func shouldDownload(oidParam string, downloadParam bool) bool {
 }

 // DownloadByAttribute handles attribute-based download requests.
-func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadByAttribute")
+func (h *Handler) DownloadByAttribute(req *fasthttp.RequestCtx) {
+	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadByAttribute")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)

-	h.byAttribute(c, h.receiveFile)
+	h.byAttribute(ctx, req, h.receiveFile)
 }

 func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {

@@ -95,31 +89,33 @@ func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op
 }

 // DownloadZip handles zip by prefix requests.
-func (h *Handler) DownloadZip(c *fasthttp.RequestCtx) {
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadZip")
+func (h *Handler) DownloadZip(req *fasthttp.RequestCtx) {
+	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadZip")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)

-	scid, _ := c.UserValue("cid").(string)
+	scid, _ := req.UserValue("cid").(string)
+	prefix, _ := req.UserValue("prefix").(string)

-	log := utils.GetReqLogOrDefault(ctx, h.log)
-	bktInfo, err := h.getBucketInfo(ctx, scid, log)
+	ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", scid), zap.String("prefix", prefix)))
+
+	bktInfo, err := h.getBucketInfo(ctx, scid)
 	if err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
 		return
 	}
-	resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
+
+	resSearch, err := h.searchObjectsByPrefix(ctx, bktInfo.CID, prefix)
 	if err != nil {
 		return
 	}

-	c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
-	c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
+	req.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
+	req.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")

-	c.SetBodyStreamWriter(h.getZipResponseWriter(ctx, log, resSearch, bktInfo))
+	req.SetBodyStreamWriter(h.getZipResponseWriter(ctx, resSearch, bktInfo))
 }

-func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
+func (h *Handler) getZipResponseWriter(ctx context.Context, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
 	return func(w *bufio.Writer) {
 		defer resSearch.Close()

@@ -127,20 +123,20 @@ func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, res
 		zipWriter := zip.NewWriter(w)
 		var objectsWritten int

-		errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
+		errIter := resSearch.Iterate(h.putObjectToArchive(ctx, bktInfo.CID, buf,
 			func(obj *object.Object) (io.Writer, error) {
 				objectsWritten++
 				return h.createZipFile(zipWriter, obj)
 			}),
 		)
 		if errIter != nil {
-			log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
+			h.reqLogger(ctx).Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
 			return
 		} else if objectsWritten == 0 {
-			log.Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
+			h.reqLogger(ctx).Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
 		}
 		if err := zipWriter.Close(); err != nil {
-			log.Error(logs.CloseZipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
+			h.reqLogger(ctx).Error(logs.CloseZipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
 		}
 	}
 }

@@ -164,31 +160,33 @@ func (h *Handler) createZipFile(zw *zip.Writer, obj *object.Object) (io.Writer,
 }

 // DownloadTar forms tar.gz from objects by prefix.
-func (h *Handler) DownloadTar(c *fasthttp.RequestCtx) {
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.DownloadTar")
+func (h *Handler) DownloadTar(req *fasthttp.RequestCtx) {
+	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.DownloadTar")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)

-	scid, _ := c.UserValue("cid").(string)
+	scid, _ := req.UserValue("cid").(string)
+	prefix, _ := req.UserValue("prefix").(string)

-	log := utils.GetReqLogOrDefault(ctx, h.log)
-	bktInfo, err := h.getBucketInfo(ctx, scid, log)
+	ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", scid), zap.String("prefix", prefix)))
+
+	bktInfo, err := h.getBucketInfo(ctx, scid)
 	if err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
 		return
 	}
-	resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
+
+	resSearch, err := h.searchObjectsByPrefix(ctx, bktInfo.CID, prefix)
 	if err != nil {
 		return
 	}

-	c.Response.Header.Set(fasthttp.HeaderContentType, "application/gzip")
-	c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.tar.gz\"")
+	req.Response.Header.Set(fasthttp.HeaderContentType, "application/gzip")
+	req.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.tar.gz\"")

-	c.SetBodyStreamWriter(h.getTarResponseWriter(ctx, log, resSearch, bktInfo))
+	req.SetBodyStreamWriter(h.getTarResponseWriter(ctx, resSearch, bktInfo))
 }

-func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
+func (h *Handler) getTarResponseWriter(ctx context.Context, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
 	return func(w *bufio.Writer) {
 		defer resSearch.Close()

@@ -203,26 +201,26 @@ func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, res

 		defer func() {
 			if err := tarWriter.Close(); err != nil {
-				log.Error(logs.CloseTarWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
+				h.reqLogger(ctx).Error(logs.CloseTarWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
 			}
 			if err := gzipWriter.Close(); err != nil {
-				log.Error(logs.CloseGzipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
+				h.reqLogger(ctx).Error(logs.CloseGzipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
 			}
 		}()

 		var objectsWritten int
 		buf := make([]byte, 3<<20) // the same as for upload

-		errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
+		errIter := resSearch.Iterate(h.putObjectToArchive(ctx, bktInfo.CID, buf,
 			func(obj *object.Object) (io.Writer, error) {
 				objectsWritten++
 				return h.createTarFile(tarWriter, obj)
 			}),
 		)
 		if errIter != nil {
-			log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
+			h.reqLogger(ctx).Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
 		} else if objectsWritten == 0 {
-			log.Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
+			h.reqLogger(ctx).Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
 		}
 	}
 }

@@ -240,9 +238,9 @@ func (h *Handler) createTarFile(tw *tar.Writer, obj *object.Object) (io.Writer,
 	})
 }

-func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID cid.ID, buf []byte, createArchiveHeader func(obj *object.Object) (io.Writer, error)) func(id oid.ID) bool {
+func (h *Handler) putObjectToArchive(ctx context.Context, cnrID cid.ID, buf []byte, createArchiveHeader func(obj *object.Object) (io.Writer, error)) func(id oid.ID) bool {
 	return func(id oid.ID) bool {
-		log = log.With(zap.String("oid", id.EncodeToString()))
+		logger := h.reqLogger(ctx).With(zap.String("oid", id.EncodeToString()))

 		prm := PrmObjectGet{
 			PrmAuth: PrmAuth{

@@ -253,18 +251,18 @@ func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID

 		resGet, err := h.frostfs.GetObject(ctx, prm)
 		if err != nil {
-			log.Error(logs.FailedToGetObject, zap.Error(err), logs.TagField(logs.TagExternalStorage))
+			logger.Error(logs.FailedToGetObject, zap.Error(err), logs.TagField(logs.TagExternalStorage))
 			return false
 		}

 		fileWriter, err := createArchiveHeader(&resGet.Header)
 		if err != nil {
-			log.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
+			logger.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
 			return false
 		}

 		if err = writeToArchive(resGet, fileWriter, buf); err != nil {
-			log.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
+			logger.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
 			return false
 		}

@@ -272,28 +270,17 @@ func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID
 	}
 }

-func (h *Handler) searchObjectsByPrefix(c *fasthttp.RequestCtx, log *zap.Logger, cnrID cid.ID) (ResObjectSearch, error) {
-	scid, _ := c.UserValue("cid").(string)
-	prefix, _ := c.UserValue("prefix").(string)
-
-	ctx := utils.GetContextFromRequest(c)
-
+func (h *Handler) searchObjectsByPrefix(ctx context.Context, cnrID cid.ID, prefix string) (ResObjectSearch, error) {
 	prefix, err := url.QueryUnescape(prefix)
 	if err != nil {
-		log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix),
-			zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
-		return nil, err
+		return nil, fmt.Errorf("unescape prefix: %w", err)
 	}

-	log = log.With(zap.String("cid", scid), zap.String("prefix", prefix))
-
 	resSearch, err := h.search(ctx, cnrID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
 	if err != nil {
-		log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage))
-		ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
-		return nil, err
+		return nil, fmt.Errorf("search objects by prefix: %w", err)
 	}

 	return resSearch, nil
 }
@@ -16,7 +16,6 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
-	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"

@@ -142,6 +141,10 @@
 	ErrGatewayTimeout = errors.New("gateway timeout")
 	// ErrQuotaLimitReached is returned from FrostFS in case of quota exceeded.
 	ErrQuotaLimitReached = errors.New("quota limit reached")
+	// ErrContainerNotFound is returned from FrostFS in case of container was not found.
+	ErrContainerNotFound = errors.New("container not found")
+	// ErrObjectNotFound is returned from FrostFS in case of object was not found.
+	ErrObjectNotFound = errors.New("object not found")
 )

 // FrostFS represents virtual connection to FrostFS network.

@@ -195,7 +198,7 @@ func New(params *AppParams, config Config, tree layer.TreeService, workerPool *a

 // byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
 // prepares request and object address to it.
-func (h *Handler) byNativeAddress(ctx context.Context, req request, cnrID cid.ID, objID oid.ID, handler func(context.Context, request, oid.Address)) {
+func (h *Handler) byNativeAddress(ctx context.Context, req *fasthttp.RequestCtx, cnrID cid.ID, objID oid.ID, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
 	ctx, span := tracing.StartSpanFromContext(ctx, "handler.byNativeAddress")
 	defer span.End()

@@ -205,72 +208,59 @@ func (h *Handler) byNativeAddress(ctx context.Context, req request, cnrID cid.ID

 // byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
 // resolves object address from S3-like path <bucket name>/<object key>.
-func (h *Handler) byS3Path(ctx context.Context, req request, cnrID cid.ID, path string, handler func(context.Context, request, oid.Address)) {
+func (h *Handler) byS3Path(ctx context.Context, req *fasthttp.RequestCtx, cnrID cid.ID, path string, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
 	ctx, span := tracing.StartSpanFromContext(ctx, "handler.byS3Path")
 	defer span.End()

-	c, log := req.RequestCtx, req.log
-
 	foundOID, err := h.tree.GetLatestVersion(ctx, &cnrID, path)
 	if err != nil {
-		log.Error(logs.FailedToGetLatestVersionOfObject, zap.Error(err), zap.String("cid", cnrID.String()),
-			zap.String("path", path), logs.TagField(logs.TagExternalStorageTree))
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToGetLatestVersionOfObject, err, zap.String("path", path))
 		return
 	}
 	if foundOID.IsDeleteMarker {
-		log.Error(logs.ObjectWasDeleted, logs.TagField(logs.TagExternalStorageTree))
-		ResponseError(c, "object deleted", fasthttp.StatusNotFound)
+		h.logAndSendError(ctx, req, logs.ObjectWasDeleted, ErrObjectNotFound)
 		return
 	}

 	addr := newAddress(cnrID, foundOID.OID)
-	handler(ctx, newRequest(c, log), addr)
+	handler(ctx, req, addr)
 }

 // byAttribute is a wrapper similar to byNativeAddress.
-func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Context, request, oid.Address)) {
-	cidParam, _ := c.UserValue("cid").(string)
-	key, _ := c.UserValue("attr_key").(string)
-	val, _ := c.UserValue("attr_val").(string)
-
-	ctx := utils.GetContextFromRequest(c)
-	log := utils.GetReqLogOrDefault(ctx, h.log)
+func (h *Handler) byAttribute(ctx context.Context, req *fasthttp.RequestCtx, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
+	cidParam, _ := req.UserValue("cid").(string)
+	key, _ := req.UserValue("attr_key").(string)
+	val, _ := req.UserValue("attr_val").(string)

 	key, err := url.QueryUnescape(key)
 	if err != nil {
-		log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_key", key),
-			zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
+		h.logAndSendError(ctx, req, logs.FailedToUnescapeQuery, err, zap.String("cid", cidParam), zap.String("attr_key", key))
 		return
 	}

 	val, err = url.QueryUnescape(val)
 	if err != nil {
-		log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_val", val),
-			zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
+		h.logAndSendError(ctx, req, logs.FailedToUnescapeQuery, err, zap.String("cid", cidParam), zap.String("attr_val", key))
 		return
 	}

 	val = prepareAtribute(key, val)

-	log = log.With(zap.String("cid", cidParam), zap.String("attr_key", key), zap.String("attr_val", val))
+	ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", cidParam),
+		zap.String("attr_key", key), zap.String("attr_val", val)))

-	bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
+	bktInfo, err := h.getBucketInfo(ctx, cidParam)
 	if err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
 		return
 	}

-	objID, err := h.findObjectByAttribute(ctx, log, bktInfo.CID, key, val)
+	objID, err := h.findObjectByAttribute(ctx, bktInfo.CID, key, val)
 	if err != nil {
 		if errors.Is(err, io.EOF) {
-			ResponseError(c, err.Error(), fasthttp.StatusNotFound)
-			return
+			err = fmt.Errorf("%w: %s", ErrObjectNotFound, err.Error())
 		}
+		h.logAndSendError(ctx, req, logs.FailedToFindObjectByAttribute, err)

-		ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
 		return
 	}

@@ -278,14 +268,13 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte
 	addr.SetContainer(bktInfo.CID)
 	addr.SetObject(objID)

-	handler(ctx, newRequest(c, log), addr)
+	handler(ctx, req, addr)
 }

-func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
+func (h *Handler) findObjectByAttribute(ctx context.Context, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
 	res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual)
 	if err != nil {
-		log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage))
-		return oid.ID{}, fmt.Errorf("could not search for objects: %w", err)
+		return oid.ID{}, fmt.Errorf("search objects: %w", err)
 	}
 	defer res.Close()

@@ -295,14 +284,14 @@ func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cn
 	if n == 0 {
 		switch {
 		case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal):
-			log.Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName, logs.TagField(logs.TagExternalStorage))
+			h.reqLogger(ctx).Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName, logs.TagField(logs.TagExternalStorage))
 			attrVal = prepareAtribute(attrFileName, attrVal)
-			return h.findObjectByAttribute(ctx, log, cnrID, attrFileName, attrVal)
+			return h.findObjectByAttribute(ctx, cnrID, attrFileName, attrVal)
 		case errors.Is(err, io.EOF):
-			log.Error(logs.ObjectNotFound, zap.Error(err), logs.TagField(logs.TagExternalStorage))
+			h.reqLogger(ctx).Error(logs.ObjectNotFound, zap.Error(err), logs.TagField(logs.TagExternalStorage))
 			return oid.ID{}, fmt.Errorf("object not found: %w", err)
 		default:
-			log.Error(logs.ReadObjectListFailed, zap.Error(err), logs.TagField(logs.TagExternalStorage))
+			h.reqLogger(ctx).Error(logs.ReadObjectListFailed, zap.Error(err), logs.TagField(logs.TagExternalStorage))
 			return oid.ID{}, fmt.Errorf("read object list failed: %w", err)
 		}
 	}

@@ -354,13 +343,13 @@ func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*ci
 	if err != nil {
 		cnrID, err = h.containerResolver.Resolve(ctx, containerID)
 		if err != nil && strings.Contains(err.Error(), "not found") {
-			err = fmt.Errorf("%w: %s", new(apistatus.ContainerNotFound), err.Error())
+			err = fmt.Errorf("%w: %s", ErrContainerNotFound, err.Error())
 		}
 	}
 	return cnrID, err
 }

-func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
+func (h *Handler) getBucketInfo(ctx context.Context, containerName string) (*data.BucketInfo, error) {
 	ns, err := middleware.GetNamespace(ctx)
 	if err != nil {
 		return nil, err

@@ -372,21 +361,16 @@ func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *

 	cnrID, err := h.resolveContainer(ctx, containerName)
 	if err != nil {
-		log.Error(logs.CouldNotResolveContainerID, zap.Error(err), zap.String("cnrName", containerName),
-			logs.TagField(logs.TagDatapath))
-		return nil, err
+		return nil, fmt.Errorf("resolve container: %w", err)
 	}

 	bktInfo, err := h.readContainer(ctx, *cnrID)
 	if err != nil {
-		log.Error(logs.CouldNotGetContainerInfo, zap.Error(err), zap.String("cnrName", containerName),
-			zap.String("cnrName", cnrID.String()),
-			logs.TagField(logs.TagExternalStorage))
-		return nil, err
+		return nil, fmt.Errorf("read container: %w", err)
 	}

 	if err = h.cache.Put(bktInfo); err != nil {
-		log.Warn(logs.CouldntPutBucketIntoCache,
+		h.reqLogger(ctx).Warn(logs.CouldntPutBucketIntoCache,
 			zap.String("bucket name", bktInfo.Name),
 			zap.Stringer("bucket cid", bktInfo.CID),
 			zap.Error(err),

@@ -419,31 +403,24 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
 	return bktInfo, err
 }

-func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) {
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.browseIndex")
+func (h *Handler) browseIndex(ctx context.Context, req *fasthttp.RequestCtx, cidParam, oidParam string, isNativeList bool) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "handler.browseIndex")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)

 	if !h.config.IndexPageEnabled() {
-		c.SetStatusCode(fasthttp.StatusNotFound)
+		req.SetStatusCode(fasthttp.StatusNotFound)
 		return
 	}

-	cidURLParam := c.UserValue("cid").(string)
-	oidURLParam := c.UserValue("oid").(string)
-
-	reqLog := utils.GetReqLogOrDefault(ctx, h.log)
-	log := reqLog.With(zap.String("cid", cidURLParam), zap.String("oid", oidURLParam))
-
-	unescapedKey, err := url.QueryUnescape(oidURLParam)
+	unescapedKey, err := url.QueryUnescape(oidParam)
 	if err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToUnescapeOIDParam, err)
 		return
 	}

-	bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log)
+	bktInfo, err := h.getBucketInfo(ctx, cidParam)
 	if err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
 		return
 	}

@@ -453,7 +430,7 @@ func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) {
 		listFunc = h.getDirObjectsNative
 	}

-	h.browseObjects(c, browseParams{
+	h.browseObjects(ctx, req, browseParams{
 		bucketInfo:  bktInfo,
 		prefix:      unescapedKey,
 		listObjects: listFunc,
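Most call sites above now funnel failures through h.logAndSendError, but that helper is defined outside this excerpt. The following is only a rough sketch of what such a helper plausibly does, assembled from pieces visible in this diff (reqLogger, ResponseError, the Err* sentinel values, logs.TagField), shown as it would sit inside the handler package; the actual signature, tag selection, and status mapping in the PR may differ, the logs.* message constants are assumed to be plain strings, and an access-denied (403) mapping presumably exists as well but no such sentinel appears in this excerpt:

```go
// Sketch only; not the PR's actual implementation.
func (h *Handler) logAndSendError(ctx context.Context, req *fasthttp.RequestCtx, msg string, err error, fields ...zap.Field) {
	// Log with the request-scoped logger plus any extra fields supplied by the caller.
	h.reqLogger(ctx).Error(msg, append(fields, zap.Error(err), logs.TagField(logs.TagDatapath))...)

	// Map the sentinel errors introduced above onto the HTTP statuses documented in docs/api.md.
	status := fasthttp.StatusBadRequest
	switch {
	case errors.Is(err, ErrContainerNotFound), errors.Is(err, ErrObjectNotFound):
		status = fasthttp.StatusNotFound
	case errors.Is(err, ErrQuotaLimitReached):
		status = fasthttp.StatusConflict
	case errors.Is(err, ErrGatewayTimeout):
		status = fasthttp.StatusGatewayTimeout
	}
	ResponseError(req, msg+": "+err.Error(), status)
}
```

Centralising the mapping in one place is what lets the individual handlers shrink to a single call while the integration tests above can still assert on both the status code and the error text in the response body.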
@@ -374,7 +374,7 @@ func TestFindObjectByAttribute(t *testing.T) {
 			obj.SetAttributes(tc.firstAttr, tc.secondAttr)
 			hc.cfg.additionalSearch = tc.additionalSearch

-			objID, err := hc.Handler().findObjectByAttribute(ctx, hc.Handler().log, cnrID, tc.reqAttrKey, tc.reqAttrValue)
+			objID, err := hc.Handler().findObjectByAttribute(ctx, cnrID, tc.reqAttrKey, tc.reqAttrValue)
 			if tc.err != "" {
 				require.Error(t, err)
 				require.Contains(t, err.Error(), tc.err)
@ -27,7 +27,7 @@ const (
|
||||||
hdrContainerID = "X-Container-Id"
|
hdrContainerID = "X-Container-Id"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) {
|
func (h *Handler) headObject(ctx context.Context, req *fasthttp.RequestCtx, objectAddress oid.Address) {
|
||||||
var start = time.Now()
|
var start = time.Now()
|
||||||
|
|
||||||
btoken := bearerToken(ctx)
|
btoken := bearerToken(ctx)
|
||||||
|
@@ -41,7 +41,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
 	obj, err := h.frostfs.HeadObject(ctx, prm)
 	if err != nil {
-		req.handleFrostFSErr(err, start)
+		h.logAndSendError(ctx, req, logs.FailedToHeadObject, err, zap.Stringer("elapsed", time.Since(start)))
 		return
 	}

@@ -65,7 +65,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
 		case object.AttributeTimestamp:
 			value, err := strconv.ParseInt(val, 10, 64)
 			if err != nil {
-				req.log.Info(logs.CouldntParseCreationDate,
+				h.reqLogger(ctx).Info(logs.CouldntParseCreationDate,
 					zap.String("key", key),
 					zap.String("val", val),
 					zap.Error(err),
@@ -100,7 +100,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
 			return h.frostfs.RangeObject(ctx, prmRange)
 		}, filename)
 		if err != nil && err != io.EOF {
-			req.handleFrostFSErr(err, start)
+			h.logAndSendError(ctx, req, logs.FailedToDetectContentTypeFromPayload, err, zap.Stringer("elapsed", time.Since(start)))
 			return
 		}
 	}
@@ -116,40 +116,37 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
 }

 // HeadByAddressOrBucketName handles head requests using simple cid/oid or bucketname/key format.
-func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.HeadByAddressOrBucketName")
+func (h *Handler) HeadByAddressOrBucketName(req *fasthttp.RequestCtx) {
+	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.HeadByAddressOrBucketName")
 	defer span.End()

-	cidParam, _ := c.UserValue("cid").(string)
-	oidParam, _ := c.UserValue("oid").(string)
+	cidParam, _ := req.UserValue("cid").(string)
+	oidParam, _ := req.UserValue("oid").(string)

-	log := utils.GetReqLogOrDefault(ctx, h.log).With(
+	ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(
 		zap.String("cid", cidParam),
 		zap.String("oid", oidParam),
-	)
+	))

-	bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
+	bktInfo, err := h.getBucketInfo(ctx, cidParam)
 	if err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
 		return
 	}

 	checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
 	if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
-		log.Error(logs.FailedToCheckIfSettingsNodeExist, zap.String("cid", bktInfo.CID.String()),
-			zap.Error(checkS3Err), logs.TagField(logs.TagExternalStorageTree))
-		logAndSendBucketError(c, log, checkS3Err)
+		h.logAndSendError(ctx, req, logs.FailedToCheckIfSettingsNodeExist, checkS3Err)
 		return
 	}

-	req := newRequest(c, log)

 	var objID oid.ID
 	if checkS3Err == nil {
 		h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.headObject)
 	} else if err = objID.DecodeString(oidParam); err == nil {
 		h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.headObject)
 	} else {
-		logAndSendBucketError(c, log, checkS3Err)
+		h.logAndSendError(ctx, req, logs.InvalidOIDParam, err)
 	}
 }

@@ -157,7 +154,6 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
 func (h *Handler) HeadByAttribute(c *fasthttp.RequestCtx) {
 	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.HeadByAttribute")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)

-	h.byAttribute(c, h.headObject)
+	h.byAttribute(ctx, c, h.headObject)
 }
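Taken together, these hunks replace the old request wrapper's `handleFrostFSErr` with the shared `logAndSendError` helper. A minimal sketch of the resulting error path, assuming a surrounding handler method that already has `ctx`, `req`, `prm`, and `start` in scope (the helper and the message constant come from this PR, the rest is illustrative):

```go
// Sketch: one call now logs the failure with request-scoped fields and
// writes the mapped HTTP status; extra zap fields ride on the variadic tail.
obj, err := h.frostfs.HeadObject(ctx, prm)
if err != nil {
	h.logAndSendError(ctx, req, logs.FailedToHeadObject, err,
		zap.Stringer("elapsed", time.Since(start)))
	return
}
_ = obj // header fields are copied into the response further down
```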
@@ -1,6 +1,7 @@
 package handler

 import (
+	"context"
 	"errors"
 	"io"
 	"strconv"
@@ -53,7 +54,7 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
 }

 // getPayload returns initial payload if object is not multipart else composes new reader with parts data.
-func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) {
+func (h *Handler) getPayload(ctx context.Context, p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) {
 	cid, ok := p.obj.Header.ContainerID()
 	if !ok {
 		return nil, 0, errors.New("no container id set")
@@ -66,7 +67,6 @@ func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64,
 	if err != nil {
 		return nil, 0, err
 	}
-	ctx := p.req.RequestCtx
 	params := PrmInitMultiObjectReader{
 		Addr:   newAddress(cid, oid),
 		Bearer: bearerToken(ctx),
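With the `request` wrapper gone, `getPayload` receives the context explicitly, so the bearer token and the request logger come from `ctx` rather than from `p.req`. A sketch of the call-site shape, mirroring the reader hunk further below (`rObj` and `val` are assumed to be in scope):

```go
// Sketch: context is an explicit argument now; getMultiobjectBodyParams no
// longer carries the request.
payload, payloadSize, err := h.getPayload(ctx, getMultiobjectBodyParams{
	obj:     rObj, // object whose header holds the multipart size attribute
	strSize: val,  // raw value of that attribute
})
if err != nil {
	h.logAndSendError(ctx, req, logs.FailedToGetObjectPayload, err)
	return
}
```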
@@ -63,11 +63,10 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error), file

 type getMultiobjectBodyParams struct {
 	obj     *Object
-	req     request
 	strSize string
 }

-func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address) {
+func (h *Handler) receiveFile(ctx context.Context, req *fasthttp.RequestCtx, objAddress oid.Address) {
 	var (
 		shouldDownload = req.QueryArgs().GetBool("download")
 		start          = time.Now()
@@ -85,12 +84,12 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A

 	rObj, err := h.frostfs.GetObject(ctx, prm)
 	if err != nil {
-		req.handleFrostFSErr(err, start)
+		h.logAndSendError(ctx, req, logs.FailedToGetObject, err, zap.Stringer("elapsed", time.Since(start)))
 		return
 	}

 	// we can't close reader in this function, so how to do it?
-	req.setIDs(rObj.Header)
+	setIDs(req, rObj.Header)
 	payload := rObj.Payload
 	payloadSize := rObj.Header.PayloadSize()
 	for _, attr := range rObj.Header.Attributes() {
@@ -107,8 +106,8 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
 		case object.AttributeFileName:
 			filename = val
 		case object.AttributeTimestamp:
-			if err = req.setTimestamp(val); err != nil {
-				req.log.Error(logs.CouldntParseCreationDate,
+			if err = setTimestamp(req, val); err != nil {
+				h.reqLogger(ctx).Error(logs.CouldntParseCreationDate,
 					zap.String("val", val),
 					zap.Error(err),
 					logs.TagField(logs.TagDatapath))
@@ -118,13 +117,12 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
 		case object.AttributeFilePath:
 			filepath = val
 		case attributeMultipartObjectSize:
-			payload, payloadSize, err = h.getPayload(getMultiobjectBodyParams{
+			payload, payloadSize, err = h.getPayload(ctx, getMultiobjectBodyParams{
 				obj:     rObj,
-				req:     req,
 				strSize: val,
 			})
 			if err != nil {
-				req.handleFrostFSErr(err, start)
+				h.logAndSendError(ctx, req, logs.FailedToGetObjectPayload, err, zap.Stringer("elapsed", time.Since(start)))
 				return
 			}
 		}
@@ -133,7 +131,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
 		filename = filepath
 	}

-	req.setDisposition(shouldDownload, filename)
+	setDisposition(req, shouldDownload, filename)

 	req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))

@@ -145,8 +143,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
 		return payload, nil
 	}, filename)
 	if err != nil && err != io.EOF {
-		req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
+		h.logAndSendError(ctx, req, logs.FailedToDetectContentTypeFromPayload, err, zap.Stringer("elapsed", time.Since(start)))
 		return
 	}

@@ -165,7 +162,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
 	req.Response.SetBodyStream(payload, int(payloadSize))
 }

-func (r *request) setIDs(obj object.Object) {
+func setIDs(r *fasthttp.RequestCtx, obj object.Object) {
 	objID, _ := obj.ID()
 	cnrID, _ := obj.ContainerID()
 	r.Response.Header.Set(hdrObjectID, objID.String())
@@ -173,7 +170,7 @@ func (r *request) setIDs(obj object.Object) {
 	r.Response.Header.Set(hdrContainerID, cnrID.String())
 }

-func (r *request) setDisposition(shouldDownload bool, filename string) {
+func setDisposition(r *fasthttp.RequestCtx, shouldDownload bool, filename string) {
 	const (
 		inlineDisposition     = "inline"
 		attachmentDisposition = "attachment"
@@ -187,7 +184,7 @@ func (r *request) setDisposition(shouldDownload bool, filename string) {
 	r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
 }

-func (r *request) setTimestamp(timestamp string) error {
+func setTimestamp(r *fasthttp.RequestCtx, timestamp string) error {
 	value, err := strconv.ParseInt(timestamp, 10, 64)
 	if err != nil {
 		return err
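Because `setIDs`, `setDisposition`, and `setTimestamp` are now plain functions over `*fasthttp.RequestCtx`, they can be exercised without building the old `request` wrapper. A hypothetical test sketch, not part of this PR, assuming it lives in the handler package next to these helpers:

```go
func TestResponseHelpersSketch(t *testing.T) {
	var rctx fasthttp.RequestCtx

	// With download requested, the disposition presumably becomes "attachment".
	setDisposition(&rctx, true, "dir/report.pdf")
	require.Contains(t, string(rctx.Response.Header.Peek(fasthttp.HeaderContentDisposition)), "attachment")

	// Non-numeric timestamps are surfaced as errors instead of being ignored.
	require.Error(t, setTimestamp(&rctx, "not-a-unix-timestamp"))
}
```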
@@ -50,44 +50,41 @@ func (pr *putResponse) encode(w io.Writer) error {
 }

 // Upload handles multipart upload request.
-func (h *Handler) Upload(c *fasthttp.RequestCtx) {
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.Upload")
+func (h *Handler) Upload(req *fasthttp.RequestCtx) {
+	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.Upload")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)

 	var file MultipartFile

-	scid, _ := c.UserValue("cid").(string)
-	bodyStream := c.RequestBodyStream()
+	scid, _ := req.UserValue("cid").(string)
+	bodyStream := req.RequestBodyStream()
 	drainBuf := make([]byte, drainBufSize)

-	reqLog := utils.GetReqLogOrDefault(ctx, h.log)
-	log := reqLog.With(zap.String("cid", scid))
+	log := h.reqLogger(ctx)
+	ctx = utils.SetReqLog(ctx, log.With(zap.String("cid", scid)))

-	bktInfo, err := h.getBucketInfo(ctx, scid, log)
+	bktInfo, err := h.getBucketInfo(ctx, scid)
 	if err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
 		return
 	}

-	boundary := string(c.Request.Header.MultipartFormBoundary())
+	boundary := string(req.Request.Header.MultipartFormBoundary())
 	if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
-		log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
+		h.logAndSendError(ctx, req, logs.CouldNotReceiveMultipartForm, err)
 		return
 	}

-	filtered, err := filterHeaders(log, &c.Request.Header)
+	filtered, err := filterHeaders(log, &req.Request.Header)
 	if err != nil {
-		log.Error(logs.FailedToFilterHeaders, zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
+		h.logAndSendError(ctx, req, logs.FailedToFilterHeaders, err)
 		return
 	}

-	if c.Request.Header.Peek(explodeArchiveHeader) != nil {
-		h.explodeArchive(request{c, log}, bktInfo, file, filtered)
+	if req.Request.Header.Peek(explodeArchiveHeader) != nil {
+		h.explodeArchive(ctx, req, bktInfo, file, filtered)
 	} else {
-		h.uploadSingleObject(request{c, log}, bktInfo, file, filtered)
+		h.uploadSingleObject(ctx, req, bktInfo, file, filtered)
 	}

 	// Multipart is multipart and thus can contain more than one part which
@@ -104,46 +101,39 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
 	}
 }

-func (h *Handler) uploadSingleObject(req request, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
-	c, log := req.RequestCtx, req.log
-
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.uploadSingleObject")
+func (h *Handler) uploadSingleObject(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "handler.uploadSingleObject")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)

 	setIfNotExist(filtered, object.AttributeFileName, file.FileName())

-	attributes, err := h.extractAttributes(c, log, filtered)
+	attributes, err := h.extractAttributes(ctx, req, filtered)
 	if err != nil {
-		log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
+		h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
 		return
 	}

-	idObj, err := h.uploadObject(c, bkt, attributes, file)
+	idObj, err := h.uploadObject(ctx, bkt, attributes, file)
 	if err != nil {
-		h.handlePutFrostFSErr(c, err, log)
+		h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
 		return
 	}
-	log.Debug(logs.ObjectUploaded,
+	h.reqLogger(ctx).Debug(logs.ObjectUploaded,
 		zap.String("oid", idObj.EncodeToString()),
 		zap.String("FileName", file.FileName()),
 		logs.TagField(logs.TagExternalStorage),
 	)

 	addr := newAddress(bkt.CID, idObj)
-	c.Response.Header.SetContentType(jsonHeader)
+	req.Response.Header.SetContentType(jsonHeader)
 	// Try to return the response, otherwise, if something went wrong, throw an error.
-	if err = newPutResponse(addr).encode(c); err != nil {
-		log.Error(logs.CouldNotEncodeResponse, zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, "could not encode response", fasthttp.StatusBadRequest)
+	if err = newPutResponse(addr).encode(req); err != nil {
+		h.logAndSendError(ctx, req, logs.CouldNotEncodeResponse, err)
 		return
 	}
 }

-func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
-	ctx := utils.GetContextFromRequest(c)
-
+func (h *Handler) uploadObject(ctx context.Context, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
 	obj := object.New()
 	obj.SetContainerID(bkt.CID)
 	obj.SetOwnerID(*h.ownerID)
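The upload path now enriches the request-scoped logger once and stores it back into the context, so every later `reqLogger(ctx)` or `logAndSendError(ctx, ...)` call sees the same fields. A sketch of the pattern, assuming `scid` and `err` exist in the surrounding handler:

```go
// Sketch: enrich once, read back everywhere via the context.
log := h.reqLogger(ctx)                                       // falls back to h.log when ctx carries no logger
ctx = utils.SetReqLog(ctx, log.With(zap.String("cid", scid))) // downstream helpers pick this up

// Later calls log with the "cid" field already attached:
h.reqLogger(ctx).Debug(logs.ObjectUploaded, logs.TagField(logs.TagExternalStorage))
h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
```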
@@ -168,19 +158,18 @@ func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, att
 	return idObj, nil
 }

-func (h *Handler) extractAttributes(c *fasthttp.RequestCtx, log *zap.Logger, filtered map[string]string) ([]object.Attribute, error) {
-	ctx := utils.GetContextFromRequest(c)
+func (h *Handler) extractAttributes(ctx context.Context, req *fasthttp.RequestCtx, filtered map[string]string) ([]object.Attribute, error) {
 	now := time.Now()
-	if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
+	if rawHeader := req.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
 		if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
-			log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err),
+			h.reqLogger(ctx).Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err),
 				logs.TagField(logs.TagDatapath))
 		} else {
 			now = parsed
 		}
 	}
 	if err := utils.PrepareExpirationHeader(ctx, h.frostfs, filtered, now); err != nil {
-		log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath))
+		h.reqLogger(ctx).Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath))
 		return nil, err
 	}
 	attributes := make([]object.Attribute, 0, len(filtered))
@@ -207,38 +196,33 @@ func newAttribute(key string, val string) object.Attribute {

 // explodeArchive read files from archive and creates objects for each of them.
 // Sets FilePath attribute with name from tar.Header.
-func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
-	c, log := req.RequestCtx, req.log
-
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.explodeArchive")
+func (h *Handler) explodeArchive(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "handler.explodeArchive")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)

 	// remove user attributes which vary for each file in archive
 	// to guarantee that they won't appear twice
 	delete(filtered, object.AttributeFileName)
 	delete(filtered, object.AttributeFilePath)

-	commonAttributes, err := h.extractAttributes(c, log, filtered)
+	commonAttributes, err := h.extractAttributes(ctx, req, filtered)
 	if err != nil {
-		log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
+		h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
 		return
 	}
 	attributes := commonAttributes

 	reader := file
-	if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
-		log.Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath))
+	if bytes.EqualFold(req.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
+		h.reqLogger(ctx).Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath))
 		gzipReader, err := gzip.NewReader(file)
 		if err != nil {
-			log.Error(logs.FailedToCreateGzipReader, zap.Error(err), logs.TagField(logs.TagDatapath))
-			ResponseError(c, "could read gzip file: "+err.Error(), fasthttp.StatusBadRequest)
+			h.logAndSendError(ctx, req, logs.FailedToCreateGzipReader, err)
 			return
 		}
 		defer func() {
 			if err := gzipReader.Close(); err != nil {
-				log.Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
+				h.reqLogger(ctx).Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
 			}
 		}()
 		reader = gzipReader
@@ -250,8 +234,7 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
 		if errors.Is(err, io.EOF) {
 			break
 		} else if err != nil {
-			log.Error(logs.FailedToReadFileFromTar, zap.Error(err), logs.TagField(logs.TagDatapath))
-			ResponseError(c, "could not get next entry: "+err.Error(), fasthttp.StatusBadRequest)
+			h.logAndSendError(ctx, req, logs.FailedToReadFileFromTar, err)
 			return
 		}

@@ -265,13 +248,13 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
 		attributes = append(attributes, newAttribute(object.AttributeFilePath, obj.Name))
 		attributes = append(attributes, newAttribute(object.AttributeFileName, fileName))

-		idObj, err := h.uploadObject(c, bkt, attributes, tarReader)
+		idObj, err := h.uploadObject(ctx, bkt, attributes, tarReader)
 		if err != nil {
-			h.handlePutFrostFSErr(c, err, log)
+			h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
 			return
 		}

-		log.Debug(logs.ObjectUploaded,
+		h.reqLogger(ctx).Debug(logs.ObjectUploaded,
 			zap.String("oid", idObj.EncodeToString()),
 			zap.String("FileName", fileName),
 			logs.TagField(logs.TagExternalStorage),
@@ -279,14 +262,6 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
 	}
 }

-func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {
-	statusCode, msg, additionalFields := formErrorResponse("could not store file in frostfs", err)
-	logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)
-
-	log.Error(logs.CouldNotStoreFileInFrostfs, append(logFields, logs.TagField(logs.TagExternalStorage))...)
-	ResponseError(r, msg, statusCode)
-}
-
 func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token {
 	if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
 		return tkn
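The explodeArchive hunks keep the same streaming shape: optionally unwrap gzip based on Content-Encoding, then walk the tar stream and upload one object per entry, with every failure going through `logAndSendError`. A simplified sketch of that loop (attribute plumbing and the per-entry upload are omitted):

```go
// Sketch: gzip detection plus tar iteration; each entry would become a
// separate FrostFS object via h.uploadObject in the real code.
reader := io.Reader(file)
if bytes.EqualFold(req.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
	gz, err := gzip.NewReader(file)
	if err != nil {
		h.logAndSendError(ctx, req, logs.FailedToCreateGzipReader, err)
		return
	}
	defer gz.Close()
	reader = gz
}

tarReader := tar.NewReader(reader)
for {
	hdr, err := tarReader.Next()
	if errors.Is(err, io.EOF) {
		break
	} else if err != nil {
		h.logAndSendError(ctx, req, logs.FailedToReadFileFromTar, err)
		return
	}
	_ = hdr.Name // feeds the FilePath/FileName attributes in the real code
}
```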
@@ -5,13 +5,12 @@ import (
 	"errors"
 	"fmt"
 	"strings"
-	"time"

+	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
 	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
+	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
-	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
-	sdkstatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@@ -19,30 +18,6 @@ import (
 	"go.uber.org/zap"
 )

-type request struct {
-	*fasthttp.RequestCtx
-	log *zap.Logger
-}
-
-func newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) request {
-	return request{
-		RequestCtx: ctx,
-		log:        log,
-	}
-}
-
-func (r *request) handleFrostFSErr(err error, start time.Time) {
-	logFields := []zap.Field{
-		zap.Stringer("elapsed", time.Since(start)),
-		zap.Error(err),
-	}
-	statusCode, msg, additionalFields := formErrorResponse("could not receive object", err)
-	logFields = append(logFields, additionalFields...)
-
-	r.log.Error(logs.CouldNotReceiveObject, append(logFields, logs.TagField(logs.TagExternalStorage))...)
-	ResponseError(r.RequestCtx, msg, statusCode)
-}
-
 func bearerToken(ctx context.Context) *bearer.Token {
 	if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
 		return tkn
@@ -84,14 +59,16 @@ func isValidValue(s string) bool {
 	return true
 }

-func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) {
-	log.Error(logs.CouldNotGetBucket, zap.Error(err), logs.TagField(logs.TagDatapath))
-
-	if client.IsErrContainerNotFound(err) {
-		ResponseError(c, "Not Found", fasthttp.StatusNotFound)
-		return
-	}
-	ResponseError(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
+func (h *Handler) reqLogger(ctx context.Context) *zap.Logger {
+	return utils.GetReqLogOrDefault(ctx, h.log)
+}
+
+func (h *Handler) logAndSendError(ctx context.Context, c *fasthttp.RequestCtx, msg string, err error, additional ...zap.Field) {
+	utils.GetReqLogOrDefault(ctx, h.log).Error(msg,
+		append([]zap.Field{zap.Error(err), logs.TagField(logs.TagDatapath)}, additional...)...)
+
+	msg, code := formErrorResponse(err)
+	ResponseError(c, msg, code)
 }

 func newAddress(cnr cid.ID, obj oid.ID) oid.Address {

r.loginov commented (inline, on `reqLogger`):
question: is this function intentionally not used [here](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/75328dd9340d5c1e4320ce95dc118b00f3a5b8c1/internal/handler/browse.go#L226) and [here](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/src/commit/75328dd9340d5c1e4320ce95dc118b00f3a5b8c1/internal/handler/browse.go#L261)? It's just that if within the handler package we wrap the `utils.GetReqLogOrDefault(ctx, h.log)` function in `reqLogger(ctx context.Context)`, don't we want to use it wherever possible?
@@ -112,31 +89,23 @@ func ResponseError(r *fasthttp.RequestCtx, msg string, code int) {
 	r.Error(msg+"\n", code)
 }

-func formErrorResponse(message string, err error) (int, string, []zap.Field) {
-	var (
-		msg        string
-		statusCode int
-		logFields  []zap.Field
-	)
-
-	st := new(sdkstatus.ObjectAccessDenied)
-
+func formErrorResponse(err error) (string, int) {
 	switch {
-	case errors.As(err, &st):
-		statusCode = fasthttp.StatusForbidden
-		reason := st.Reason()
-		msg = fmt.Sprintf("%s: %v: %s", message, err, reason)
-		logFields = append(logFields, zap.String("error_detail", reason))
+	case errors.Is(err, ErrAccessDenied):
+		return fmt.Sprintf("Storage Access Denied:\n%v", err), fasthttp.StatusForbidden
+	case errors.Is(err, layer.ErrNodeAccessDenied):
+		return fmt.Sprintf("Tree Access Denied:\n%v", err), fasthttp.StatusForbidden
 	case errors.Is(err, ErrQuotaLimitReached):
-		statusCode = fasthttp.StatusConflict
-		msg = fmt.Sprintf("%s: %v", message, err)
-	case client.IsErrObjectNotFound(err) || client.IsErrContainerNotFound(err):
-		statusCode = fasthttp.StatusNotFound
-		msg = "Not Found"
+		return fmt.Sprintf("Quota Reached:\n%v", err), fasthttp.StatusConflict
+	case errors.Is(err, ErrContainerNotFound):
+		return fmt.Sprintf("Container Not Found:\n%v", err), fasthttp.StatusNotFound
+	case errors.Is(err, ErrObjectNotFound):
+		return fmt.Sprintf("Object Not Found:\n%v", err), fasthttp.StatusNotFound
+	case errors.Is(err, layer.ErrNodeNotFound):
+		return fmt.Sprintf("Tree Node Not Found:\n%v", err), fasthttp.StatusNotFound
+	case errors.Is(err, ErrGatewayTimeout):
+		return fmt.Sprintf("Gateway Timeout:\n%v", err), fasthttp.StatusGatewayTimeout
 	default:
-		statusCode = fasthttp.StatusBadRequest
-		msg = fmt.Sprintf("%s: %v", message, err)
+		return fmt.Sprintf("Bad Request:\n%v", err), fasthttp.StatusBadRequest
 	}
-
-	return statusCode, msg, logFields
 }

r.loginov commented (inline, on the `ErrGatewayTimeout` case):
Should we also note in the API documentation the possibility of returning a 504 GatewayTimeout?
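`formErrorResponse` is now a pure mapping from wrapped sentinel errors to a response message and HTTP status, which is also where the 409 (quota) and 504 (gateway timeout) codes discussed in the review come from. A hypothetical test sketch of that mapping (not part of this PR, would live in the handler package):

```go
func TestFormErrorResponseSketch(t *testing.T) {
	_, code := formErrorResponse(fmt.Errorf("head object: %w", ErrObjectNotFound))
	require.Equal(t, fasthttp.StatusNotFound, code)

	_, code = formErrorResponse(fmt.Errorf("store object: %w", ErrQuotaLimitReached))
	require.Equal(t, fasthttp.StatusConflict, code) // the 409 case

	_, code = formErrorResponse(fmt.Errorf("get object: %w", ErrGatewayTimeout))
	require.Equal(t, fasthttp.StatusGatewayTimeout, code) // the 504 case

	_, code = formErrorResponse(errors.New("anything unclassified"))
	require.Equal(t, fasthttp.StatusBadRequest, code)
}
```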
@@ -77,7 +77,7 @@ const (
 // Log messages with the "datapath" tag.
 const (
 	CouldntParseCreationDate = "couldn't parse creation date"
-	CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload"
+	FailedToDetectContentTypeFromPayload = "failed to detect Content-Type from payload"
 	FailedToAddObjectToArchive = "failed to add object to archive"
 	CloseZipWriter = "close zip writer"
 	IgnorePartEmptyFormName = "ignore part, empty form name"
@@ -104,28 +104,32 @@ const (
 	CouldNotReceiveMultipartForm = "could not receive multipart/form"
 	ObjectsNotFound = "objects not found"
 	IteratingOverSelectedObjectsFailed = "iterating over selected objects failed"
-	CouldNotGetBucket = "could not get bucket"
-	CouldNotResolveContainerID = "could not resolve container id"
-	FailedToSumbitTaskToPool = "failed to submit task to pool"
+	FailedToGetBucketInfo = "could not get bucket info"
+	FailedToSubmitTaskToPool = "failed to submit task to pool"
+	ObjectWasDeleted = "object was deleted"
+	FailedToGetLatestVersionOfObject = "failed to get latest version of object"
+	FailedToCheckIfSettingsNodeExist = "failed to check if settings node exists"
+	FailedToListObjects = "failed to list objects"
+	FailedToParseTemplate = "failed to parse template"
+	FailedToExecuteTemplate = "failed to execute template"
+	FailedToUploadObject = "failed to upload object"
+	FailedToHeadObject = "failed to head object"
+	FailedToGetObject = "failed to get object"
+	FailedToGetObjectPayload = "failed to get object payload"
+	FailedToFindObjectByAttribute = "failed to get find object by attribute"
+	FailedToUnescapeOIDParam = "failed to unescape oid param"
+	InvalidOIDParam = "invalid oid param"
 )

 // Log messages with the "external_storage" tag.
 const (
-	CouldNotReceiveObject = "could not receive object"
-	CouldNotSearchForObjects = "could not search for objects"
 	ObjectNotFound = "object not found"
 	ReadObjectListFailed = "read object list failed"
-	CouldNotStoreFileInFrostfs = "could not store file in frostfs"
-	FailedToHeadObject = "failed to head object"
 	ObjectNotFoundByFilePathTrySearchByFileName = "object not found by filePath attribute, try search by fileName"
-	FailedToGetObject = "failed to get object"
 	ObjectUploaded = "object uploaded"
-	CouldNotGetContainerInfo = "could not get container info"
 )

 // Log messages with the "external_storage_tree" tag.
 const (
-	ObjectWasDeleted = "object was deleted"
-	FailedToGetLatestVersionOfObject = "failed to get latest version of object"
-	FailedToCheckIfSettingsNodeExist = "Failed to check if settings node exists"
+	FoundSeveralSystemTreeNodes = "found several system tree nodes"
 )
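The renamed constants keep the existing convention: a message constant plus a component tag on every record, so log streams can still be filtered per subsystem. A one-line usage sketch (the constant-to-tag pairing follows the groups above; the surrounding handler is assumed):

```go
// Sketch: message constant from the "datapath" group paired with its tag.
h.reqLogger(ctx).Error(logs.FailedToListObjects, zap.Error(err), logs.TagField(logs.TagDatapath))
```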
@@ -11,6 +11,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	qostagging "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@@ -48,7 +49,7 @@ func (x *FrostFS) Container(ctx context.Context, containerPrm handler.PrmContain

 	res, err := x.pool.GetContainer(ctx, prm)
 	if err != nil {
-		return nil, handleObjectError("read container via connection pool", err)
+		return nil, handleStorageError("read container via connection pool", err)
 	}

 	return &res, nil
@@ -72,7 +73,7 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm handler.PrmObjectCreate)

 	idObj, err := x.pool.PutObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmPut)
 	if err != nil {
-		return oid.ID{}, handleObjectError("save object via connection pool", err)
+		return oid.ID{}, handleStorageError("save object via connection pool", err)
 	}
 	return idObj.ObjectID, nil
 }
@@ -88,7 +89,7 @@ func (x payloadReader) Read(p []byte) (int, error) {
 	if err != nil && errors.Is(err, io.EOF) {
 		return n, err
 	}
-	return n, handleObjectError("read payload", err)
+	return n, handleStorageError("read payload", err)
 }

 // HeadObject implements frostfs.FrostFS interface method.
@@ -105,7 +106,7 @@ func (x *FrostFS) HeadObject(ctx context.Context, prm handler.PrmObjectHead) (*o

 	res, err := x.pool.HeadObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmHead)
 	if err != nil {
-		return nil, handleObjectError("read object header via connection pool", err)
+		return nil, handleStorageError("read object header via connection pool", err)
 	}

 	return &res, nil
@@ -125,7 +126,7 @@ func (x *FrostFS) GetObject(ctx context.Context, prm handler.PrmObjectGet) (*han

 	res, err := x.pool.GetObject(qostagging.ContextWithIOTag(ctx, clientIOTag), prmGet)
 	if err != nil {
-		return nil, handleObjectError("init full object reading via connection pool", err)
+		return nil, handleStorageError("init full object reading via connection pool", err)
 	}

 	return &handler.Object{
@@ -150,7 +151,7 @@ func (x *FrostFS) RangeObject(ctx context.Context, prm handler.PrmObjectRange) (

 	res, err := x.pool.ObjectRange(qostagging.ContextWithIOTag(ctx, clientIOTag), prmRange)
 	if err != nil {
-		return nil, handleObjectError("init payload range reading via connection pool", err)
+		return nil, handleStorageError("init payload range reading via connection pool", err)
 	}

 	return payloadReader{&res}, nil
@@ -171,7 +172,7 @@ func (x *FrostFS) SearchObjects(ctx context.Context, prm handler.PrmObjectSearch

 	res, err := x.pool.SearchObjects(qostagging.ContextWithIOTag(ctx, clientIOTag), prmSearch)
 	if err != nil {
-		return nil, handleObjectError("init object search via connection pool", err)
+		return nil, handleStorageError("init object search via connection pool", err)
 	}

 	return &res, nil
@@ -205,7 +206,7 @@ func (x *FrostFS) NetmapSnapshot(ctx context.Context) (netmap.NetMap, error) {

 	netmapSnapshot, err := x.pool.NetMapSnapshot(ctx)
 	if err != nil {
-		return netmapSnapshot, handleObjectError("get netmap via connection pool", err)
+		return netmapSnapshot, handleStorageError("get netmap via connection pool", err)
 	}

 	return netmapSnapshot, nil
@@ -229,7 +230,7 @@ func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {

 	networkInfo, err := x.pool.NetworkInfo(ctx)
 	if err != nil {
-		return "", handleObjectError("read network info via client", err)
+		return "", handleStorageError("read network info via client", err)
 	}

 	domain := networkInfo.RawNetworkParameter("SystemDNS")
@@ -240,7 +241,7 @@ func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
 	return string(domain), nil
 }

-func handleObjectError(msg string, err error) error {
+func handleStorageError(msg string, err error) error {
 	if err == nil {
 		return nil
 	}
@@ -253,6 +254,14 @@ func handleObjectError(msg string, err error) error {
 		return fmt.Errorf("%s: %w: %s", msg, handler.ErrAccessDenied, reason)
 	}

+	if client.IsErrContainerNotFound(err) {
+		return fmt.Errorf("%s: %w: %s", msg, handler.ErrContainerNotFound, err.Error())
+	}
+
+	if client.IsErrObjectNotFound(err) {
+		return fmt.Errorf("%s: %w: %s", msg, handler.ErrObjectNotFound, err.Error())
+	}
+
 	if IsTimeoutError(err) {
 		return fmt.Errorf("%s: %w: %s", msg, handler.ErrGatewayTimeout, err.Error())
 	}
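`handleStorageError` translates SDK status errors into the handler package sentinels, so upper layers only ever do `errors.Is` checks and `formErrorResponse` stays free of SDK types. A sketch of that contract, assuming the SDK's `apistatus.ObjectNotFound` status type (the same package already used for `ObjectAccessDenied` in the tests below):

```go
// Sketch: an SDK "object not found" status becomes handler.ErrObjectNotFound.
inputErr := new(apistatus.ObjectNotFound)
err := handleStorageError("read object header via connection pool", inputErr)
fmt.Println(errors.Is(err, handler.ErrObjectNotFound)) // expected: true
```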
@@ -18,7 +18,7 @@ func TestHandleObjectError(t *testing.T) {
 	msg := "some msg"

 	t.Run("nil error", func(t *testing.T) {
-		err := handleObjectError(msg, nil)
+		err := handleStorageError(msg, nil)
 		require.Nil(t, err)
 	})

@@ -27,7 +27,7 @@ func TestHandleObjectError(t *testing.T) {
 		inputErr := new(apistatus.ObjectAccessDenied)
 		inputErr.WriteReason(reason)

-		err := handleObjectError(msg, inputErr)
+		err := handleStorageError(msg, inputErr)
 		require.ErrorIs(t, err, handler.ErrAccessDenied)
 		require.Contains(t, err.Error(), reason)
 		require.Contains(t, err.Error(), msg)
@@ -38,7 +38,7 @@ func TestHandleObjectError(t *testing.T) {
 		inputErr := new(apistatus.ObjectAccessDenied)
 		inputErr.WriteReason(reason)

-		err := handleObjectError(msg, inputErr)
+		err := handleStorageError(msg, inputErr)
 		require.ErrorIs(t, err, handler.ErrQuotaLimitReached)
 		require.Contains(t, err.Error(), reason)
 		require.Contains(t, err.Error(), msg)
@@ -47,7 +47,7 @@ func TestHandleObjectError(t *testing.T) {
 	t.Run("simple timeout", func(t *testing.T) {
 		inputErr := errors.New("timeout")

-		err := handleObjectError(msg, inputErr)
+		err := handleStorageError(msg, inputErr)
 		require.ErrorIs(t, err, handler.ErrGatewayTimeout)
 		require.Contains(t, err.Error(), inputErr.Error())
 		require.Contains(t, err.Error(), msg)
@@ -58,7 +58,7 @@ func TestHandleObjectError(t *testing.T) {
 		defer cancel()
 		<-ctx.Done()

-		err := handleObjectError(msg, ctx.Err())
+		err := handleStorageError(msg, ctx.Err())
 		require.ErrorIs(t, err, handler.ErrGatewayTimeout)
 		require.Contains(t, err.Error(), ctx.Err().Error())
 		require.Contains(t, err.Error(), msg)
@@ -67,7 +67,7 @@ func TestHandleObjectError(t *testing.T) {
 	t.Run("grpc deadline exceeded", func(t *testing.T) {
 		inputErr := fmt.Errorf("wrap grpc error: %w", status.Error(codes.DeadlineExceeded, "error"))

-		err := handleObjectError(msg, inputErr)
+		err := handleStorageError(msg, inputErr)
 		require.ErrorIs(t, err, handler.ErrGatewayTimeout)
 		require.Contains(t, err.Error(), inputErr.Error())
 		require.Contains(t, err.Error(), msg)
@@ -76,7 +76,7 @@ func TestHandleObjectError(t *testing.T) {
 	t.Run("unknown error", func(t *testing.T) {
 		inputErr := errors.New("unknown error")

-		err := handleObjectError(msg, inputErr)
+		err := handleStorageError(msg, inputErr)
 		require.ErrorIs(t, err, inputErr)
 		require.Contains(t, err.Error(), msg)
 	})
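In the same style as the subtests above, a hypothetical extra case could pin down the new container-not-found branch (not part of this PR; assumes the SDK exposes `apistatus.ContainerNotFound`):

```go
t.Run("container not found", func(t *testing.T) {
	inputErr := new(apistatus.ContainerNotFound)

	err := handleStorageError(msg, inputErr)
	require.ErrorIs(t, err, handler.ErrContainerNotFound)
	require.Contains(t, err.Error(), msg)
})
```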
@@ -64,7 +64,7 @@ func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([

 	nodes, err := w.p.GetNodes(qostagging.ContextWithIOTag(ctx, clientIOTag), poolPrm)
 	if err != nil {
-		return nil, handleError(err)
+		return nil, handleTreeError(err)
 	}

 	res := make([]tree.NodeResponse, len(nodes))
@@ -83,7 +83,7 @@ func getBearer(ctx context.Context) []byte {
 	return token.Marshal()
 }

-func handleError(err error) error {
+func handleTreeError(err error) error {
 	if err == nil {
 		return nil
 	}
@@ -123,7 +123,7 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo,

 	subTreeReader, err := w.p.GetSubTree(qostagging.ContextWithIOTag(ctx, clientIOTag), poolPrm)
 	if err != nil {
-		return nil, handleError(err)
+		return nil, handleTreeError(err)
 	}

 	var subtree []tree.NodeResponse
@@ -134,7 +134,7 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo,
 		node, err = subTreeReader.Next()
 	}
 	if err != io.EOF {
-		return nil, handleError(err)
+		return nil, handleTreeError(err)
 	}

 	return subtree, nil
tree/tree.go | 17

@@ -8,14 +8,18 @@ import (

 	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
 	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
+	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
+	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"go.uber.org/zap"
 )

 type (
 	Tree struct {
 		service ServiceClient
+		log     *zap.Logger
 	}

 	// ServiceClient is a client to interact with tree service.
@@ -73,8 +77,8 @@ const (
 )

 // NewTree creates instance of Tree using provided address and create grpc connection.
-func NewTree(service ServiceClient) *Tree {
-	return &Tree{service: service}
+func NewTree(service ServiceClient, log *zap.Logger) *Tree {
+	return &Tree{service: service, log: log}
 }

 type Meta interface {
@@ -257,6 +261,9 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name
 	if len(nodes) == 0 {
 		return nil, layer.ErrNodeNotFound
 	}
+	if len(nodes) != 1 {
+		c.reqLogger(ctx).Warn(logs.FoundSeveralSystemTreeNodes, zap.String("name", name), logs.TagField(logs.TagExternalStorageTree))
+	}

 	return newMultiNode(nodes)
 }
@@ -296,7 +303,7 @@ func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
 	}

 	if targetIndexNode == -1 {
-		return nil, layer.ErrNodeNotFound
+		return nil, fmt.Errorf("latest version: %w", layer.ErrNodeNotFound)
 	}

 	return nodes[targetIndexNode], nil
@@ -423,6 +430,10 @@ func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, tr
 	return intermediateNodes, nil
 }

+func (c *Tree) reqLogger(ctx context.Context) *zap.Logger {
+	return utils.GetReqLogOrDefault(ctx, c.log)
+}
+
 func GetFilename(node NodeResponse) string {
 	for _, kv := range node.GetMeta() {
 		if kv.GetKey() == FileNameKey {
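Callers of `NewTree` now pass a fallback logger alongside the service client; per-request loggers are still preferred when the context carries one. A wiring sketch (the `poolWrapper` and `appLogger` names are placeholders for whatever the caller already has):

```go
// Sketch: construct the tree client with an application-level fallback logger.
treeClient := tree.NewTree(poolWrapper, appLogger)
_ = treeClient
```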
We can't seem to get a status of 409 when downloading the archive.