[#137] Add index page support
All checks were successful
/ DCO (pull_request) Successful in 56s
/ Builds (pull_request) Successful in 54s
/ Vulncheck (pull_request) Successful in 1m20s
/ Lint (pull_request) Successful in 2m11s
/ Tests (pull_request) Successful in 1m4s

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
Nikita Zinkevich 2024-09-20 15:09:44 +03:00
parent 843708a558
commit 26b36e442c
17 changed files with 704 additions and 87 deletions

View file

@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/signal"
@ -90,6 +91,9 @@ type (
defaultTimestamp bool
zipCompression bool
clientCut bool
returnIndexPage bool
indexPageTemplatePath string
indexPageTemplate string
bufferMaxSizeForPut uint64
namespaceHeader string
defaultNamespaces []string
@ -154,6 +158,7 @@ func newApp(ctx context.Context, opt ...Option) App {
a.initResolver()
a.initMetrics()
a.initTracing(ctx)
a.loadIndexPageTemplate()
return a
}
@ -176,12 +181,67 @@ func (s *appSettings) ZipCompression() bool {
return s.zipCompression
}
func (s *appSettings) IndexPageEnabled() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.returnIndexPage
}
func (s *appSettings) IndexPageTemplatePath() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.indexPageTemplatePath
}
func (s *appSettings) IndexPageTemplate() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.indexPageTemplate
}
func (s *appSettings) setZipCompression(val bool) {
s.mu.Lock()
s.zipCompression = val
s.mu.Unlock()
}
func (s *appSettings) setReturnIndexPage(val bool) {
s.mu.Lock()
s.returnIndexPage = val
s.mu.Unlock()
}
func (s *appSettings) setIndexTemplatePath(val string) {
s.mu.Lock()
s.indexPageTemplatePath = val
s.mu.Unlock()
}
func (s *appSettings) setIndexTemplate(val string) {
s.mu.Lock()
s.indexPageTemplate = val
s.mu.Unlock()
}
func (a *app) loadIndexPageTemplate() {
if !a.settings.IndexPageEnabled() {
return
}
reader, err := os.Open(a.settings.IndexPageTemplatePath())
if err != nil {
a.settings.setIndexTemplate("")
a.log.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err))
return
}
tmpl, err := io.ReadAll(reader)
if err != nil {
a.settings.setIndexTemplate("")
a.log.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err))
return
}
a.settings.setIndexTemplate(string(tmpl))
}
func (s *appSettings) ClientCut() bool {
s.mu.RLock()
defer s.mu.RUnlock()
@ -490,6 +550,7 @@ func (a *app) configReload(ctx context.Context) {
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
a.initTracing(ctx)
a.loadIndexPageTemplate()
a.setHealthStatus()
a.log.Info(logs.SIGHUPConfigReloadCompleted)
@ -498,6 +559,8 @@ func (a *app) configReload(ctx context.Context) {
func (a *app) updateSettings() {
a.settings.setDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp))
a.settings.setZipCompression(a.cfg.GetBool(cfgZipCompression))
a.settings.setReturnIndexPage(a.cfg.GetBool(cfgIndexPageEnabled))
a.settings.setIndexTemplatePath(a.cfg.GetString(cfgIndexPageTemplatePath))
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
a.settings.setNamespaceHeader(a.cfg.GetString(cfgResolveNamespaceHeader))

View file

@ -60,6 +60,9 @@ const (
cfgReconnectInterval = "reconnect_interval"
cfgIndexPageEnabled = "index_page.enabled"
cfgIndexPageTemplatePath = "index_page.template_path"
// Web.
cfgWebReadBufferSize = "web.read_buffer_size"
cfgWebWriteBufferSize = "web.write_buffer_size"
@ -191,6 +194,9 @@ func settings() *viper.Viper {
// pool:
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
v.SetDefault(cfgIndexPageEnabled, false)
v.SetDefault(cfgIndexPageTemplatePath, "")
// frostfs:
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)

View file

@ -101,6 +101,11 @@ request_timeout: 5s # Timeout to check node health during rebalance.
rebalance_timer: 30s # Interval to check nodes health.
pool_error_threshold: 100 # The number of errors on connection after which node is considered as unhealthy.
# Enable index page to see objects list for specified container and prefix
index_page:
enabled: false
template_path: /templates/index.gotmpl
zip:
compression: false # Enable zip compression to download files by common prefix.

View file

@ -95,12 +95,12 @@ The `filename` field from the multipart form will be set as `FileName` attribute
## Get object
Route: `/get/{cid}/{oid}?[download=true]`
Route: `/get/{cid}/{oid}?[download=false]`
| Route parameter | Type | Description |
|-----------------|--------|------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `cid` | Single | Base58 encoded container ID or container name from NNS. |
| `oid` | Single | Base58 encoded object ID. |
| `cid` | Single | Base58 encoded `container ID` or `container name` from NNS or `bucket name`. |
| `oid` | Single | Base58 encoded `object ID`. Also could be `S3 object name` if `cid` is specified as bucket name. |
| `download` | Query | Set the `Content-Disposition` header as `attachment` in response.<br/> This make the browser to download object as file instead of showing it on the page. |
### Methods
@ -141,6 +141,13 @@ Get an object (payload and attributes) by an address.
| 400 | Some error occurred during object downloading. |
| 404 | Container or object not found. |
###### Body
Returns object data. If request performed from browser, either displays raw data or downloads it as
attachment if `download` query parameter is set to `true`.
If `index_page.enabled` is set to `true`, returns HTML with index-page if no object with specified
S3-name was found.
#### HEAD
Get an object attributes by an address.

View file

@ -57,6 +57,7 @@ $ cat http.log
| `frostfs` | [Frostfs configuration](#frostfs-section) |
| `cache` | [Cache configuration](#cache-section) |
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
| `index_page` | [Index page configuration](#index_page-section) |
# General section
@ -76,7 +77,7 @@ reconnect_interval: 1m
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|------------------------|------------|---------------|----------------|------------------------------------------------------------------------------------|
|------------------------|------------|---------------|---------------|-------------------------------------------------------------------------------------------------|
| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. |
| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. |
| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. |
@ -336,3 +337,18 @@ resolve_bucket:
|----------------------|------------|---------------|-----------------------|--------------------------------------------------------------------------------------------------------------------------|
| `namespace_header` | `string` | yes | `X-Frostfs-Namespace` | Header to determine zone to resolve bucket name. |
| `default_namespaces` | `[]string` | yes | ["","root"] | Namespaces that should be handled as default. |
# `index_page` section
Parameters for index HTML-page output with S3-bucket or S3-subdir content for `Get object` request
```yaml
index_page:
enabled: false
template_path: ""
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|-----------------|----------|---------------|---------------|---------------------------------------------------------------------------------|
| `enabled` | `bool` | yes | `false` | Flag to enable index_page return if no object with specified S3-name was found. |
| `template_path` | `string` | yes | `""` | Path to .gotmpl file with html template for index_page. |

2
go.mod
View file

@ -8,6 +8,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240718141740-ce8270568d36
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
github.com/bluele/gcache v0.0.2
github.com/docker/go-units v0.4.0
github.com/fasthttp/router v1.4.1
github.com/nspcc-dev/neo-go v0.106.2
github.com/prometheus/client_golang v1.19.0
@ -49,7 +50,6 @@ require (
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v20.10.14+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect

View file

@ -8,6 +8,7 @@ import (
type NodeVersion struct {
BaseNodeVersion
DeleteMarker bool
IsPrefixNode bool
}
// BaseNodeVersion is minimal node info from tree service.

View file

@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
@ -15,16 +17,16 @@ type GetNodeByPathResponseInfoWrapper struct {
response *grpcService.GetNodeByPathResponse_Info
}
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 {
return n.response.GetNodeId()
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() []uint64 {
return []uint64{n.response.GetNodeId()}
}
func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 {
return n.response.GetParentId()
func (n GetNodeByPathResponseInfoWrapper) GetParentID() []uint64 {
return []uint64{n.response.GetParentId()}
}
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 {
return n.response.GetTimestamp()
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() []uint64 {
return []uint64{n.response.GetTimestamp()}
}
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
@ -89,3 +91,73 @@ func handleError(err error) error {
return err
}
func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]tree.NodeResponse, error) {
order := treepool.NoneOrder
if sort {
order = treepool.AscendingOrder
}
poolPrm := treepool.GetSubTreeParams{
CID: bktInfo.CID,
TreeID: treeID,
RootID: rootID,
Depth: depth,
BearerToken: getBearer(ctx),
Order: order,
}
if len(rootID) == 1 && rootID[0] == 0 {
// storage node interprets 'nil' value as []uint64{0}
// gate wants to send 'nil' value instead of []uint64{0}, because
// it provides compatibility with previous tree service api where
// single uint64(0) value is dropped from signature
poolPrm.RootID = nil
}
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
}
var subtree []tree.NodeResponse
node, err := subTreeReader.Next()
for err == nil {
subtree = append(subtree, GetSubTreeResponseBodyWrapper{node})
node, err = subTreeReader.Next()
}
if err != nil && err != io.EOF {
return nil, handleError(err)
}
return subtree, nil
}
type GetSubTreeResponseBodyWrapper struct {
response *grpcService.GetSubTreeResponse_Body
}
func (n GetSubTreeResponseBodyWrapper) GetNodeID() []uint64 {
return n.response.GetNodeId()
}
func (n GetSubTreeResponseBodyWrapper) GetParentID() []uint64 {
resp := n.response.GetParentId()
if resp == nil {
// storage sends nil that should be interpreted as []uint64{0}
// due to protobuf compatibility, see 'GetSubTree' function
return []uint64{0}
}
return resp
}
func (n GetSubTreeResponseBodyWrapper) GetTimestamp() []uint64 {
return n.response.GetTimestamp()
}
func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
res := make([]tree.Meta, len(n.response.Meta))
for i, value := range n.response.Meta {
res[i] = value
}
return res
}

139
internal/handler/browse.go Normal file
View file

@ -0,0 +1,139 @@
package handler
import (
"cmp"
_ "embed"
"html/template"
"net/url"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"github.com/docker/go-units"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
const (
dateFormat = "02-01-2006 15:04"
attrOID, attrCreated, attrFileName, attrSize = "OID", "Created", "FileName", "Size"
)
type (
BrowsePageData struct {
BucketName,
Prefix string
Objects []ResponseObject
}
ResponseObject struct {
OID string
Created string
FileName string
Size string
}
)
//go:embed templates/index.gotmpl
var defaultTemplate string
func parseTimestamp(tstamp string) (time.Time, error) {
millis, err := strconv.ParseInt(tstamp, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.UnixMilli(millis), nil
}
func NewResponseObject(nodes map[string]string) ResponseObject {
return ResponseObject{
OID: nodes[attrOID],
Created: nodes[attrCreated],
FileName: nodes[attrFileName],
Size: nodes[attrSize],
}
}
func formatTimestamp(strdate string) string {
date, err := parseTimestamp(strdate)
if err != nil || date.IsZero() {
return ""
}
return date.Format(dateFormat)
}
func formatSize(strsize string) string {
size, err := strconv.ParseFloat(strsize, 64)
if err != nil {
return ""
}
return units.HumanSize(size)
}
func trimPrefix(prefix string) string {
slashIndex := strings.LastIndex(prefix, "/")
if slashIndex == -1 {
return ""
}
return prefix[:slashIndex]
}
func urlencode(prefix string) string {
escaped := url.QueryEscape(prefix)
replaced := strings.ReplaceAll(escaped, ".", "%2E")
return replaced
}
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, bucketName, prefix string) {
var log = h.log.With(zap.String("bucket", bucketName))
ctx := utils.GetContextFromRequest(c)
nodes, err := h.listObjects(ctx, bucketName, prefix)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
respObjects := make([]ResponseObject, len(nodes))
for i, node := range nodes {
respObjects[i] = NewResponseObject(node)
}
slices.SortFunc(respObjects, func(a, b ResponseObject) int {
aIsDir := a.Size == ""
bIsDir := b.Size == ""
// prefix objects go first
if aIsDir && !bIsDir {
return -1
} else if !aIsDir && bIsDir {
return 1
}
return cmp.Compare(a.FileName, b.FileName)
})
indexTemplate := h.config.IndexPageTemplate()
if indexTemplate == "" {
indexTemplate = defaultTemplate
}
tmpl, err := template.New("index").Funcs(template.FuncMap{
"formatTimestamp": formatTimestamp,
"formatSize": formatSize,
"trimPrefix": trimPrefix,
"urlencode": urlencode,
}).Parse(indexTemplate)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
if err = tmpl.Execute(c, &BrowsePageData{
BucketName: bucketName,
Prefix: prefix,
Objects: respObjects,
}); err != nil {
logAndSendBucketError(c, log, err)
return
}
}

View file

@ -30,6 +30,9 @@ type Config interface {
DefaultTimestamp() bool
ZipCompression() bool
ClientCut() bool
IndexPageEnabled() bool
IndexPageTemplatePath() string
IndexPageTemplate() string
BufferMaxSizeForPut() uint64
NamespaceHeader() string
}
@ -208,41 +211,53 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it.
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
var (
bucketname = req.UserValue("cid").(string)
key = req.UserValue("oid").(string)
bucketname = c.UserValue("cid").(string)
key = c.UserValue("oid").(string)
download = c.Request.URI().QueryArgs().GetBool("download")
log = h.log.With(zap.String("bucketname", bucketname), zap.String("key", key))
)
unescapedKey, err := url.QueryUnescape(key)
if err != nil {
logAndSendBucketError(req, log, err)
logAndSendBucketError(c, log, err)
return
}
ctx := utils.GetContextFromRequest(req)
ctx := utils.GetContextFromRequest(c)
bktInfo, err := h.getBucketInfo(ctx, bucketname, log)
if err != nil {
logAndSendBucketError(req, log, err)
logAndSendBucketError(c, log, err)
return
}
needIndexPage := h.config.IndexPageEnabled()
foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, unescapedKey)
if err != nil {
if errors.Is(err, tree.ErrNodeAccessDenied) {
response.Error(req, "Access Denied", fasthttp.StatusForbidden)
response.Error(c, "Access Denied", fasthttp.StatusForbidden)
} else if needIndexPage && string(c.Method()) != fasthttp.MethodHead {
h.browseObjects(c, bucketname, unescapedKey)
} else {
log.Error(logs.GetLatestObjectVersion, zap.Error(err))
response.Error(c, "object wasn't found", fasthttp.StatusNotFound)
}
return
}
log.Error(logs.GetLatestObjectVersion, zap.Error(err))
response.Error(req, "object wasn't found", fasthttp.StatusNotFound)
checkForNodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bktInfo, unescapedKey, true)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
if needIndexPage && !download && len(checkForNodes) > 1 {
h.browseObjects(c, bucketname, unescapedKey)
return
}
if foundOid.DeleteMarker {
log.Error(logs.ObjectWasDeleted)
response.Error(req, "object deleted", fasthttp.StatusNotFound)
response.Error(c, "object deleted", fasthttp.StatusNotFound)
return
}
@ -250,7 +265,7 @@ func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context,
addr.SetContainer(bktInfo.CID)
addr.SetObject(foundOid.OID)
f(ctx, *h.newRequest(req, log), addr)
f(ctx, *h.newRequest(c, log), addr)
}
// byAttribute is a wrapper similar to byAddress.
@ -379,3 +394,33 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
return bktInfo, err
}
func (h *Handler) listObjects(ctx context.Context, bucketName, prefix string) ([]map[string]string, error) {
var (
log = h.log.With(zap.String("bucket", bucketName))
)
bucketInfo, err := h.getBucketInfo(ctx, bucketName, log)
if err != nil {
return nil, err
}
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
if err != nil {
return nil, err
}
var objects = make([]map[string]string, 0, len(nodes))
for _, node := range nodes {
meta := node.GetMeta()
node.GetMeta()
if meta == nil {
continue
}
var obj = make(map[string]string, len(meta))
for _, m := range meta {
obj[m.GetKey()] = string(m.GetValue())
}
objects = append(objects, obj)
}
return objects, nil
}

View file

@ -12,6 +12,7 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
@ -37,6 +38,10 @@ func (t *treeClientMock) GetNodes(context.Context, *tree.GetNodesParams) ([]tree
return nil, nil
}
func (t *treeClientMock) GetSubTree(context.Context, *data.BucketInfo, string, []uint64, uint32, bool) ([]tree.NodeResponse, error) {
return nil, nil
}
type configMock struct {
}
@ -48,6 +53,17 @@ func (c *configMock) ZipCompression() bool {
return false
}
func (c *configMock) IndexPageEnabled() bool {
return false
}
func (c *configMock) IndexPageTemplatePath() string {
return ""
}
func (c *configMock) IndexPageTemplate() string {
return ""
}
func (c *configMock) ClientCut() bool {
return false
}

View file

@ -0,0 +1,77 @@
{{$bucketName := .BucketName}}
{{ $prefix := trimPrefix .Prefix }}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8"/>
<title>Index of s3://{{$bucketName}}{{if $prefix}}/{{$prefix}}/{{end}}</title>
<style>
table {
width: 80%;
border-collapse: collapse;
}
body {
background: #f2f2f2;
}
table, th, td {
border: 0 solid transparent;
}
th, td {
padding: 10px;
text-align: left;
}
th {
background-color: #c3bcbc;
}
tr:nth-child(even) {background-color: #ebe7e7;}
</style>
</head>
<body>
<h1>Index of s3://{{$bucketName}}/{{if $prefix}}{{$prefix}}/{{end}}</h1>
<table>
<thead>
<tr>
<th>Filename</th>
<th>Size</th>
<th>Created</th>
<th>Download</th>
</tr>
</thead>
<tbody>
{{if $prefix}}
<tr>
<td><a href="../">../</a></td>
<td></td>
<td></td>
</tr>
{{else}}
<tr>
<td><a href="/get/{{ $bucketName}}/">../</a></td>
<td></td>
<td></td>
</tr>
{{end}}
{{range .Objects}}
{{if .FileName}}
<tr>
<td>
<a href="/get/{{ $bucketName}}/{{ $prefix}}{{if $prefix}}/{{end}}{{ .FileName}}{{if not .Size}}/{{end}}">
{{.FileName}}{{if not .Size}}/{{end}}
</a>
</td>
<td>{{ formatSize .Size }}</td>
<td>{{ formatTimestamp .Created }}</td>
<td>
{{ if .Size }}
<a href="/get/{{ $bucketName}}/{{ $prefix}}{{if $prefix}}/{{end}}{{ .FileName}}?download=true">
Link
</a>
{{ end }}
</td>
</tr>
{{end}}
{{end}}
</tbody>
</table>
</body>
</html>

View file

@ -43,22 +43,22 @@ func (pr *putResponse) encode(w io.Writer) error {
}
// Upload handles multipart upload request.
func (h *Handler) Upload(req *fasthttp.RequestCtx) {
func (h *Handler) Upload(c *fasthttp.RequestCtx) {
var (
file MultipartFile
idObj oid.ID
addr oid.Address
scid, _ = req.UserValue("cid").(string)
scid, _ = c.UserValue("cid").(string)
log = h.log.With(zap.String("cid", scid))
bodyStream = req.RequestBodyStream()
bodyStream = c.RequestBodyStream()
drainBuf = make([]byte, drainBufSize)
)
ctx := utils.GetContextFromRequest(req)
ctx := utils.GetContextFromRequest(c)
bktInfo, err := h.getBucketInfo(ctx, scid, log)
if err != nil {
logAndSendBucketError(req, log, err)
logAndSendBucketError(c, log, err)
return
}
@ -75,21 +75,21 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
zap.Error(err),
)
}()
boundary := string(req.Request.Header.MultipartFormBoundary())
boundary := string(c.Request.Header.MultipartFormBoundary())
if file, err = fetchMultipartFile(h.log, bodyStream, boundary); err != nil {
log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err))
response.Error(req, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
return
}
filtered, err := filterHeaders(h.log, &req.Request.Header)
filtered, err := filterHeaders(h.log, &c.Request.Header)
if err != nil {
log.Error(logs.CouldNotProcessHeaders, zap.Error(err))
response.Error(req, err.Error(), fasthttp.StatusBadRequest)
response.Error(c, err.Error(), fasthttp.StatusBadRequest)
return
}
now := time.Now()
if rawHeader := req.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err))
} else {
@ -97,9 +97,9 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
}
}
if err = utils.PrepareExpirationHeader(req, h.frostfs, filtered, now); err != nil {
if err = utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil {
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
response.Error(req, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
response.Error(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
return
}
@ -143,7 +143,7 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
}
if idObj, err = h.frostfs.CreateObject(ctx, prm); err != nil {
h.handlePutFrostFSErr(req, err)
h.handlePutFrostFSErr(c, err)
return
}
@ -151,9 +151,9 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
addr.SetContainer(bktInfo.CID)
// Try to return the response, otherwise, if something went wrong, throw an error.
if err = newPutResponse(addr).encode(req); err != nil {
if err = newPutResponse(addr).encode(c); err != nil {
log.Error(logs.CouldNotEncodeResponse, zap.Error(err))
response.Error(req, "could not encode response", fasthttp.StatusBadRequest)
response.Error(c, "could not encode response", fasthttp.StatusBadRequest)
return
}
@ -170,8 +170,8 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
}
}
// Report status code and content type.
req.Response.SetStatusCode(fasthttp.StatusOK)
req.Response.Header.SetContentType(jsonHeader)
c.Response.SetStatusCode(fasthttp.StatusOK)
c.Response.Header.SetContentType(jsonHeader)
}
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error) {

View file

@ -31,6 +31,7 @@ const (
CouldNotStoreFileInFrostfs = "could not store file in frostfs" // Error in ../../uploader/upload.go
AddAttributeToResultObject = "add attribute to result object" // Debug in ../../uploader/filter.go
FailedToCreateResolver = "failed to create resolver" // Fatal in ../../app.go
FailedToReadIndexPageTemplate = "failed to read index page template, set default" // Warn in ../../app.go
ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty" // Info in ../../app.go
MetricsAreDisabled = "metrics are disabled" // Warn in ../../app.go
NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun = "no wallet path specified, creating ephemeral key automatically for this run" // Info in ../../app.go

View file

@ -52,8 +52,8 @@ func BearerTokenFromCookie(h *fasthttp.RequestHeader) []byte {
// StoreBearerTokenAppCtx extracts a bearer token from the header or cookie and stores
// it in the application context.
func StoreBearerTokenAppCtx(ctx context.Context, req *fasthttp.RequestCtx) (context.Context, error) {
tkn, err := fetchBearerToken(req)
func StoreBearerTokenAppCtx(ctx context.Context, c *fasthttp.RequestCtx) (context.Context, error) {
tkn, err := fetchBearerToken(c)
if err != nil {
return nil, err
}

View file

@ -2,11 +2,13 @@ package tree
import (
"context"
"errors"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
@ -20,6 +22,7 @@ type (
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
ServiceClient interface {
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]NodeResponse, error)
}
treeNode struct {
@ -29,6 +32,7 @@ type (
GetNodesParams struct {
CnrID cid.ID
BktInfo *data.BucketInfo
TreeID string
Path []string
Meta []string
@ -54,6 +58,7 @@ const (
// keys for delete marker nodes.
isDeleteMarkerKV = "IsDeleteMarker"
sizeKV = "Size"
// versionTree -- ID of a tree with object versions.
versionTree = "version"
@ -73,26 +78,28 @@ type Meta interface {
type NodeResponse interface {
GetMeta() []Meta
GetTimestamp() uint64
GetTimestamp() []uint64
GetNodeID() []uint64
GetParentID() []uint64
}
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
treeNode := &treeNode{
tNode := &treeNode{
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
}
for _, kv := range nodeInfo.GetMeta() {
switch kv.GetKey() {
case oidKV:
if err := treeNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
if err := tNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
return nil, err
}
default:
treeNode.Meta[kv.GetKey()] = string(kv.GetValue())
tNode.Meta[kv.GetKey()] = string(kv.GetValue())
}
}
return treeNode, nil
return tNode, nil
}
func (n *treeNode) Get(key string) (string, bool) {
@ -106,29 +113,44 @@ func (n *treeNode) FileName() (string, bool) {
}
func newNodeVersion(node NodeResponse) (*api.NodeVersion, error) {
treeNode, err := newTreeNode(node)
tNode, err := newTreeNode(node)
if err != nil {
return nil, fmt.Errorf("invalid tree node: %w", err)
}
return newNodeVersionFromTreeNode(treeNode), nil
return newNodeVersionFromTreeNode(tNode), nil
}
func newNodeVersionFromTreeNode(treeNode *treeNode) *api.NodeVersion {
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
size, _ := treeNode.Get(sizeKV)
version := &api.NodeVersion{
BaseNodeVersion: api.BaseNodeVersion{
OID: treeNode.ObjID,
},
DeleteMarker: isDeleteMarker,
IsPrefixNode: size == "",
}
return version
}
func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*api.NodeVersion, error) {
meta := []string{oidKV, isDeleteMarkerKV}
nodes, err := c.GetVersions(ctx, cnrID, objectName)
if err != nil {
return nil, err
}
latestNode, err := getLatestVersionNode(nodes)
if err != nil {
return nil, err
}
return newNodeVersion(latestNode)
}
func (c *Tree) GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]NodeResponse, error) {
meta := []string{oidKV, isDeleteMarkerKV, sizeKV}
path := pathFromName(objectName)
p := &GetNodesParams{
@ -139,30 +161,24 @@ func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName s
LatestOnly: false,
AllAttrs: false,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
}
latestNode, err := getLatestNode(nodes)
if err != nil {
return nil, err
}
return newNodeVersion(latestNode)
return c.service.GetNodes(ctx, p)
}
func getLatestNode(nodes []NodeResponse) (NodeResponse, error) {
func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
var (
maxCreationTime uint64
targetIndexNode = -1
)
for i, node := range nodes {
currentCreationTime := node.GetTimestamp()
if checkExistOID(node.GetMeta()) && currentCreationTime > maxCreationTime {
maxCreationTime = currentCreationTime
if !checkExistOID(node.GetMeta()) {
continue
}
if currentCreationTime := getMaxTimestamp(node); currentCreationTime > maxCreationTime {
targetIndexNode = i
maxCreationTime = currentCreationTime
}
}
@ -187,3 +203,149 @@ func checkExistOID(meta []Meta) bool {
func pathFromName(objectName string) []string {
return strings.Split(objectName, separator)
}
func (c *Tree) GetSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, versionTree, prefix)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", nil
}
return nil, "", err
}
subTree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, rootID, 2, false)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", nil
}
return nil, "", err
}
nodesMap := make(map[string][]NodeResponse, len(subTree))
for _, node := range subTree {
if MultiID(rootID).Equal(node.GetNodeID()) {
continue
}
fileName := getFilename(node)
if !strings.HasPrefix(fileName, tailPrefix) {
continue
}
nodes := nodesMap[fileName]
// Add all nodes if flag latestOnly is false.
// Add all intermediate nodes
// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
if len(nodes) == 0 {
nodes = []NodeResponse{node}
} else if !latestOnly || isIntermediate(node) {
nodes = append(nodes, node)
} else if isIntermediate(nodes[0]) {
nodes = append([]NodeResponse{node}, nodes...)
} else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
nodes[0] = node
}
nodesMap[fileName] = nodes
}
result := make([]NodeResponse, 0, len(subTree))
for _, nodes := range nodesMap {
result = append(result, nodes...)
}
return result, strings.TrimSuffix(prefix, tailPrefix), nil
}
func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) ([]uint64, string, error) {
rootID := []uint64{0}
path := strings.Split(prefix, separator)
tailPrefix := path[len(path)-1]
if len(path) > 1 {
var err error
rootID, err = c.getPrefixNodeID(ctx, bktInfo, treeID, path[:len(path)-1])
if err != nil {
return nil, "", err
}
}
return rootID, tailPrefix, nil
}
func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, treeID string, prefixPath []string) ([]uint64, error) {
p := &GetNodesParams{
CnrID: bktInfo.CID,
BktInfo: bktInfo,
TreeID: treeID,
Path: prefixPath,
LatestOnly: false,
AllAttrs: true,
}
nodes, err := c.service.GetNodes(ctx, p)
if err != nil {
return nil, err
}
var intermediateNodes []uint64
for _, node := range nodes {
if isIntermediate(node) {
intermediateNodes = append(intermediateNodes, node.GetNodeID()...)
}
}
if len(intermediateNodes) == 0 {
return nil, layer.ErrNodeNotFound
}
return intermediateNodes, nil
}
func getFilename(node NodeResponse) string {
for _, kv := range node.GetMeta() {
if kv.GetKey() == FileNameKey {
return string(kv.GetValue())
}
}
return ""
}
func isIntermediate(node NodeResponse) bool {
if len(node.GetMeta()) != 1 {
return false
}
return node.GetMeta()[0].GetKey() == FileNameKey
}
func getMaxTimestamp(node NodeResponse) uint64 {
var maxTimestamp uint64
for _, timestamp := range node.GetTimestamp() {
if timestamp > maxTimestamp {
maxTimestamp = timestamp
}
}
return maxTimestamp
}
type MultiID []uint64
func (m MultiID) Equal(id MultiID) bool {
seen := make(map[uint64]struct{}, len(m))
for i := range m {
seen[m[i]] = struct{}{}
}
for i := range id {
if _, ok := seen[id[i]]; !ok {
return false
}
}
return true
}

View file

@ -24,8 +24,8 @@ type nodeResponse struct {
timestamp uint64
}
func (n nodeResponse) GetTimestamp() uint64 {
return n.timestamp
func (n nodeResponse) GetTimestamp() []uint64 {
return []uint64{n.timestamp}
}
func (n nodeResponse) GetMeta() []Meta {
@ -36,6 +36,13 @@ func (n nodeResponse) GetMeta() []Meta {
return res
}
func (n nodeResponse) GetNodeID() []uint64 {
return nil
}
func (n nodeResponse) GetParentID() []uint64 {
return nil
}
func TestGetLatestNode(t *testing.T) {
for _, tc := range []struct {
name string
@ -130,7 +137,7 @@ func TestGetLatestNode(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
actualNode, err := getLatestNode(tc.nodes)
actualNode, err := getLatestVersionNode(tc.nodes)
if tc.error {
require.Error(t, err)
return