[#151] index page: Add browse via native protocol

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
Nikita Zinkevich 2024-10-10 11:59:53 +03:00
parent 821f8c2248
commit 181568f442
Signed by: nzinkevich
GPG key ID: 748EA1D0B2E6420A
13 changed files with 394 additions and 115 deletions

View file

@ -40,6 +40,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/panjf2000/ants/v2"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@ -96,6 +97,7 @@ type (
clientCut bool clientCut bool
returnIndexPage bool returnIndexPage bool
indexPageTemplate string indexPageTemplate string
workerPoolSize int
bufferMaxSizeForPut uint64 bufferMaxSizeForPut uint64
namespaceHeader string namespaceHeader string
defaultNamespaces []string defaultNamespaces []string
@ -184,6 +186,7 @@ func (a *app) initAppSettings() {
a.settings = &appSettings{ a.settings = &appSettings{
reconnectInterval: fetchReconnectInterval(a.cfg), reconnectInterval: fetchReconnectInterval(a.cfg),
dialerSource: getDialerSource(a.log, a.cfg), dialerSource: getDialerSource(a.log, a.cfg),
workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize),
} }
a.settings.update(a.cfg, a.log) a.settings.update(a.cfg, a.log)
} }
@ -490,7 +493,13 @@ func (a *app) setHealthStatus() {
} }
func (a *app) Serve() { func (a *app) Serve() {
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool))) workerPool := a.initWorkerPool()
defer func() {
workerPool.Release()
close(a.webDone)
}()
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool)
// Configure router. // Configure router.
a.configureRouter(handler) a.configureRouter(handler)
@ -532,8 +541,14 @@ LOOP:
a.metrics.Shutdown() a.metrics.Shutdown()
a.stopServices() a.stopServices()
a.shutdownTracing() a.shutdownTracing()
}
close(a.webDone) func (a *app) initWorkerPool() *ants.Pool {
workerPool, err := ants.NewPool(a.settings.workerPoolSize)
if err != nil {
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
}
return workerPool
} }
func (a *app) shutdownTracing() { func (a *app) shutdownTracing() {
@ -609,6 +624,7 @@ func (a *app) stopServices() {
svc.ShutDown(ctx) svc.ShutDown(ctx)
} }
} }
func (a *app) configureRouter(handler *handler.Handler) { func (a *app) configureRouter(handler *handler.Handler) {
r := router.New() r := router.New()
r.RedirectTrailingSlash = true r.RedirectTrailingSlash = true

View file

@ -70,6 +70,7 @@ const (
cfgIndexPageEnabled = "index_page.enabled" cfgIndexPageEnabled = "index_page.enabled"
cfgIndexPageTemplatePath = "index_page.template_path" cfgIndexPageTemplatePath = "index_page.template_path"
cfgWorkerPoolSize = "worker_pool_size"
// Web. // Web.
cfgWebReadBufferSize = "web.read_buffer_size" cfgWebReadBufferSize = "web.read_buffer_size"
@ -227,9 +228,6 @@ func settings() *viper.Viper {
// pool: // pool:
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold) v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
v.SetDefault(cfgIndexPageEnabled, false)
v.SetDefault(cfgIndexPageTemplatePath, "")
// frostfs: // frostfs:
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut) v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
@ -241,6 +239,7 @@ func settings() *viper.Viper {
v.SetDefault(cfgWebStreamRequestBody, true) v.SetDefault(cfgWebStreamRequestBody, true)
v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize) v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)
v.SetDefault(cfgWorkerPoolSize, 1000)
// upload header // upload header
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false) v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)

View file

@ -75,18 +75,21 @@ request_timeout: 5s
rebalance_timer: 30s rebalance_timer: 30s
pool_error_threshold: 100 pool_error_threshold: 100
reconnect_interval: 1m reconnect_interval: 1m
worker_pool_size: 1000
``` ```
| Parameter | Type | SIGHUP reload | Default value | Description | | Parameter | Type | SIGHUP reload | Default value | Description |
|------------------------|------------|---------------|---------------|-------------------------------------------------------------------------------------------------| |------------------------|------------|---------------|---------------|------------------------------------------------------------------------------------|
| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. | | `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. |
| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. | | `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. |
| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. | | `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. |
| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. | | `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. |
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. | | `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. | | `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. | | `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. | | `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
| `worker_pool_size` | `int` | no | `1000` | Maximum worker count in handler's worker pool. |
# `wallet` section # `wallet` section
@ -352,7 +355,12 @@ resolve_bucket:
# `index_page` section # `index_page` section
Parameters for index HTML-page output with S3-bucket or S3-subdir content for `Get object` request Parameters for index HTML-page output. Activates if `GetObject` request returns `not found`. Two
index page modes available:
* `s3` mode uses tree service for listing objects,
* `native` sends requests to nodes via native protocol.
If request pass S3-bucket name instead of CID, `s3` mode will be used, otherwise `native`.
```yaml ```yaml
index_page: index_page:

