[#151] Index page for FrostFS containers #153
|
@ -40,6 +40,7 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
@ -89,6 +90,7 @@ type (
|
|||
appSettings struct {
|
||||
reconnectInterval time.Duration
|
||||
dialerSource *internalnet.DialerSource
|
||||
workerPoolSize int
|
||||
|
||||
mu sync.RWMutex
|
||||
defaultTimestamp bool
|
||||
|
@ -184,6 +186,7 @@ func (a *app) initAppSettings() {
|
|||
a.settings = &appSettings{
|
||||
reconnectInterval: fetchReconnectInterval(a.cfg),
|
||||
dialerSource: getDialerSource(a.log, a.cfg),
|
||||
workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize),
|
||||
}
|
||||
a.settings.update(a.cfg, a.log)
|
||||
}
|
||||
|
@ -490,7 +493,13 @@ func (a *app) setHealthStatus() {
|
|||
}
|
||||
|
||||
func (a *app) Serve() {
|
||||
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)))
|
||||
workerPool := a.initWorkerPool()
|
||||
defer func() {
|
||||
workerPool.Release()
|
||||
close(a.webDone)
|
||||
}()
|
||||
|
||||
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool)
|
||||
|
||||
// Configure router.
|
||||
a.configureRouter(handler)
|
||||
|
@ -532,8 +541,14 @@ LOOP:
|
|||
a.metrics.Shutdown()
|
||||
a.stopServices()
|
||||
a.shutdownTracing()
|
||||
}
|
||||
|
||||
close(a.webDone)
|
||||
func (a *app) initWorkerPool() *ants.Pool {
|
||||
workerPool, err := ants.NewPool(a.settings.workerPoolSize)
|
||||
if err != nil {
|
||||
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
|
||||
}
|
||||
return workerPool
|
||||
}
|
||||
|
||||
func (a *app) shutdownTracing() {
|
||||
|
@ -609,6 +624,7 @@ func (a *app) stopServices() {
|
|||
svc.ShutDown(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *app) configureRouter(handler *handler.Handler) {
|
||||
r := router.New()
|
||||
r.RedirectTrailingSlash = true
|
||||
|
|
|
@ -71,6 +71,8 @@ const (
|
|||
cfgIndexPageEnabled = "index_page.enabled"
|
||||
cfgIndexPageTemplatePath = "index_page.template_path"
|
||||
|
||||
cfgWorkerPoolSize = "worker_pool_size"
|
||||
|
||||
// Web.
|
||||
cfgWebReadBufferSize = "web.read_buffer_size"
|
||||
cfgWebWriteBufferSize = "web.write_buffer_size"
|
||||
|
@ -228,9 +230,6 @@ func settings() *viper.Viper {
|
|||
// pool:
|
||||
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
||||
|
||||
v.SetDefault(cfgIndexPageEnabled, false)
|
||||
v.SetDefault(cfgIndexPageTemplatePath, "")
|
||||
|
||||
// frostfs:
|
||||
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
|
||||
|
||||
|
@ -242,6 +241,7 @@ func settings() *viper.Viper {
|
|||
v.SetDefault(cfgWebStreamRequestBody, true)
|
||||
v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)
|
||||
|
||||
v.SetDefault(cfgWorkerPoolSize, 1000)
|
||||
// upload header
|
||||
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)
|
||||
|
||||
|
|
|
@ -150,3 +150,12 @@ HTTP_GW_MULTINET_FALLBACK_DELAY=300ms
|
|||
# List of subnets and IP addresses to use as source for those subnets
|
||||
HTTP_GW_MULTINET_SUBNETS_1_MASK=1.2.3.4/24
|
||||
HTTP_GW_MULTINET_SUBNETS_1_SOURCE_IPS=1.2.3.4 1.2.3.5
|
||||
|
||||
# Number of workers in handler's worker pool
|
||||
HTTP_GW_WORKER_POOL_SIZE=1000
|
||||
|
||||
# Index page
|
||||
# Enable index page support
|
||||
HTTP_GW_INDEX_PAGE_ENABLED=false
|
||||
# Index page template path
|
||||
HTTP_GW_INDEX_PAGE_TEMPLATE_PATH=internal/handler/templates/index.gotmpl
|
|
@ -113,6 +113,9 @@ request_timeout: 5s # Timeout to check node health during rebalance.
|
|||
rebalance_timer: 30s # Interval to check nodes health.
|
||||
pool_error_threshold: 100 # The number of errors on connection after which node is considered as unhealthy.
|
||||
|
||||
# Number of workers in handler's worker pool
|
||||
worker_pool_size: 1000
|
||||
|
||||
# Enable index page to see objects list for specified container and prefix
|
||||
index_page:
|
||||
enabled: false
|
||||
|
|
|
@ -75,18 +75,21 @@ request_timeout: 5s
|
|||
rebalance_timer: 30s
|
||||
pool_error_threshold: 100
|
||||
reconnect_interval: 1m
|
||||
worker_pool_size: 1000
|
||||
|
||||
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|------------------------|------------|---------------|---------------|-------------------------------------------------------------------------------------------------|
|
||||
| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. |
|
||||
| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. |
|
||||
| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. |
|
||||
| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. |
|
||||
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
||||
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
||||
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
||||
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|------------------------|------------|---------------|---------------|------------------------------------------------------------------------------------|
|
||||
| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. |
|
||||
| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. |
|
||||
| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. |
|
||||
| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. |
|
||||
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
||||
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
||||
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
||||
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
|
||||
| `worker_pool_size` | `int` | no | `1000` | Maximum worker count in handler's worker pool. |
|
||||
|
||||
# `wallet` section
|
||||
|
||||
|
@ -374,7 +377,12 @@ resolve_bucket:
|
|||
|
||||
# `index_page` section
|
||||
|
||||
Parameters for index HTML-page output with S3-bucket or S3-subdir content for `Get object` request
|
||||
Parameters for index HTML-page output. Activates if `GetObject` request returns `not found`. Two
|
||||
index page modes available:
|
||||
|
||||
* `s3` mode uses tree service for listing objects,
|
||||
* `native` sends requests to nodes via native protocol.
|
||||
If the request passes an S3 bucket name instead of a CID, `s3` mode will be used; otherwise `native`.
|
||||
|
||||
```yaml
|
||||
index_page:
|
||||
|
|
1
go.mod
|
@ -12,6 +12,7 @@ require (
|
|||
github.com/docker/go-units v0.4.0
|
||||
github.com/fasthttp/router v1.4.1
|
||||
github.com/nspcc-dev/neo-go v0.106.2
|
||||
github.com/panjf2000/ants/v2 v2.5.0
|
||||
github.com/prometheus/client_golang v1.19.0
|
||||
github.com/prometheus/client_model v0.5.0
|
||||
github.com/spf13/pflag v1.0.5
|
||||
|
|
2
go.sum
|
@ -682,6 +682,8 @@ github.com/opencontainers/selinux v1.6.0/go.mod h1:VVGKuOLlE7v4PJyT6h7mNWvq1rzqi
|
|||
github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3ogry1nUQF8Evvo=
|
||||
github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8=
|
||||
github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI=
|
||||
github.com/panjf2000/ants/v2 v2.5.0 h1:1rWGWSnxCsQBga+nQbA4/iY6VMeNoOIAM0ZWh9u3q2Q=
|
||||
github.com/panjf2000/ants/v2 v2.5.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE=
|
||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||
github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc=
|
||||
github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU=
|
||||
|
|
|
@ -1,15 +1,21 @@
|
|||||
package handler
|
||||||
|
||||||
import (
|
||||||
"context"
|
||||||
"html/template"
|
||||||
"net/url"
|
||||||
"sort"
|
||||||
"strconv"
|
||||||
"strings"
|
||||||
"sync"
|
||||||
"time"
|
||||||
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/docker/go-units"
|
||||||
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
||||||
|
@ -25,19 +31,68 @@ const (
|
|||||
|
||||||
alexvanin
commented
I think this is basically unused and can be moved to a test file. In the app you defined the default pool size in settings.go.
|
||||||
type (
|
||||||
BrowsePageData struct {
|
||||||
BucketName,
|
||||||
Prefix string
|
||||||
Objects []ResponseObject
|
||||||
HasErrors bool
|
||||||
Container string
|
||||||
Prefix string
|
||||||
Protocol string
|
||||||
Objects []ResponseObject
|
||||||
}
|
||||||
ResponseObject struct {
|
||||||
OID string
|
||||||
Created string
|
||||||
FileName string
|
||||||
FilePath string
|
||||||
Size string
|
||||||
IsDir bool
|
||||||
GetURL string
|
||||||
}
|
||||||
)
|
||||||
|
||||||
func newListObjectsResponseS3(attrs map[string]string) ResponseObject {
|
||||||
return ResponseObject{
|
||||||
Created: formatTimestamp(attrs[attrCreated]),
|
||||||
OID: attrs[attrOID],
|
||||||
FileName: attrs[attrFileName],
|
||||||
Size: attrs[attrSize],
|
||||||
IsDir: attrs[attrOID] == "",
|
||||||
}
|
||||||
}
|
||||||
|
||||||
func newListObjectsResponseNative(attrs map[string]string) ResponseObject {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Maybe it's better to save values to fields in the appropriate format and not invoke any functions from the template?
dkirillov
commented
Will we change this? It still invokes `formatTimestamp` from the template.
|
||||||
filename := lastPathElement(attrs[object.AttributeFilePath])
|
||||||
if filename == "" {
|
||||||
filename = attrs[attrFileName]
|
||||||
}
|
||||||
return ResponseObject{
|
||||||
OID: attrs[attrOID],
|
||||||
Created: formatTimestamp(attrs[object.AttributeTimestamp] + "000"),
|
||||||
FileName: filename,
|
||||||
FilePath: attrs[object.AttributeFilePath],
|
||||||
Size: attrs[attrSize],
|
||||||
IsDir: false,
|
||||||
}
|
||||||
}
|
||||||
|
||||||
// getNextDir returns the first directory component of filepath that follows
// the given prefix, or "" when the remainder contains no further "/" (i.e.
// the object sits directly under prefix rather than in a subdirectory).
func getNextDir(filepath, prefix string) string {
	// TrimPrefix only strips a leading prefix. The previous
	// strings.Replace(filepath, prefix, "", 1) could remove a mid-string
	// occurrence of prefix and yield a wrong directory name.
	rest := strings.TrimPrefix(filepath, prefix)
	if i := strings.Index(rest, "/"); i != -1 {
		return rest[:i]
	}
	return ""
}
|
||||||
|
||||||
// lastPathElement extracts the final component of a slash-separated path.
// A single trailing "/" is kept as part of the last element (the split
// point moves to the previous separator, so "a/b/" yields "b/"). An empty
// input is returned unchanged, and a path without "/" is returned whole.
func lastPathElement(path string) string {
	if len(path) == 0 {
		return path
	}
	cut := strings.LastIndex(path, "/")
	// A separator in the final position is not the boundary we want;
	// use the one before it instead.
	if cut == len(path)-1 {
		cut = strings.LastIndex(path[:cut], "/")
	}
	return path[cut+1:]
}
|
||||||
|
||||||
func parseTimestamp(tstamp string) (time.Time, error) {
|
||||||
millis, err := strconv.ParseInt(tstamp, 10, 64)
|
||||||
if err != nil {
|
||||||
|
@ -47,16 +102,6 @@ func parseTimestamp(tstamp string) (time.Time, error) {
|
|||||
return time.UnixMilli(millis), nil
|
||||||
}
|
||||||
|
||||||
func NewResponseObject(nodes map[string]string) ResponseObject {
|
||||||
return ResponseObject{
|
||||||
OID: nodes[attrOID],
|
||||||
Created: nodes[attrCreated],
|
||||||
FileName: nodes[attrFileName],
|
||||||
Size: nodes[attrSize],
|
||||||
IsDir: nodes[attrOID] == "",
|
||||||
}
|
||||||
}
|
||||||
|
||||||
func formatTimestamp(strdate string) string {
|
||||||
date, err := parseTimestamp(strdate)
|
||||||
if err != nil || date.IsZero() {
|
||||||
|
@ -94,12 +139,9 @@ func trimPrefix(encPrefix string) string {
|
|||||
return prefix[:slashIndex]
|
||||||
}
|
||||||
|
||||||
func urlencode(prefix, filename string) string {
|
||||||
func urlencode(path string) string {
|
||||||
var res strings.Builder
|
||||||
path := filename
|
||||||
if prefix != "" {
|
||||||
path = strings.Join([]string{prefix, filename}, "/")
|
||||||
}
|
||||||
|
||||||
prefixParts := strings.Split(path, "/")
|
||||||
for _, prefixPart := range prefixParts {
|
||||||
prefixPart = "/" + url.PathEscape(prefixPart)
|
||||||
|
@ -112,46 +154,220 @@ func urlencode(prefix, filename string) string {
|
|||||
return res.String()
|
||||||
}
|
||||||
|
||||||
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, bucketInfo *data.BucketInfo, prefix string) {
|
||||||
type GetObjectsResponse struct {
|
||||||
objects []ResponseObject
|
||||||
hasErrors bool
|
||||||
}
|
||||||
|
||||||
func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) {
|
||||||
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
|
||||||
if err != nil {
|
||||||
return nil, err
|
||||||
}
|
||||||
|
||||||
result := &GetObjectsResponse{
|
||||||
dkirillov
commented
Please don't use
Additionally, we can skip setting Please don't use `var` when we initialize struct:
```golang
result:= &GetObjectReponse{
objects: make([]ResponseObject, 0, len(nodes)),
hasErrors: false,
}
```
Additionally, we can skip setting `hasErrors: false`
|
||||||
objects: make([]ResponseObject, 0, len(nodes)),
|
||||||
}
|
||||||
for _, node := range nodes {
|
||||||
meta := node.GetMeta()
|
||||||
if meta == nil {
|
||||||
continue
|
||||||
}
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Can we search all object if prefix is empty to be able to handle objects that have Can we search all object if prefix is empty to be able to handle objects that have `FileName` rather than `FilePath`?
nzinkevich
commented
Fixed
dkirillov
commented
I meant write something like this:
I meant write something like this:
```golang
filters := object.NewSearchFilters()
filters.AddRootFilter()
if prefix != "" {
filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix)
}
```
|
||||||
var attrs = make(map[string]string, len(meta))
|
||||||
for _, m := range meta {
|
||||||
attrs[m.GetKey()] = string(m.GetValue())
|
||||||
}
|
||||||
obj := newListObjectsResponseS3(attrs)
|
||||||
obj.FilePath = prefix + obj.FileName
|
||||||
obj.GetURL = "/get/" + bucketInfo.Name + urlencode(obj.FilePath)
|
||||||
result.objects = append(result.objects, obj)
|
||||||
}
|
||||||
|
||||||
return result, nil
|
||||||
}
|
||||||
|
||||||
func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) {
|
||||||
var basePath string
|
||||||
if ind := strings.LastIndex(prefix, "/"); ind != -1 {
|
||||||
basePath = prefix[:ind+1]
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Please use Please use `:=` syntax where possible
|
||||||
}
|
||||||
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Matter of taste but
Matter of taste but
```golang
var basePath string
if ind := strings.LastIndex(prefix, "/"); ind != -1 {
basePath = prefix[:ind+1]
}
```
|
||||||
filters := object.NewSearchFilters()
|
||||||
filters.AddRootFilter()
|
||||||
if prefix != "" {
|
||||||
filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||||
}
|
||||||
|
||||||
prm := PrmObjectSearch{
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
It's better to create such channel somewhere in It's better to create such channel somewhere in `headDirObjects` and close it there when no more objects be available
|
||||||
PrmAuth: PrmAuth{
|
||||||
BearerToken: bearerToken(ctx),
|
||||||
},
|
||||||
Container: bucketInfo.CID,
|
||||||
Filters: filters,
|
||||||
}
|
||||||
objectIDs, err := h.frostfs.SearchObjects(ctx, prm)
|
||||||
if err != nil {
|
||||||
return nil, err
|
||||||
}
|
||||||
defer objectIDs.Close()
|
||||||
|
||||||
resp, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
|
||||||
if err != nil {
|
||||||
return nil, err
|
||||||
}
|
||||||
|
||||||
log := utils.GetReqLogOrDefault(ctx, h.log)
|
||||||
dirs := make(map[string]struct{})
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
What do we want to do on such error?
cc @alexvanin What do we want to do on such error?
* Return error to client?
* Ignore and list other?
* Stop further heading and return client current objects?
cc @alexvanin
|
||||||
result := &GetObjectsResponse{
|
||||||
objects: make([]ResponseObject, 0, 100),
|
||||||
}
|
||||||
for objExt := range resp {
|
||||||
if objExt.Error != nil {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
We cannot use We cannot use `addr` from other goroutine.
|
||||||
log.Error(logs.FailedToHeadObject, zap.Error(objExt.Error))
|
||||||
result.hasErrors = true
|
||||||
continue
|
||||||
}
|
||||||
if objExt.Object.IsDir {
|
||||||
if _, ok := dirs[objExt.Object.FileName]; ok {
|
||||||
continue
|
||||||
}
|
||||||
objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + urlencode(objExt.Object.FilePath)
|
||||||
dirs[objExt.Object.FileName] = struct{}{}
|
||||||
} else {
|
||||||
objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + "/" + objExt.Object.OID
|
||||||
}
|
||||||
result.objects = append(result.objects, objExt.Object)
|
||||||
}
|
||||||
return result, nil
|
||||||
}
|
||||||
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why don't we use channel for this? Why don't we use channel for this?
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Let's write
Let's write
```golang
func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (<-chan ResponseObjectExtended, error) {
```
|
||||||
type ResponseObjectExtended struct {
|
||||||
Object ResponseObject
|
||||||
alexvanin marked this conversation as resolved
Outdated
alexvanin
commented
As far as I understand, every simultaneous browse request will produce new pool with up to 1000 routines by default. That might increase goroutine usage very very fast under the load. I suggest to use one pool of workers in the handler. As far as I understand, every simultaneous browse request will produce new pool with up to 1000 routines by default. That might increase goroutine usage very very fast under the load.
I suggest to use one pool of workers in the handler.
|
||||||
Error error
|
||||||
}
|
||||||
|
||||||
func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (<-chan ResponseObjectExtended, error) {
|
||||||
res := make(chan ResponseObjectExtended)
|
||||||
|
||||||
go func() {
|
||||||
defer close(res)
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Please, write:
Or
Please, write:
```golang
var wg sync.WaitGroup
var dirs sync.Map
```
Or
```golang
var (
wg sync.WaitGroup
dirs sync.Map
)
```
|
||||||
log := utils.GetReqLogOrDefault(ctx, h.log).With(
|
||||||
zap.String("cid", cnrID.EncodeToString()),
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Pool size should be configurable. And default should be at least 100 (maybe 1000) Pool size should be configurable. And default should be at least 100 (maybe 1000)
|
||||||
zap.String("path", basePath),
|
||||||
)
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
If I understand correctly, we stop iteration if any error is occurred. So here we should If I understand correctly, we stop iteration if any error is occurred. So here we should `return true` if context is canceled
|
||||||
var wg sync.WaitGroup
|
||||||
err := objectIDs.Iterate(func(id oid.ID) bool {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
It's better to restrict number of goroutines. See for example s3-gw listing
It's better to restrict number of goroutines. See for example s3-gw listing https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/commit/e35b582fe25a97c018367ef9244dfe93d4500ad9/api/layer/listing.go#L530
|
||||||
wg.Add(1)
|
||||||
err := h.workerPool.Submit(func() {
|
||||||
defer wg.Done()
|
||||||
var obj ResponseObjectExtended
|
||||||
obj.Object, obj.Error = h.headDirObject(ctx, cnrID, id, basePath)
|
||||||
res <- obj
|
||||||
})
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Here we also should add Here we also should add `wg.Done`
|
||||||
if err != nil {
|
||||||
wg.Done()
|
||||||
log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err))
|
||||||
}
|
||||||
select {
|
||||||
case <-ctx.Done():
|
||||||
return true
|
||||||
default:
|
||||||
return false
|
||||||
}
|
||||||
})
|
||||||
if err != nil {
|
||||||
log.Error(logs.FailedToIterateOverResponse, zap.Error(err))
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
This 4 lines seems confusing to me.
This 4 lines seems confusing to me.
Can we rewrite this code to make it more strightforward?
For example:
```diff
diff --git a/internal/handler/browse.go b/internal/handler/browse.go
index a2373d1..d3f84ae 100644
--- a/internal/handler/browse.go
+++ b/internal/handler/browse.go
@@ -12,7 +12,6 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
- "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@@ -202,116 +201,118 @@ func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.Buck
}
defer objectIDs.Close()
- return h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
-}
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
-type workerParams struct {
- cnrID cid.ID
- objectIDs ResObjectSearch
- basePath string
- errCh chan error
- objCh chan ResponseObject
- cancel context.CancelFunc
-}
+ resp, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
+ if err != nil {
+ return nil, err
+ }
-func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) ([]ResponseObject, error) {
- const initialSliceCapacity = 100
+ log := utils.GetReqLogOrDefault(ctx, h.log)
- var wg sync.WaitGroup
- var dirs sync.Map
- log := h.log.With(
- zap.String("cid", cnrID.EncodeToString()),
- zap.String("path", basePath),
- )
- done := make(chan struct{})
- objects := make([]ResponseObject, 0, initialSliceCapacity)
- ctx, cancel := context.WithCancel(ctx)
- p := workerParams{
- cnrID: cnrID,
- objectIDs: objectIDs,
- basePath: basePath,
- errCh: make(chan error, 1),
- objCh: make(chan ResponseObject, 1),
- cancel: cancel,
- }
- defer cancel()
+ dirs := make(map[string]struct{})
+ objects := make([]ResponseObject, 0, 100)
+ for objExt := range resp {
+ if objExt.Error != nil {
+ log.Error("error", zap.Error(objExt.Error))
+ cancel()
+ continue
+ }
- go func() {
- for err := range p.errCh {
- if err != nil {
- log.Error(logs.FailedToHeadObject, zap.Error(err))
+ if objExt.Object.IsDir {
+ if _, ok := dirs[objExt.Object.FileName]; ok {
+ continue
}
+ dirs[objExt.Object.FileName] = struct{}{}
}
- done <- struct{}{}
- }()
- go func() {
- for obj := range p.objCh {
- objects = append(objects, obj)
- }
- done <- struct{}{}
- }()
+ objects = append(objects, objExt.Object)
+ }
+ return objects, nil
+}
+
+type ResponseObjectExtended struct {
+ Object ResponseObject
+ Error error
+}
+
+func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (chan ResponseObjectExtended, error) {
pool, err := ants.NewPool(runtime.NumCPU())
if err != nil {
return nil, err
}
- defer pool.Release()
- err = objectIDs.Iterate(func(id oid.ID) bool {
- wg.Add(1)
- if err = pool.Submit(func() {
- defer wg.Done()
- h.headDirObject(ctx, id, &dirs, p)
- }); err != nil {
- p.errCh <- err
- }
- select {
- case <-ctx.Done():
- return true
- default:
- return false
+
+ res := make(chan ResponseObjectExtended)
+
+ go func() {
+ defer func() {
+ pool.Release()
+ close(res)
+ }()
+
+ log := utils.GetReqLogOrDefault(ctx, h.log).With(
+ zap.String("cid", cnrID.EncodeToString()),
+ zap.String("path", basePath),
+ )
+
+ var wg sync.WaitGroup
+ err = objectIDs.Iterate(func(id oid.ID) bool {
+ wg.Add(1)
+ err = pool.Submit(func() {
+ defer wg.Done()
+
+ var obj ResponseObjectExtended
+ obj.Object, obj.Error = h.headDirObject(ctx, cnrID, id, basePath)
+ res <- obj
+ })
+ if err != nil {
+ wg.Done()
+ log.Warn("FailedToSubmitTaskToPool", zap.Error(err))
+ }
+
+ select {
+ case <-ctx.Done():
+ return true
+ default:
+ return false
+ }
+ })
+ if err != nil {
+ log.Error("iterate", zap.Error(err))
}
- })
- wg.Wait()
- close(p.errCh)
- close(p.objCh)
- <-done
- <-done
- if err != nil {
- return nil, err
- }
+ wg.Wait()
+ }()
- return objects, nil
+ return res, nil
}
-func (h *Handler) headDirObject(ctx context.Context, objID oid.ID, dirs *sync.Map, p workerParams) {
- addr := newAddress(p.cnrID, objID)
+func (h *Handler) headDirObject(ctx context.Context, cnrID cid.ID, objID oid.ID, basePath string) (ResponseObject, error) {
+ addr := newAddress(cnrID, objID)
obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{
PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)},
Address: addr,
})
if err != nil {
- p.errCh <- err
- p.cancel()
- return
+ return ResponseObject{}, err
}
attrs := loadAttributes(obj.Attributes())
attrs[attrOID] = objID.EncodeToString()
attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10)
- dirname := getNextDir(attrs[object.AttributeFilePath], p.basePath)
+ dirname := getNextDir(attrs[object.AttributeFilePath], basePath)
if dirname == "" {
- p.objCh <- newListObjectsResponseNative(attrs)
- } else if _, ok := dirs.Load(dirname); !ok {
- p.objCh <- ResponseObject{
- FileName: dirname,
- FilePath: p.basePath + dirname,
- IsDir: true,
- }
- dirs.Store(dirname, true)
+ return newListObjectsResponseNative(attrs), nil
}
+
+ return ResponseObject{
+ FileName: dirname,
+ FilePath: basePath + dirname,
+ IsDir: true,
+ }, nil
}
type browseParams struct {
```
|
||||||
}
|
||||||
wg.Wait()
|
||||||
}()
|
||||||
|
||||||
return res, nil
|
||||||
}
|
||||||
|
||||||
func (h *Handler) headDirObject(ctx context.Context, cnrID cid.ID, objID oid.ID, basePath string) (ResponseObject, error) {
|
||||||
addr := newAddress(cnrID, objID)
|
||||||
obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{
|
||||||
PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)},
|
||||||
Address: addr,
|
||||||
})
|
||||||
if err != nil {
|
||||||
return ResponseObject{}, err
|
||||||
}
|
||||||
|
||||||
attrs := loadAttributes(obj.Attributes())
|
||||||
attrs[attrOID] = objID.EncodeToString()
|
||||||
if multipartSize, ok := attrs[attributeMultipartObjectSize]; ok {
|
||||||
attrs[attrSize] = multipartSize
|
||||||
} else {
|
||||||
attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10)
|
||||||
}
|
||||||
|
||||||
dirname := getNextDir(attrs[object.AttributeFilePath], basePath)
|
||||||
if dirname == "" {
|
||||||
return newListObjectsResponseNative(attrs), nil
|
||||||
}
|
||||||
|
||||||
return ResponseObject{
|
||||||
FileName: dirname,
|
||||||
FilePath: basePath + dirname,
|
||||||
IsDir: true,
|
||||||
}, nil
|
||||||
}
|
||||||
|
||||||
type browseParams struct {
|
||||||
bucketInfo *data.BucketInfo
|
||||||
prefix string
|
||||||
isNative bool
|
||||||
listObjects func(ctx context.Context, bucketName *data.BucketInfo, prefix string) (*GetObjectsResponse, error)
|
||||||
}
|
||||||
|
||||||
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
|
||||||
const S3Protocol = "s3"
|
||||||
const FrostfsProtocol = "frostfs"
|
||||||
|
||||||
ctx := utils.GetContextFromRequest(c)
|
||||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
||||||
log := reqLog.With(zap.String("bucket", bucketInfo.Name))
|
||||||
|
||||||
nodes, err := h.listObjects(ctx, bucketInfo, prefix)
|
||||||
log := reqLog.With(
|
||||||
zap.String("bucket", p.bucketInfo.Name),
|
||||||
zap.String("container", p.bucketInfo.CID.EncodeToString()),
|
||||||
zap.String("prefix", p.prefix),
|
||||||
)
|
||||||
resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix)
|
||||||
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
||||||
return
|
||||||
}
|
||||||
|
||||||
respObjects := make([]ResponseObject, len(nodes))
|
||||||
|
||||||
for i, node := range nodes {
|
||||||
respObjects[i] = NewResponseObject(node)
|
||||||
}
|
||||||
|
||||||
sort.Slice(respObjects, func(i, j int) bool {
|
||||||
if respObjects[i].IsDir == respObjects[j].IsDir {
|
||||||
return respObjects[i].FileName < respObjects[j].FileName
|
||||||
objects := resp.objects
|
||||||
sort.Slice(objects, func(i, j int) bool {
|
||||||
if objects[i].IsDir == objects[j].IsDir {
|
||||||
return objects[i].FileName < objects[j].FileName
|
||||||
}
|
||||||
return respObjects[i].IsDir
|
||||||
return objects[i].IsDir
|
||||||
})
|
||||||
indexTemplate := h.config.IndexPageTemplate()
|
||||||
|
||||||
tmpl, err := template.New("index").Funcs(template.FuncMap{
|
||||||
"formatTimestamp": formatTimestamp,
|
||||||
"formatSize": formatSize,
|
||||||
"trimPrefix": trimPrefix,
|
||||||
"urlencode": urlencode,
|
||||||
"parentDir": parentDir,
|
||||||
}).Parse(indexTemplate)
|
||||||
"formatSize": formatSize,
|
||||||
"trimPrefix": trimPrefix,
|
||||||
"urlencode": urlencode,
|
||||||
"parentDir": parentDir,
|
||||||
}).Parse(h.config.IndexPageTemplate())
|
||||||
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
||||||
return
|
||||||
}
|
||||||
bucketName := p.bucketInfo.Name
|
||||||
protocol := S3Protocol
|
||||||
if p.isNative {
|
||||||
bucketName = p.bucketInfo.CID.EncodeToString()
|
||||||
protocol = FrostfsProtocol
|
||||||
}
|
||||||
if err = tmpl.Execute(c, &BrowsePageData{
|
||||||
BucketName: bucketInfo.Name,
|
||||||
Prefix: prefix,
|
||||||
Objects: respObjects,
|
||||||
Container: bucketName,
|
||||||
Prefix: p.prefix,
|
||||||
Objects: objects,
|
||||||
Protocol: protocol,
|
||||||
HasErrors: resp.hasErrors,
|
||||||
}); err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
||||||
return
|
||||||
|
|
|
@ -23,13 +23,16 @@ import (
|
|||
|
||||
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
|
||||
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||
test, _ := c.UserValue("oid").(string)
|
||||
var id oid.ID
|
||||
err := id.DecodeString(test)
|
||||
if err != nil {
|
||||
h.byObjectName(c, h.receiveFile)
|
||||
} else {
|
||||
h.byAddress(c, h.receiveFile)
|
||||
oidURLParam := c.UserValue("oid").(string)
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why did we change Why did we change `oid` to `cid`?
dkirillov
commented
Oh. It seems I got it. Then we should rename Oh. It seems I got it. Then we should rename `h.byObjectName` and `h.byAddress` so that it reflect reality.
And/or be able to handle urls like `get/<bucket-name>/<oid>`
|
||||
downloadQueryParam := c.QueryArgs().GetBool("download")
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
We usually use
We usually use `cnrID` name. So let's write:
```golang
cnrIDStr, _ := c.UserValue("cid").(string)
var cnrID cid.ID
```
|
||||
|
||||
alexvanin
commented
What's the difference between checking What's the difference between checking `oid` before and `cid` now?
|
||||
switch {
|
||||
case isObjectID(oidURLParam):
|
||||
dkirillov
commented
Why don't we check Why don't we check `cid` also?
nzinkevich
commented
There may be also container identifier from NNS, and we want to resolve them too. So I check only There may be also container identifier from NNS, and we want to resolve them too. So I check only `oid`, as it was before
dkirillov
commented
Then if we create object Then if we create object `5rnoerMhK3kSxDEUh2FrSQvSW86P2deahSifFLxFBF9v` into bucket `my-bucket` and I wanted to get it by http-gw like `curl http://<http-gw>/get/my-bucket/5rnoerMhK3kSxDEUh2FrSQvSW86P2deahSifFLxFBF9v` I wil got `404`. Is it ok?
|
||||
h.byNativeAddress(c, h.receiveFile)
|
||||
case !isContainerRoot(oidURLParam) && (downloadQueryParam || !isDir(oidURLParam)):
|
||||
h.byS3Path(c, h.receiveFile)
|
||||
default:
|
||||
h.browseIndex(c)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -45,7 +48,7 @@ func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
|||
h.byAttribute(c, h.receiveFile)
|
||||
}
|
||||
|
||||
func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
|
||||
func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
|
||||
filters := object.NewSearchFilters()
|
||||
filters.AddRootFilter()
|
||||
filters.AddFilter(key, val, op)
|
||||
|
@ -54,7 +57,7 @@ func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op
|
|||
PrmAuth: PrmAuth{
|
||||
BearerToken: bearerToken(ctx),
|
||||
},
|
||||
Container: *cnrID,
|
||||
Container: cnrID,
|
||||
Filters: filters,
|
||||
}
|
||||
|
||||
|
@ -102,7 +105,7 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
return
|
||||
}
|
||||
|
||||
resSearch, err := h.search(ctx, &bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||
resSearch, err := h.search(ctx, bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||
if err != nil {
|
||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -165,6 +166,7 @@ type Handler struct {
|
|||
containerResolver ContainerResolver
|
||||
tree *tree.Tree
|
||||
cache *cache.BucketCache
|
||||
workerPool *ants.Pool
|
||||
}
|
||||
|
||||
type AppParams struct {
|
||||
|
@ -175,7 +177,7 @@ type AppParams struct {
|
|||
Cache *cache.BucketCache
|
||||
}
|
||||
|
||||
func New(params *AppParams, config Config, tree *tree.Tree) *Handler {
|
||||
func New(params *AppParams, config Config, tree *tree.Tree, workerPool *ants.Pool) *Handler {
|
||||
return &Handler{
|
||||
log: params.Logger,
|
||||
frostfs: params.FrostFS,
|
||||
|
@ -184,14 +186,15 @@ func New(params *AppParams, config Config, tree *tree.Tree) *Handler {
|
|||
containerResolver: params.Resolver,
|
||||
tree: tree,
|
||||
cache: params.Cache,
|
||||
workerPool: workerPool,
|
||||
}
|
||||
}
|
||||
|
||||
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byNativeAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
alexvanin marked this conversation as resolved
Outdated
alexvanin
commented
After internal demo, let's modify this code a bit. We want to determine which browse function to use based on tree data. After internal demo, let's modify this code a bit.
We want to determine which browse function to use based on tree data.
If tree data is present, then use `h.getDirObjectsS3`,
If tree data is absent, then use `h.getDirObjectsNative`.
|
||||
idCnr, _ := c.UserValue("cid").(string)
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Please, write:
Please, write:
```golang
idCnr, _ := c.UserValue("cid").(string)
idObj, _ := url.PathUnescape(c.UserValue("oid").(string))
log := h.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
```
|
||||
idObj, _ := c.UserValue("oid").(string)
|
||||
idObj, _ := url.PathUnescape(c.UserValue("oid").(string))
|
||||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
|
@ -215,12 +218,11 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
|||
f(ctx, *h.newRequest(c, log), addr)
|
||||
}
|
||||
|
||||
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
We do the same By the way, do we really need this? If We do the same `GetObject` request in `f` function. It's better to use `HeadObject`.
By the way, do we really need this? If `oid` is actual object id why do we list objects?
nzinkevich
commented
We only checked before that We only checked before that `idObj`string is decodable to oid type, but I think we should also output index page if no object found. That's why I try to `GetObject` (Head is better, I agree)
dkirillov
commented
Could you explain why? > I think we should also output index page if no object found
Could you explain why?
nzinkevich
commented
Well, I thought the S3 index page worked similarly, but I forgot that I removed this feature in the final implementation. Apache also does not return an index page if page not found. So I agree, it shouldn't return index page in that case Well, I thought the S3 index page worked similarly, but I forgot that I removed this feature in the final implementation. Apache also does not return an index page if page not found. So I agree, it shouldn't return index page in that case
|
||||
// resolves object address from S3-like path <bucket name>/<object key>.
|
||||
func (h *Handler) byS3Path(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
bucketname := c.UserValue("cid").(string)
|
||||
key := c.UserValue("oid").(string)
|
||||
download := c.QueryArgs().GetBool("download")
|
||||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
|
@ -239,15 +241,6 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r
|
|||
}
|
||||
|
||||
foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, unescapedKey)
|
||||
if h.config.IndexPageEnabled() && !download && string(c.Method()) != fasthttp.MethodHead {
|
||||
if isDir(unescapedKey) || isContainerRoot(unescapedKey) {
|
||||
if code := checkErrorType(err); code == fasthttp.StatusNotFound || code == fasthttp.StatusOK {
|
||||
c.SetStatusCode(code)
|
||||
h.browseObjects(c, bktInfo, unescapedKey)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if errors.Is(err, tree.ErrNodeAccessDenied) {
|
||||
response.Error(c, "Access Denied", fasthttp.StatusForbidden)
|
||||
|
@ -267,7 +260,7 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r
|
|||
f(ctx, *h.newRequest(c, log), addr)
|
||||
}
|
||||
|
||||
// byAttribute is a wrapper similar to byAddress.
|
||||
// byAttribute is a wrapper similar to byNativeAddress.
|
||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
scid, _ := c.UserValue("cid").(string)
|
||||
key, _ := c.UserValue("attr_key").(string)
|
||||
|
@ -298,7 +291,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
|
|||
return
|
||||
}
|
||||
|
||||
res, err := h.search(ctx, &bktInfo.CID, key, val, object.MatchStringEqual)
|
||||
res, err := h.search(ctx, bktInfo.CID, key, val, object.MatchStringEqual)
|
||||
if err != nil {
|
||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
|
@ -395,24 +388,50 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
|
|||
return bktInfo, err
|
||||
}
|
||||
|
||||
func (h *Handler) listObjects(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]map[string]string, error) {
|
||||
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
|
||||
func (h *Handler) browseIndex(c *fasthttp.RequestCtx) {
|
||||
if !h.config.IndexPageEnabled() {
|
||||
c.SetStatusCode(fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
cidURLParam := c.UserValue("cid").(string)
|
||||
oidURLParam := c.UserValue("oid").(string)
|
||||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
||||
log := reqLog.With(zap.String("cid", cidURLParam), zap.String("oid", oidURLParam))
|
||||
|
||||
unescapedKey, err := url.QueryUnescape(oidURLParam)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
var objects = make([]map[string]string, 0, len(nodes))
|
||||
for _, node := range nodes {
|
||||
meta := node.GetMeta()
|
||||
if meta == nil {
|
||||
continue
|
||||
}
|
||||
var obj = make(map[string]string, len(meta))
|
||||
for _, m := range meta {
|
||||
obj[m.GetKey()] = string(m.GetValue())
|
||||
}
|
||||
objects = append(objects, obj)
|
||||
bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
return objects, nil
|
||||
listFunc := h.getDirObjectsS3
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why do we need this if we have the same on line 433 ? Why do we need this if we have the same on line 433 ?
dkirillov
commented
And why we set status here rather than in And why we set status here rather than in `h.browseObjects`?
dkirillov
commented
By the way, why do we ever need to set success status? By the way, why do we ever need to set success status?
|
||||
isNativeList := false
|
||||
|
||||
err = h.tree.CheckSettingsNodeExist(ctx, bktInfo)
|
||||
if err != nil {
|
||||
if errors.Is(err, tree.ErrNodeNotFound) {
|
||||
// tree probe failed, try to use native
|
||||
listFunc = h.getDirObjectsNative
|
||||
isNativeList = true
|
||||
} else {
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
h.browseObjects(c, browseParams{
|
||||
bucketInfo: bktInfo,
|
||||
prefix: unescapedKey,
|
||||
listObjects: listFunc,
|
||||
isNative: isNativeList,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
|
@ -57,10 +58,11 @@ func (c *configMock) IndexPageEnabled() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (c *configMock) IndexPageTemplatePath() string {
|
||||
func (c *configMock) IndexPageTemplate() string {
|
||||
return ""
|
||||
}
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Please, add new line Please, add new line
|
||||
func (c *configMock) IndexPageTemplate() string {
|
||||
|
||||
func (c *configMock) IndexPageNativeTemplate() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
|
@ -126,7 +128,11 @@ func prepareHandlerContext() (*handlerContext, error) {
|
|||
treeMock := &treeClientMock{}
|
||||
cfgMock := &configMock{}
|
||||
|
||||
handler := New(params, cfgMock, tree.NewTree(treeMock))
|
||||
workerPool, err := ants.NewPool(1000)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
handler := New(params, cfgMock, tree.NewTree(treeMock), workerPool)
|
||||
|
||||
return &handlerContext{
|
||||
key: key,
|
||||
|
|
|
@ -107,9 +107,9 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
|||
|
||||
err := id.DecodeString(test)
|
||||
if err != nil {
|
||||
h.byObjectName(c, h.headObject)
|
||||
h.byS3Path(c, h.headObject)
|
||||
} else {
|
||||
h.byAddress(c, h.headObject)
|
||||
h.byNativeAddress(c, h.headObject)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,17 +2,16 @@ package handler
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
|
@ -46,19 +45,21 @@ func isDir(name string) bool {
|
|||
return strings.HasSuffix(name, "/")
|
||||
}
|
||||
|
||||
func isObjectID(s string) bool {
|
||||
var objID oid.ID
|
||||
return objID.DecodeString(s) == nil
|
||||
}
|
||||
|
||||
func isContainerRoot(key string) bool {
|
||||
return key == ""
|
||||
}
|
||||
|
||||
func checkErrorType(err error) int {
|
||||
switch {
|
||||
case err == nil:
|
||||
return fasthttp.StatusOK
|
||||
case errors.Is(err, tree.ErrNodeAccessDenied):
|
||||
return fasthttp.StatusForbidden
|
||||
default:
|
||||
return fasthttp.StatusNotFound
|
||||
func loadAttributes(attrs []object.Attribute) map[string]string {
|
||||
result := make(map[string]string)
|
||||
for _, attr := range attrs {
|
||||
result[attr.Key()] = attr.Value()
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func isValidToken(s string) bool {
|
||||
|
|
|
@ -31,7 +31,8 @@ const (
|
|||
CouldNotStoreFileInFrostfs = "could not store file in frostfs" // Error in ../../uploader/upload.go
|
||||
AddAttributeToResultObject = "add attribute to result object" // Debug in ../../uploader/filter.go
|
||||
FailedToCreateResolver = "failed to create resolver" // Fatal in ../../app.go
|
||||
FailedToReadIndexPageTemplate = "failed to read index page template, set default" // Warn in ../../app.go
|
||||
FailedToCreateWorkerPool = "failed to create worker pool" // Fatal in ../../app.go
|
||||
FailedToReadIndexPageTemplate = "failed to read index page template" // Error in ../../app.go
|
||||
SetCustomIndexPageTemplate = "set custom index page template" // Info in ../../app.go
|
||||
ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty" // Info in ../../app.go
|
||||
MetricsAreDisabled = "metrics are disabled" // Warn in ../../app.go
|
||||
|
@ -71,6 +72,9 @@ const (
|
|||
AddedStoragePeer = "added storage peer" // Info in ../../settings.go
|
||||
CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go
|
||||
CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go
|
||||
FailedToSumbitTaskToPool = "failed to submit task to pool" // Error in ../handler/browse.go
|
||||
FailedToHeadObject = "failed to head object" // Error in ../handler/browse.go
|
||||
FailedToIterateOverResponse = "failed to iterate over search response" // Error in ../handler/browse.go
|
||||
InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go
|
||||
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
|
||||
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
|
||||
|
|
|
@ -1,11 +1,20 @@
|
|||
{{$bucketName := .BucketName}}
|
||||
{{$container := .Container}}
|
||||
{{ $prefix := trimPrefix .Prefix }}
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8"/>
|
||||
<title>Index of s3://{{$bucketName}}/{{if $prefix}}/{{$prefix}}/{{end}}</title>
|
||||
<title>Index of {{.Protocol}}://{{$container}}
|
||||
/{{if $prefix}}/{{$prefix}}/{{end}}</title>
|
||||
<style>
|
||||
.alert {
|
||||
width: 80%;
|
||||
box-sizing: border-box;
|
||||
padding: 20px;
|
||||
background-color: #f44336;
|
||||
color: white;
|
||||
margin-bottom: 15px;
|
||||
}
|
||||
table {
|
||||
width: 80%;
|
||||
border-collapse: collapse;
|
||||
|
@ -23,15 +32,25 @@
|
|||
th {
|
||||
background-color: #c3bcbc;
|
||||
}
|
||||
|
||||
h1 {
|
||||
font-size: 1.5em;
|
||||
}
|
||||
tr:nth-child(even) {background-color: #ebe7e7;}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Index of s3://{{$bucketName}}/{{if $prefix}}{{$prefix}}/{{end}}</h1>
|
||||
<h1>Index of {{.Protocol}}://{{$container}}/{{if $prefix}}{{$prefix}}/{{end}}</h1>
|
||||
{{ if .HasErrors }}
|
||||
<div class="alert">
|
||||
Errors occurred while processing the request. Perhaps some objects are missing
|
||||
</div>
|
||||
{{ end }}
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Filename</th>
|
||||
<th>OID</th>
|
||||
<th>Size</th>
|
||||
<th>Created</th>
|
||||
<th>Download</th>
|
||||
|
@ -42,20 +61,22 @@
|
|||
{{if $trimmedPrefix }}
|
||||
<tr>
|
||||
<td>
|
||||
⮐<a href="/get/{{$bucketName}}{{ urlencode $trimmedPrefix "" }}">..</a>
|
||||
⮐<a href="/get/{{$container}}{{ urlencode $trimmedPrefix }}/">..</a>
|
||||
</td>
|
||||
<td></td>
|
||||
<td></td>
|
||||
<td></td>
|
||||
<td></td>
|
||||
</tr>
|
||||
{{else}}
|
||||
<tr>
|
||||
<td>
|
||||
⮐<a href="/get/{{ $bucketName }}/">..</a>
|
||||
⮐<a href="/get/{{$container}}/">..</a>
|
||||
</td>
|
||||
<td></td>
|
||||
<td></td>
|
||||
<td></td>
|
||||
<td></td>
|
||||
</tr>
|
||||
{{end}}
|
||||
{{range .Objects}}
|
||||
|
@ -63,21 +84,22 @@
|
|||
<td>
|
||||
{{if .IsDir}}
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why do we change condition from Why do we change condition from `if not .IsDir` to `if .Size`?
nzinkevich
commented
Oh, .IsDir would be better to mark empty files with '0B' size Oh, .IsDir would be better to mark empty files with '0B' size
|
||||
🗀
|
||||
<a href="/get/{{ $bucketName }}{{ urlencode $prefix .FileName }}/">
|
||||
<a href="{{.GetURL}}/">
|
||||
{{.FileName}}/
|
||||
</a>
|
||||
{{else}}
|
||||
🗎
|
||||
<a href="/get/{{ $bucketName }}{{ urlencode $prefix .FileName }}">
|
||||
<a href="{{ .GetURL }}">
|
||||
{{.FileName}}
|
||||
</a>
|
||||
{{end}}
|
||||
</td>
|
||||
<td>{{.OID}}</td>
|
||||
<td>{{if not .IsDir}}{{ formatSize .Size }}{{end}}</td>
|
||||
<td>{{if not .IsDir}}{{ formatTimestamp .Created }}{{end}}</td>
|
||||
<td>{{ .Created }}</td>
|
||||
<td>
|
||||
{{ if not .IsDir }}
|
||||
<a href="/get/{{ $bucketName}}{{ urlencode $prefix .FileName }}?download=true">
|
||||
{{ if .OID }}
|
||||
<a href="{{ .GetURL }}?download=true">
|
||||
Link
|
||||
</a>
|
||||
{{ end }}
|
||||
|
|
104
tree/tree.go
|
@ -30,6 +30,11 @@ type (
|
|||
Meta map[string]string
|
||||
}
|
||||
|
||||
multiSystemNode struct {
|
||||
// the first element is latest
|
||||
nodes []*treeNode
|
||||
}
|
||||
|
||||
GetNodesParams struct {
|
||||
CnrID cid.ID
|
||||
BktInfo *data.BucketInfo
|
||||
|
@ -50,18 +55,19 @@ var (
|
|||
)
|
||||
|
||||
const (
|
||||
FileNameKey = "FileName"
|
||||
)
|
||||
FileNameKey = "FileName"
|
||||
settingsFileName = "bucket-settings"
|
||||
|
||||
const (
|
||||
oidKV = "OID"
|
||||
oidKV = "OID"
|
||||
uploadIDKV = "UploadId"
|
||||
sizeKV = "Size"
|
||||
|
||||
// keys for delete marker nodes.
|
||||
isDeleteMarkerKV = "IsDeleteMarker"
|
||||
sizeKV = "Size"
|
||||
|
||||
// versionTree -- ID of a tree with object versions.
|
||||
versionTree = "version"
|
||||
systemTree = "system"
|
||||
|
||||
separator = "/"
|
||||
)
|
||||
|
@ -135,6 +141,45 @@ func newNodeVersionFromTreeNode(treeNode *treeNode) *api.NodeVersion {
|
|||
return version
|
||||
}
|
||||
|
||||
func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) {
|
||||
var (
|
||||
err error
|
||||
index int
|
||||
maxTimestamp uint64
|
||||
)
|
||||
|
||||
if len(nodes) == 0 {
|
||||
return nil, errors.New("multi node must have at least one node")
|
||||
}
|
||||
|
||||
treeNodes := make([]*treeNode, len(nodes))
|
||||
|
||||
for i, node := range nodes {
|
||||
if treeNodes[i], err = newTreeNode(node); err != nil {
|
||||
return nil, fmt.Errorf("parse system node response: %w", err)
|
||||
}
|
||||
|
||||
if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp {
|
||||
index = i
|
||||
maxTimestamp = timestamp
|
||||
}
|
||||
}
|
||||
|
||||
treeNodes[0], treeNodes[index] = treeNodes[index], treeNodes[0]
|
||||
|
||||
return &multiSystemNode{
|
||||
nodes: treeNodes,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *multiSystemNode) Latest() *treeNode {
|
||||
return m.nodes[0]
|
||||
}
|
||||
|
||||
func (m *multiSystemNode) Old() []*treeNode {
|
||||
return m.nodes[1:]
|
||||
}
|
||||
|
||||
func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*api.NodeVersion, error) {
|
||||
nodes, err := c.GetVersions(ctx, cnrID, objectName)
|
||||
if err != nil {
|
||||
|
@ -165,6 +210,55 @@ func (c *Tree) GetVersions(ctx context.Context, cnrID *cid.ID, objectName string
|
|||
return c.service.GetNodes(ctx, p)
|
||||
}
|
||||
|
||||
func (c *Tree) CheckSettingsNodeExist(ctx context.Context, bktInfo *data.BucketInfo) error {
|
||||
_, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) {
|
||||
p := &GetNodesParams{
|
||||
CnrID: bktInfo.CID,
|
||||
BktInfo: bktInfo,
|
||||
TreeID: systemTree,
|
||||
Path: []string{name},
|
||||
LatestOnly: false,
|
||||
AllAttrs: true,
|
||||
}
|
||||
nodes, err := c.service.GetNodes(ctx, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodes = filterMultipartNodes(nodes)
|
||||
|
||||
if len(nodes) == 0 {
|
||||
return nil, ErrNodeNotFound
|
||||
}
|
||||
|
||||
return newMultiNode(nodes)
|
||||
}
|
||||
|
||||
func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {
|
||||
res := make([]NodeResponse, 0, len(nodes))
|
||||
|
||||
LOOP:
|
||||
for _, node := range nodes {
|
||||
for _, meta := range node.GetMeta() {
|
||||
if meta.GetKey() == uploadIDKV {
|
||||
continue LOOP
|
||||
}
|
||||
}
|
||||
|
||||
res = append(res, node)
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
|
||||
var (
|
||||
maxCreationTime uint64
|
||||
|
|
It should be added to config files too I suppose