1
go.mod
View file

@ -12,6 +12,7 @@ require (
github.com/docker/go-units v0.4.0 github.com/docker/go-units v0.4.0
github.com/fasthttp/router v1.4.1 github.com/fasthttp/router v1.4.1
github.com/nspcc-dev/neo-go v0.106.2 github.com/nspcc-dev/neo-go v0.106.2
github.com/panjf2000/ants/v2 v2.5.0
github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.5.0 github.com/prometheus/client_model v0.5.0
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5

2
go.sum
View file

@ -682,6 +682,8 @@ github.com/opencontainers/selinux v1.6.0/go.mod h1:VVGKuOLlE7v4PJyT6h7mNWvq1rzqi
github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3ogry1nUQF8Evvo= github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3ogry1nUQF8Evvo=
github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8= github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8=
github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI= github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI=
github.com/panjf2000/ants/v2 v2.5.0 h1:1rWGWSnxCsQBga+nQbA4/iY6VMeNoOIAM0ZWh9u3q2Q=
github.com/panjf2000/ants/v2 v2.5.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc=
github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU= github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU=

View file

@ -1,15 +1,21 @@
package handler package handler
import ( import (
"context"
"html/template" "html/template"
"net/url" "net/url"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/docker/go-units" "github.com/docker/go-units"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.uber.org/zap" "go.uber.org/zap"
@ -25,19 +31,68 @@ const (
type ( type (
BrowsePageData struct { BrowsePageData struct {
BucketName, HasErrors bool
Prefix string Container string
Objects []ResponseObject Prefix string
Protocol string
Objects []ResponseObject
} }
ResponseObject struct { ResponseObject struct {
OID string OID string
Created string Created string
FileName string FileName string
FilePath string
Size string Size string
IsDir bool IsDir bool
GetURL string
} }
) )
func newListObjectsResponseS3(attrs map[string]string) ResponseObject {
return ResponseObject{
Created: formatTimestamp(attrs[attrCreated]),
OID: attrs[attrOID],
FileName: attrs[attrFileName],
Size: attrs[attrSize],
IsDir: attrs[attrOID] == "",
}
}
func newListObjectsResponseNative(attrs map[string]string) ResponseObject {
filename := lastPathElement(attrs[object.AttributeFilePath])
if filename == "" {
filename = attrs[attrFileName]
}
return ResponseObject{
OID: attrs[attrOID],
Created: formatTimestamp(attrs[object.AttributeTimestamp] + "000"),
FileName: filename,
FilePath: attrs[object.AttributeFilePath],
Size: attrs[attrSize],
IsDir: false,
}
}
func getNextDir(filepath, prefix string) string {
restPath := strings.Replace(filepath, prefix, "", 1)
index := strings.Index(restPath, "/")
if index == -1 {
return ""
}
return restPath[:index]
}
func lastPathElement(path string) string {
if path == "" {
return path
}
index := strings.LastIndex(path, "/")
if index == len(path)-1 {
index = strings.LastIndex(path[:index], "/")
}
return path[index+1:]
}
func parseTimestamp(tstamp string) (time.Time, error) { func parseTimestamp(tstamp string) (time.Time, error) {
millis, err := strconv.ParseInt(tstamp, 10, 64) millis, err := strconv.ParseInt(tstamp, 10, 64)
if err != nil { if err != nil {
@ -47,16 +102,6 @@ func parseTimestamp(tstamp string) (time.Time, error) {
return time.UnixMilli(millis), nil return time.UnixMilli(millis), nil
} }
func NewResponseObject(nodes map[string]string) ResponseObject {
return ResponseObject{
OID: nodes[attrOID],
Created: nodes[attrCreated],
FileName: nodes[attrFileName],
Size: nodes[attrSize],
IsDir: nodes[attrOID] == "",
}
}
func formatTimestamp(strdate string) string { func formatTimestamp(strdate string) string {
date, err := parseTimestamp(strdate) date, err := parseTimestamp(strdate)
if err != nil || date.IsZero() { if err != nil || date.IsZero() {
@ -94,12 +139,9 @@ func trimPrefix(encPrefix string) string {
return prefix[:slashIndex] return prefix[:slashIndex]
} }
func urlencode(prefix, filename string) string { func urlencode(path string) string {
var res strings.Builder var res strings.Builder
path := filename
if prefix != "" {
path = strings.Join([]string{prefix, filename}, "/")
}
prefixParts := strings.Split(path, "/") prefixParts := strings.Split(path, "/")
for _, prefixPart := range prefixParts { for _, prefixPart := range prefixParts {
prefixPart = "/" + url.PathEscape(prefixPart) prefixPart = "/" + url.PathEscape(prefixPart)
@ -112,46 +154,221 @@ func urlencode(prefix, filename string) string {
return res.String() return res.String()
} }
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, bucketInfo *data.BucketInfo, prefix string) { type GetObjectsResponse struct {
objects []ResponseObject
hasErrors bool
}
func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) {
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
if err != nil {
return nil, err
}
var result = &GetObjectsResponse{
objects: make([]ResponseObject, 0, len(nodes)),
hasErrors: false,
}
for _, node := range nodes {
meta := node.GetMeta()
if meta == nil {
continue
}
var attrs = make(map[string]string, len(meta))
for _, m := range meta {
attrs[m.GetKey()] = string(m.GetValue())
}
obj := newListObjectsResponseS3(attrs)
obj.FilePath = prefix + obj.FileName
obj.GetURL = "/get/" + bucketInfo.Name + urlencode(obj.FilePath)
result.objects = append(result.objects, obj)
}
return result, nil
}
func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) {
var basePath string
if ind := strings.LastIndex(prefix, "/"); ind != -1 {
basePath = prefix[:ind+1]
}
filters := object.NewSearchFilters()
filters.AddRootFilter()
if prefix != "" {
filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix)
}
prm := PrmObjectSearch{
PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx),
},
Container: bucketInfo.CID,
Filters: filters,
}
objectIDs, err := h.frostfs.SearchObjects(ctx, prm)
if err != nil {
return nil, err
}
defer objectIDs.Close()
resp, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
if err != nil {
return nil, err
}
log := utils.GetReqLogOrDefault(ctx, h.log)
dirs := make(map[string]struct{})
result := &GetObjectsResponse{
objects: make([]ResponseObject, 0, 100),
}
for objExt := range resp {
if objExt.Error != nil {
log.Error(logs.FailedToHeadObject, zap.Error(objExt.Error))
result.hasErrors = true
continue
}
if objExt.Object.IsDir {
if _, ok := dirs[objExt.Object.FileName]; ok {
continue
}
objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + urlencode(objExt.Object.FilePath)
dirs[objExt.Object.FileName] = struct{}{}
} else {
objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + "/" + objExt.Object.OID
}
result.objects = append(result.objects, objExt.Object)
}
return result, nil
}
type ResponseObjectExtended struct {
Object ResponseObject
Error error
}
func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (<-chan ResponseObjectExtended, error) {
res := make(chan ResponseObjectExtended)
go func() {
defer close(res)
log := utils.GetReqLogOrDefault(ctx, h.log).With(
zap.String("cid", cnrID.EncodeToString()),
zap.String("path", basePath),
)
var wg sync.WaitGroup
err := objectIDs.Iterate(func(id oid.ID) bool {
wg.Add(1)
err := h.workerPool.Submit(func() {
defer wg.Done()
var obj ResponseObjectExtended
obj.Object, obj.Error = h.headDirObject(ctx, cnrID, id, basePath)
res <- obj
})
if err != nil {
wg.Done()
log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err))
}
select {
case <-ctx.Done():
return true
default:
return false
}
})
if err != nil {
log.Error(logs.FailedToIterateOverResponse, zap.Error(err))
}
wg.Wait()
}()
return res, nil
}
func (h *Handler) headDirObject(ctx context.Context, cnrID cid.ID, objID oid.ID, basePath string) (ResponseObject, error) {
addr := newAddress(cnrID, objID)
obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{
PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)},
Address: addr,
})
if err != nil {
return ResponseObject{}, err
}
attrs := loadAttributes(obj.Attributes())
attrs[attrOID] = objID.EncodeToString()
if multipartSize, ok := attrs[attributeMultipartObjectSize]; ok {
attrs[attrSize] = multipartSize
} else {
attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10)
}
dirname := getNextDir(attrs[object.AttributeFilePath], basePath)
if dirname == "" {
return newListObjectsResponseNative(attrs), nil
}
return ResponseObject{
FileName: dirname,
FilePath: basePath + dirname,
IsDir: true,
}, nil
}
type browseParams struct {
bucketInfo *data.BucketInfo
prefix string
isNative bool
listObjects func(ctx context.Context, bucketName *data.BucketInfo, prefix string) (*GetObjectsResponse, error)
}
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
const S3Protocol = "s3"
const FrostfsProtocol = "frostfs"
ctx := utils.GetContextFromRequest(c) ctx := utils.GetContextFromRequest(c)
reqLog := utils.GetReqLogOrDefault(ctx, h.log) reqLog := utils.GetReqLogOrDefault(ctx, h.log)
log := reqLog.With(zap.String("bucket", bucketInfo.Name)) log := reqLog.With(
zap.String("bucket", p.bucketInfo.Name),
nodes, err := h.listObjects(ctx, bucketInfo, prefix) zap.String("container", p.bucketInfo.CID.EncodeToString()),
zap.String("prefix", p.prefix),
)
resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) logAndSendBucketError(c, log, err)
return return
} }
respObjects := make([]ResponseObject, len(nodes)) objects := resp.objects
sort.Slice(objects, func(i, j int) bool {
for i, node := range nodes { if objects[i].IsDir == objects[j].IsDir {
respObjects[i] = NewResponseObject(node) return objects[i].FileName < objects[j].FileName
}
sort.Slice(respObjects, func(i, j int) bool {
if respObjects[i].IsDir == respObjects[j].IsDir {
return respObjects[i].FileName < respObjects[j].FileName
} }
return respObjects[i].IsDir return objects[i].IsDir
}) })
indexTemplate := h.config.IndexPageTemplate()
tmpl, err := template.New("index").Funcs(template.FuncMap{ tmpl, err := template.New("index").Funcs(template.FuncMap{
"formatTimestamp": formatTimestamp, "formatSize": formatSize,
"formatSize": formatSize, "trimPrefix": trimPrefix,
"trimPrefix": trimPrefix, "urlencode": urlencode,
"urlencode": urlencode, "parentDir": parentDir,
"parentDir": parentDir, }).Parse(h.config.IndexPageTemplate())
}).Parse(indexTemplate)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) logAndSendBucketError(c, log, err)
return return
} }
bucketName := p.bucketInfo.Name
protocol := S3Protocol
if p.isNative {
bucketName = p.bucketInfo.CID.EncodeToString()
protocol = FrostfsProtocol
}
if err = tmpl.Execute(c, &BrowsePageData{ if err = tmpl.Execute(c, &BrowsePageData{
BucketName: bucketInfo.Name, Container: bucketName,
Prefix: prefix, Prefix: p.prefix,
Objects: respObjects, Objects: objects,
Protocol: protocol,
HasErrors: resp.hasErrors,
}); err != nil { }); err != nil {
logAndSendBucketError(c, log, err) logAndSendBucketError(c, log, err)
return return

View file

@ -23,13 +23,12 @@ import (
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format. // DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) { func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
test, _ := c.UserValue("oid").(string) cnrIDStr, _ := c.UserValue("cid").(string)
var id oid.ID var cnrID cid.ID
err := id.DecodeString(test) if err := cnrID.DecodeString(cnrIDStr); err != nil {
if err != nil { h.byS3Path(c, h.receiveFile)
h.byObjectName(c, h.receiveFile)
} else { } else {
h.byAddress(c, h.receiveFile) h.byNativeAddress(c, h.receiveFile)
} }
} }
@ -45,7 +44,7 @@ func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
h.byAttribute(c, h.receiveFile) h.byAttribute(c, h.receiveFile)
} }
func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) { func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
filters := object.NewSearchFilters() filters := object.NewSearchFilters()
filters.AddRootFilter() filters.AddRootFilter()
filters.AddFilter(key, val, op) filters.AddFilter(key, val, op)
@ -54,7 +53,7 @@ func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op
PrmAuth: PrmAuth{ PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx), BearerToken: bearerToken(ctx),
}, },
Container: *cnrID, Container: cnrID,
Filters: filters, Filters: filters,
} }
@ -102,7 +101,7 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
return return
} }
resSearch, err := h.search(ctx, &bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) resSearch, err := h.search(ctx, bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
if err != nil { if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)

View file

@ -22,6 +22,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/panjf2000/ants/v2"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -165,6 +166,7 @@ type Handler struct {
containerResolver ContainerResolver containerResolver ContainerResolver
tree *tree.Tree tree *tree.Tree
cache *cache.BucketCache cache *cache.BucketCache
workerPool *ants.Pool
} }
type AppParams struct { type AppParams struct {
@ -175,7 +177,7 @@ type AppParams struct {
Cache *cache.BucketCache Cache *cache.BucketCache
} }
func New(params *AppParams, config Config, tree *tree.Tree) *Handler { func New(params *AppParams, config Config, tree *tree.Tree, workerPool *ants.Pool) *Handler {
return &Handler{ return &Handler{
log: params.Logger, log: params.Logger,
frostfs: params.FrostFS, frostfs: params.FrostFS,
@ -184,14 +186,15 @@ func New(params *AppParams, config Config, tree *tree.Tree) *Handler {
containerResolver: params.Resolver, containerResolver: params.Resolver,
tree: tree, tree: tree,
cache: params.Cache, cache: params.Cache,
workerPool: workerPool,
} }
} }
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that // byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it. // prepares request and object address to it.
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { func (h *Handler) byNativeAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
idCnr, _ := c.UserValue("cid").(string) idCnr, _ := c.UserValue("cid").(string)
idObj, _ := c.UserValue("oid").(string) idObj, _ := url.PathUnescape(c.UserValue("oid").(string))
ctx := utils.GetContextFromRequest(c) ctx := utils.GetContextFromRequest(c)
reqLog := utils.GetReqLogOrDefault(ctx, h.log) reqLog := utils.GetReqLogOrDefault(ctx, h.log)
@ -205,6 +208,16 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
objID := new(oid.ID) objID := new(oid.ID)
if err = objID.DecodeString(idObj); err != nil { if err = objID.DecodeString(idObj); err != nil {
if h.config.IndexPageEnabled() {
c.SetStatusCode(fasthttp.StatusNotFound)
h.browseObjects(c, browseParams{
bucketInfo: bktInfo,
prefix: idObj,
listObjects: h.getDirObjectsNative,
isNative: true,
})
return
}
log.Error(logs.WrongObjectID, zap.Error(err)) log.Error(logs.WrongObjectID, zap.Error(err))
response.Error(c, "wrong object id", fasthttp.StatusBadRequest) response.Error(c, "wrong object id", fasthttp.StatusBadRequest)
return return
@ -215,9 +228,9 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
f(ctx, *h.newRequest(c, log), addr) f(ctx, *h.newRequest(c, log), addr)
} }
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that // byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it. // resolves object address from S3-like path <bucket name>/<object key>.
func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { func (h *Handler) byS3Path(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
bucketname := c.UserValue("cid").(string) bucketname := c.UserValue("cid").(string)
key := c.UserValue("oid").(string) key := c.UserValue("oid").(string)
download := c.QueryArgs().GetBool("download") download := c.QueryArgs().GetBool("download")
@ -243,7 +256,12 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r
if isDir(unescapedKey) || isContainerRoot(unescapedKey) { if isDir(unescapedKey) || isContainerRoot(unescapedKey) {
if code := checkErrorType(err); code == fasthttp.StatusNotFound || code == fasthttp.StatusOK { if code := checkErrorType(err); code == fasthttp.StatusNotFound || code == fasthttp.StatusOK {
c.SetStatusCode(code) c.SetStatusCode(code)
h.browseObjects(c, bktInfo, unescapedKey) h.browseObjects(c, browseParams{
bucketInfo: bktInfo,
prefix: unescapedKey,
listObjects: h.getDirObjectsS3,
isNative: false,
})
return return
} }
} }
@ -267,7 +285,7 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r
f(ctx, *h.newRequest(c, log), addr) f(ctx, *h.newRequest(c, log), addr)
} }
// byAttribute is a wrapper similar to byAddress. // byAttribute is a wrapper similar to byNativeAddress.
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
scid, _ := c.UserValue("cid").(string) scid, _ := c.UserValue("cid").(string)
key, _ := c.UserValue("attr_key").(string) key, _ := c.UserValue("attr_key").(string)
@ -298,7 +316,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
return return
} }
res, err := h.search(ctx, &bktInfo.CID, key, val, object.MatchStringEqual) res, err := h.search(ctx, bktInfo.CID, key, val, object.MatchStringEqual)
if err != nil { if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
@ -394,25 +412,3 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
return bktInfo, err return bktInfo, err
} }
func (h *Handler) listObjects(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]map[string]string, error) {
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
if err != nil {
return nil, err
}
var objects = make([]map[string]string, 0, len(nodes))
for _, node := range nodes {
meta := node.GetMeta()
if meta == nil {
continue
}
var obj = make(map[string]string, len(meta))
for _, m := range meta {
obj[m.GetKey()] = string(m.GetValue())
}
objects = append(objects, obj)
}
return objects, nil
}

View file

@ -26,6 +26,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.uber.org/zap" "go.uber.org/zap"
@ -57,10 +58,11 @@ func (c *configMock) IndexPageEnabled() bool {
return false return false
} }
func (c *configMock) IndexPageTemplatePath() string { func (c *configMock) IndexPageTemplate() string {
return "" return ""
} }
func (c *configMock) IndexPageTemplate() string {
func (c *configMock) IndexPageNativeTemplate() string {
return "" return ""
} }
@ -126,7 +128,11 @@ func prepareHandlerContext() (*handlerContext, error) {
treeMock := &treeClientMock{} treeMock := &treeClientMock{}
cfgMock := &configMock{} cfgMock := &configMock{}
handler := New(params, cfgMock, tree.NewTree(treeMock)) workerPool, err := ants.NewPool(1000)
if err != nil {
return nil, err
}
handler := New(params, cfgMock, tree.NewTree(treeMock), workerPool)
return &handlerContext{ return &handlerContext{
key: key, key: key,

View file

@ -107,9 +107,9 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
err := id.DecodeString(test) err := id.DecodeString(test)
if err != nil { if err != nil {
h.byObjectName(c, h.headObject) h.byS3Path(c, h.headObject)
} else { } else {
h.byAddress(c, h.headObject) h.byNativeAddress(c, h.headObject)
} }
} }

View file

@ -13,6 +13,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.uber.org/zap" "go.uber.org/zap"
@ -61,6 +62,14 @@ func checkErrorType(err error) int {
} }
} }
func loadAttributes(attrs []object.Attribute) map[string]string {
result := make(map[string]string)
for _, attr := range attrs {
result[attr.Key()] = attr.Value()
}
return result
}
func isValidToken(s string) bool { func isValidToken(s string) bool {
for _, c := range s { for _, c := range s {
if c <= ' ' || c > 127 { if c <= ' ' || c > 127 {

View file

@ -31,7 +31,8 @@ const (
CouldNotStoreFileInFrostfs = "could not store file in frostfs" // Error in ../../uploader/upload.go CouldNotStoreFileInFrostfs = "could not store file in frostfs" // Error in ../../uploader/upload.go
AddAttributeToResultObject = "add attribute to result object" // Debug in ../../uploader/filter.go AddAttributeToResultObject = "add attribute to result object" // Debug in ../../uploader/filter.go
FailedToCreateResolver = "failed to create resolver" // Fatal in ../../app.go FailedToCreateResolver = "failed to create resolver" // Fatal in ../../app.go
FailedToReadIndexPageTemplate = "failed to read index page template, set default" // Warn in ../../app.go FailedToCreateWorkerPool = "failed to create worker pool" // Fatal in ../../app.go
FailedToReadIndexPageTemplate = "failed to read index page template" // Error in ../../app.go
SetCustomIndexPageTemplate = "set custom index page template" // Info in ../../app.go SetCustomIndexPageTemplate = "set custom index page template" // Info in ../../app.go
ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty" // Info in ../../app.go ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty" // Info in ../../app.go
MetricsAreDisabled = "metrics are disabled" // Warn in ../../app.go MetricsAreDisabled = "metrics are disabled" // Warn in ../../app.go
@ -71,6 +72,9 @@ const (
AddedStoragePeer = "added storage peer" // Info in ../../settings.go AddedStoragePeer = "added storage peer" // Info in ../../settings.go
CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go
CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go
FailedToSumbitTaskToPool = "failed to submit task to pool" // Error in ../handler/browse.go
FailedToHeadObject = "failed to head object" // Error in ../handler/browse.go
FailedToIterateOverResponse = "failed to iterate over search response" // Error in ../handler/browse.go
InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go

View file

@ -1,11 +1,20 @@
{{$bucketName := .BucketName}} {{$container := .Container}}
{{ $prefix := trimPrefix .Prefix }} {{ $prefix := trimPrefix .Prefix }}
<!DOCTYPE html> <!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8"/> <meta charset="UTF-8"/>
<title>Index of s3://{{$bucketName}}/{{if $prefix}}/{{$prefix}}/{{end}}</title> <title>Index of {{.Protocol}}://{{$container}}
/{{if $prefix}}/{{$prefix}}/{{end}}</title>
<style> <style>
.alert {
width: 80%;
box-sizing: border-box;
padding: 20px;
background-color: #f44336;
color: white;
margin-bottom: 15px;
}
table { table {
width: 80%; width: 80%;
border-collapse: collapse; border-collapse: collapse;
@ -23,15 +32,25 @@
th { th {
background-color: #c3bcbc; background-color: #c3bcbc;
} }
h1 {
font-size: 1.5em;
}
tr:nth-child(even) {background-color: #ebe7e7;} tr:nth-child(even) {background-color: #ebe7e7;}
</style> </style>
</head> </head>
<body> <body>
<h1>Index of s3://{{$bucketName}}/{{if $prefix}}{{$prefix}}/{{end}}</h1> <h1>Index of {{.Protocol}}://{{$container}}/{{if $prefix}}{{$prefix}}/{{end}}</h1>
{{ if .HasErrors }}
<div class="alert">
Errors occurred while processing the request. Perhaps some objects are missing
</div>
{{ end }}
<table> <table>
<thead> <thead>
<tr> <tr>
<th>Filename</th> <th>Filename</th>
<th>OID</th>
<th>Size</th> <th>Size</th>
<th>Created</th> <th>Created</th>
<th>Download</th> <th>Download</th>
@ -42,20 +61,22 @@
{{if $trimmedPrefix }} {{if $trimmedPrefix }}
<tr> <tr>
<td> <td>
⮐<a href="/get/{{$bucketName}}{{ urlencode $trimmedPrefix "" }}">..</a> ⮐<a href="/get/{{$container}}{{ urlencode $trimmedPrefix }}/">..</a>
</td> </td>
<td></td> <td></td>
<td></td> <td></td>
<td></td> <td></td>
<td></td>
</tr> </tr>
{{else}} {{else}}
<tr> <tr>
<td> <td>
⮐<a href="/get/{{ $bucketName }}/">..</a> ⮐<a href="/get/{{$container}}/">..</a>
</td> </td>
<td></td> <td></td>
<td></td> <td></td>
<td></td> <td></td>
<td></td>
</tr> </tr>
{{end}} {{end}}
{{range .Objects}} {{range .Objects}}
@ -63,21 +84,22 @@
<td> <td>
{{if .IsDir}} {{if .IsDir}}
🗀 🗀
<a href="/get/{{ $bucketName }}{{ urlencode $prefix .FileName }}/"> <a href="{{.GetURL}}/">
{{.FileName}}/ {{.FileName}}/
</a> </a>
{{else}} {{else}}
🗎 🗎
<a href="/get/{{ $bucketName }}{{ urlencode $prefix .FileName }}"> <a href="{{ .GetURL }}">
{{.FileName}} {{.FileName}}
</a> </a>
{{end}} {{end}}
</td> </td>
<td>{{.OID}}</td>
<td>{{if not .IsDir}}{{ formatSize .Size }}{{end}}</td> <td>{{if not .IsDir}}{{ formatSize .Size }}{{end}}</td>
<td>{{if not .IsDir}}{{ formatTimestamp .Created }}{{end}}</td> <td>{{ .Created }}</td>
<td> <td>
{{ if not .IsDir }} {{ if .OID }}
<a href="/get/{{ $bucketName}}{{ urlencode $prefix .FileName }}?download=true"> <a href="{{ .GetURL }}?download=true">
Link Link
</a> </a>
{{ end }} {{ end }}