[#151] Index page for FrostFS containers #153

Merged
alexvanin merged 1 commit from nzinkevich/frostfs-http-gw:feature/index_page_native into master 2024-11-20 06:32:34 +00:00
16 changed files with 537 additions and 133 deletions

View file

@ -40,6 +40,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/panjf2000/ants/v2"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@ -89,6 +90,7 @@ type (
appSettings struct { appSettings struct {
reconnectInterval time.Duration reconnectInterval time.Duration
dialerSource *internalnet.DialerSource dialerSource *internalnet.DialerSource
workerPoolSize int
mu sync.RWMutex mu sync.RWMutex
defaultTimestamp bool defaultTimestamp bool
@ -184,6 +186,7 @@ func (a *app) initAppSettings() {
a.settings = &appSettings{ a.settings = &appSettings{
reconnectInterval: fetchReconnectInterval(a.cfg), reconnectInterval: fetchReconnectInterval(a.cfg),
dialerSource: getDialerSource(a.log, a.cfg), dialerSource: getDialerSource(a.log, a.cfg),
workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize),
} }
a.settings.update(a.cfg, a.log) a.settings.update(a.cfg, a.log)
} }
@ -490,7 +493,13 @@ func (a *app) setHealthStatus() {
} }
func (a *app) Serve() { func (a *app) Serve() {
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool))) workerPool := a.initWorkerPool()
defer func() {
workerPool.Release()
close(a.webDone)
}()
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool)
// Configure router. // Configure router.
a.configureRouter(handler) a.configureRouter(handler)
@ -532,8 +541,14 @@ LOOP:
a.metrics.Shutdown() a.metrics.Shutdown()
a.stopServices() a.stopServices()
a.shutdownTracing() a.shutdownTracing()
}
close(a.webDone) func (a *app) initWorkerPool() *ants.Pool {
workerPool, err := ants.NewPool(a.settings.workerPoolSize)
if err != nil {
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
}
return workerPool
} }
func (a *app) shutdownTracing() { func (a *app) shutdownTracing() {
@ -609,6 +624,7 @@ func (a *app) stopServices() {
svc.ShutDown(ctx) svc.ShutDown(ctx)
} }
} }
func (a *app) configureRouter(handler *handler.Handler) { func (a *app) configureRouter(handler *handler.Handler) {
r := router.New() r := router.New()
r.RedirectTrailingSlash = true r.RedirectTrailingSlash = true

View file

@ -71,6 +71,8 @@ const (
cfgIndexPageEnabled = "index_page.enabled" cfgIndexPageEnabled = "index_page.enabled"
cfgIndexPageTemplatePath = "index_page.template_path" cfgIndexPageTemplatePath = "index_page.template_path"
cfgWorkerPoolSize = "worker_pool_size"
// Web. // Web.
cfgWebReadBufferSize = "web.read_buffer_size" cfgWebReadBufferSize = "web.read_buffer_size"
cfgWebWriteBufferSize = "web.write_buffer_size" cfgWebWriteBufferSize = "web.write_buffer_size"
@ -228,9 +230,6 @@ func settings() *viper.Viper {
// pool: // pool:
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold) v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
v.SetDefault(cfgIndexPageEnabled, false)
v.SetDefault(cfgIndexPageTemplatePath, "")
// frostfs: // frostfs:
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut) v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
@ -242,6 +241,7 @@ func settings() *viper.Viper {
v.SetDefault(cfgWebStreamRequestBody, true) v.SetDefault(cfgWebStreamRequestBody, true)
v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize) v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)
v.SetDefault(cfgWorkerPoolSize, 1000)
// upload header // upload header
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false) v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)

View file

@ -150,3 +150,12 @@ HTTP_GW_MULTINET_FALLBACK_DELAY=300ms
# List of subnets and IP addresses to use as source for those subnets # List of subnets and IP addresses to use as source for those subnets
HTTP_GW_MULTINET_SUBNETS_1_MASK=1.2.3.4/24 HTTP_GW_MULTINET_SUBNETS_1_MASK=1.2.3.4/24
HTTP_GW_MULTINET_SUBNETS_1_SOURCE_IPS=1.2.3.4 1.2.3.5 HTTP_GW_MULTINET_SUBNETS_1_SOURCE_IPS=1.2.3.4 1.2.3.5
# Number of workers in handler's worker pool
HTTP_GW_WORKER_POOL_SIZE=1000
# Index page
# Enable index page support
HTTP_GW_INDEX_PAGE_ENABLED=false
# Index page template path
HTTP_GW_INDEX_PAGE_TEMPLATE_PATH=internal/handler/templates/index.gotmpl

View file

@ -113,6 +113,9 @@ request_timeout: 5s # Timeout to check node health during rebalance.
rebalance_timer: 30s # Interval to check nodes health. rebalance_timer: 30s # Interval to check nodes health.
pool_error_threshold: 100 # The number of errors on connection after which node is considered as unhealthy. pool_error_threshold: 100 # The number of errors on connection after which node is considered as unhealthy.
# Number of workers in handler's worker pool
worker_pool_size: 1000
# Enable index page to see objects list for specified container and prefix # Enable index page to see objects list for specified container and prefix
index_page: index_page:
enabled: false enabled: false

View file

@ -75,18 +75,21 @@ request_timeout: 5s
rebalance_timer: 30s rebalance_timer: 30s
pool_error_threshold: 100 pool_error_threshold: 100
reconnect_interval: 1m reconnect_interval: 1m
worker_pool_size: 1000

It should be added to config files too I suppose

It should be added to config files too I suppose
``` ```
| Parameter | Type | SIGHUP reload | Default value | Description | | Parameter | Type | SIGHUP reload | Default value | Description |
|------------------------|------------|---------------|---------------|-------------------------------------------------------------------------------------------------| |------------------------|------------|---------------|---------------|------------------------------------------------------------------------------------|
| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. | | `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. |
| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. | | `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. |
| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. | | `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. |
| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. | | `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. |
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. | | `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. | | `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. | | `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. | | `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
| `worker_pool_size` | `int` | no | `1000` | Maximum worker count in handler's worker pool. |
# `wallet` section # `wallet` section
@ -374,7 +377,12 @@ resolve_bucket:
# `index_page` section # `index_page` section
Parameters for index HTML-page output with S3-bucket or S3-subdir content for `Get object` request Parameters for index HTML-page output. Activates if `GetObject` request returns `not found`. Two
index page modes available:
* `s3` mode uses tree service for listing objects,
* `native` sends requests to nodes via native protocol.
If the request passes an S3-bucket name instead of a CID, `s3` mode will be used, otherwise `native`.
```yaml ```yaml
index_page: index_page:

1
go.mod
View file

@ -12,6 +12,7 @@ require (
github.com/docker/go-units v0.4.0 github.com/docker/go-units v0.4.0
github.com/fasthttp/router v1.4.1 github.com/fasthttp/router v1.4.1
github.com/nspcc-dev/neo-go v0.106.2 github.com/nspcc-dev/neo-go v0.106.2
github.com/panjf2000/ants/v2 v2.5.0
github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.5.0 github.com/prometheus/client_model v0.5.0
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5

2
go.sum
View file

@ -682,6 +682,8 @@ github.com/opencontainers/selinux v1.6.0/go.mod h1:VVGKuOLlE7v4PJyT6h7mNWvq1rzqi
github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3ogry1nUQF8Evvo= github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3ogry1nUQF8Evvo=
github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8= github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8=
github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI= github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI=
github.com/panjf2000/ants/v2 v2.5.0 h1:1rWGWSnxCsQBga+nQbA4/iY6VMeNoOIAM0ZWh9u3q2Q=
github.com/panjf2000/ants/v2 v2.5.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc=
github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU= github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU=

View file

@ -1,15 +1,21 @@
package handler package handler
import ( import (
"context"
"html/template" "html/template"
"net/url" "net/url"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/docker/go-units" "github.com/docker/go-units"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.uber.org/zap" "go.uber.org/zap"
@ -25,19 +31,68 @@ const (

I think this is basically unused and can be moved to a test file. In the app you defined the default pool size in settings.go

I think this is basically unused and can be moved to a test file. In the app you defined the default pool size in settings.go
type ( type (
BrowsePageData struct { BrowsePageData struct {
BucketName, HasErrors bool
Prefix string Container string
Objects []ResponseObject Prefix string
Protocol string
Objects []ResponseObject
} }
ResponseObject struct { ResponseObject struct {
OID string OID string
Created string Created string
FileName string FileName string
FilePath string
Size string Size string
IsDir bool IsDir bool
GetURL string
} }
) )
func newListObjectsResponseS3(attrs map[string]string) ResponseObject {
return ResponseObject{
Created: formatTimestamp(attrs[attrCreated]),
OID: attrs[attrOID],
FileName: attrs[attrFileName],
Size: attrs[attrSize],
IsDir: attrs[attrOID] == "",
}
}
func newListObjectsResponseNative(attrs map[string]string) ResponseObject {
dkirillov marked this conversation as resolved Outdated

Maybe it's better to save to fields value in appropriate format and don't invoke any functions from template?

Maybe it's better to save to fields value in appropriate format and don't invoke any functions from template?

Will we change this? It still invokes formatTimestamp from template

Will we change this? It still invokes `formatTimestamp` from template
filename := lastPathElement(attrs[object.AttributeFilePath])
if filename == "" {
filename = attrs[attrFileName]
}
return ResponseObject{
OID: attrs[attrOID],
Created: formatTimestamp(attrs[object.AttributeTimestamp] + "000"),
FileName: filename,
FilePath: attrs[object.AttributeFilePath],
Size: attrs[attrSize],
IsDir: false,
}
}
func getNextDir(filepath, prefix string) string {
restPath := strings.Replace(filepath, prefix, "", 1)
index := strings.Index(restPath, "/")
if index == -1 {
return ""
}
return restPath[:index]
}
func lastPathElement(path string) string {
if path == "" {
return path
}
index := strings.LastIndex(path, "/")
if index == len(path)-1 {
index = strings.LastIndex(path[:index], "/")
}
return path[index+1:]
}
func parseTimestamp(tstamp string) (time.Time, error) { func parseTimestamp(tstamp string) (time.Time, error) {
millis, err := strconv.ParseInt(tstamp, 10, 64) millis, err := strconv.ParseInt(tstamp, 10, 64)
if err != nil { if err != nil {
@ -47,16 +102,6 @@ func parseTimestamp(tstamp string) (time.Time, error) {
return time.UnixMilli(millis), nil return time.UnixMilli(millis), nil
} }
func NewResponseObject(nodes map[string]string) ResponseObject {
return ResponseObject{
OID: nodes[attrOID],
Created: nodes[attrCreated],
FileName: nodes[attrFileName],
Size: nodes[attrSize],
IsDir: nodes[attrOID] == "",
}
}
func formatTimestamp(strdate string) string { func formatTimestamp(strdate string) string {
date, err := parseTimestamp(strdate) date, err := parseTimestamp(strdate)
if err != nil || date.IsZero() { if err != nil || date.IsZero() {
@ -94,12 +139,9 @@ func trimPrefix(encPrefix string) string {
return prefix[:slashIndex] return prefix[:slashIndex]
} }
func urlencode(prefix, filename string) string { func urlencode(path string) string {
var res strings.Builder var res strings.Builder
path := filename
if prefix != "" {
path = strings.Join([]string{prefix, filename}, "/")
}
prefixParts := strings.Split(path, "/") prefixParts := strings.Split(path, "/")
for _, prefixPart := range prefixParts { for _, prefixPart := range prefixParts {
prefixPart = "/" + url.PathEscape(prefixPart) prefixPart = "/" + url.PathEscape(prefixPart)
@ -112,46 +154,220 @@ func urlencode(prefix, filename string) string {
return res.String() return res.String()
} }
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, bucketInfo *data.BucketInfo, prefix string) { type GetObjectsResponse struct {
objects []ResponseObject
hasErrors bool
}
func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) {
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
if err != nil {
return nil, err
}
result := &GetObjectsResponse{

Please don't use var when we initialize struct:

result:= &GetObjectReponse{
    objects:   make([]ResponseObject, 0, len(nodes)),
    hasErrors: false,
}

Additionally, we can skip setting hasErrors: false

Please don't use `var` when we initialize struct: ```golang result:= &GetObjectReponse{ objects: make([]ResponseObject, 0, len(nodes)), hasErrors: false, } ``` Additionally, we can skip setting `hasErrors: false`
objects: make([]ResponseObject, 0, len(nodes)),
}
for _, node := range nodes {
meta := node.GetMeta()
if meta == nil {
continue
}
dkirillov marked this conversation as resolved Outdated

Can we search all object if prefix is empty to be able to handle objects that have FileName rather than FilePath?

Can we search all object if prefix is empty to be able to handle objects that have `FileName` rather than `FilePath`?

Fixed

Fixed

I meant write something like this:

filters := object.NewSearchFilters()
filters.AddRootFilter()

if prefix != "" {
    filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix)
}
I meant write something like this: ```golang filters := object.NewSearchFilters() filters.AddRootFilter() if prefix != "" { filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix) } ```
var attrs = make(map[string]string, len(meta))
for _, m := range meta {
attrs[m.GetKey()] = string(m.GetValue())
}
obj := newListObjectsResponseS3(attrs)
obj.FilePath = prefix + obj.FileName
obj.GetURL = "/get/" + bucketInfo.Name + urlencode(obj.FilePath)
result.objects = append(result.objects, obj)
}
return result, nil
}
func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) {
var basePath string
if ind := strings.LastIndex(prefix, "/"); ind != -1 {
basePath = prefix[:ind+1]
dkirillov marked this conversation as resolved Outdated

Please use := syntax where possible

Please use `:=` syntax where possible
}
dkirillov marked this conversation as resolved Outdated

Matter of taste but

	var basePath string
	if ind := strings.LastIndex(prefix, "/"); ind != -1 {
		basePath = prefix[:ind+1]
	}
Matter of taste but ```golang var basePath string if ind := strings.LastIndex(prefix, "/"); ind != -1 { basePath = prefix[:ind+1] } ```
filters := object.NewSearchFilters()
filters.AddRootFilter()
if prefix != "" {
filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix)
}
prm := PrmObjectSearch{
dkirillov marked this conversation as resolved Outdated

It's better to create such channel somewhere in headDirObjects and close it there when no more objects be available

It's better to create such channel somewhere in `headDirObjects` and close it there when no more objects be available
PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx),
},
Container: bucketInfo.CID,
Filters: filters,
}
objectIDs, err := h.frostfs.SearchObjects(ctx, prm)
if err != nil {
return nil, err
}
defer objectIDs.Close()
resp, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
if err != nil {
return nil, err
}
log := utils.GetReqLogOrDefault(ctx, h.log)
dirs := make(map[string]struct{})
dkirillov marked this conversation as resolved Outdated

What do we want to do on such error?

  • Return error to client?
  • Ignore and list other?
  • Stop further heading and return client current objects?

cc @alexvanin

What do we want to do on such error? * Return error to client? * Ignore and list other? * Stop further heading and return client current objects? cc @alexvanin
result := &GetObjectsResponse{
objects: make([]ResponseObject, 0, 100),
}
for objExt := range resp {
if objExt.Error != nil {
dkirillov marked this conversation as resolved Outdated

We cannot use addr from other goroutine.

We cannot use `addr` from other goroutine.
log.Error(logs.FailedToHeadObject, zap.Error(objExt.Error))
result.hasErrors = true
continue
}
if objExt.Object.IsDir {
if _, ok := dirs[objExt.Object.FileName]; ok {
continue
}
objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + urlencode(objExt.Object.FilePath)
dirs[objExt.Object.FileName] = struct{}{}
} else {
objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + "/" + objExt.Object.OID
}
result.objects = append(result.objects, objExt.Object)
}
return result, nil
}
dkirillov marked this conversation as resolved Outdated

Why don't we use channel for this?

Why don't we use channel for this?
dkirillov marked this conversation as resolved Outdated

Let's write

func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (<-chan ResponseObjectExtended, error) {
Let's write ```golang func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (<-chan ResponseObjectExtended, error) { ```
type ResponseObjectExtended struct {
Object ResponseObject
alexvanin marked this conversation as resolved Outdated

As far as I understand, every simultaneous browse request will produce a new pool with up to 1000 goroutines by default. That might increase goroutine usage very quickly under load.

I suggest to use one pool of workers in the handler.

As far as I understand, every simultaneous browse request will produce new pool with up to 1000 routines by default. That might increase goroutine usage very very fast under the load. I suggest to use one pool of workers in the handler.
Error error
}
func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (<-chan ResponseObjectExtended, error) {
res := make(chan ResponseObjectExtended)
go func() {
defer close(res)
dkirillov marked this conversation as resolved Outdated

Please, write:

var wg sync.WaitGroup
var dirs sync.Map

Or

var (
    wg sync.WaitGroup
    dirs sync.Map
)
Please, write: ```golang var wg sync.WaitGroup var dirs sync.Map ``` Or ```golang var ( wg sync.WaitGroup dirs sync.Map ) ```
log := utils.GetReqLogOrDefault(ctx, h.log).With(
zap.String("cid", cnrID.EncodeToString()),
dkirillov marked this conversation as resolved Outdated

Pool size should be configurable. And default should be at least 100 (maybe 1000)

Pool size should be configurable. And default should be at least 100 (maybe 1000)
zap.String("path", basePath),
)
dkirillov marked this conversation as resolved Outdated

If I understand correctly, we stop iteration if any error is occurred. So here we should return true if context is canceled

If I understand correctly, we stop iteration if any error is occurred. So here we should `return true` if context is canceled
var wg sync.WaitGroup
err := objectIDs.Iterate(func(id oid.ID) bool {
dkirillov marked this conversation as resolved Outdated

It's better to restrict number of goroutines. See for example s3-gw listing

It's better to restrict number of goroutines. See for example s3-gw listing https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/commit/e35b582fe25a97c018367ef9244dfe93d4500ad9/api/layer/listing.go#L530
wg.Add(1)
err := h.workerPool.Submit(func() {
defer wg.Done()
var obj ResponseObjectExtended
obj.Object, obj.Error = h.headDirObject(ctx, cnrID, id, basePath)
res <- obj
})
dkirillov marked this conversation as resolved Outdated

Here we also should add wg.Done

Here we also should add `wg.Done`
if err != nil {
wg.Done()
log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err))
}
select {
case <-ctx.Done():
return true
default:
return false
}
})
if err != nil {
log.Error(logs.FailedToIterateOverResponse, zap.Error(err))
dkirillov marked this conversation as resolved Outdated

These 4 lines seem confusing to me.
Can we rewrite this code to make it more straightforward?
For example:

diff --git a/internal/handler/browse.go b/internal/handler/browse.go
index a2373d1..d3f84ae 100644
--- a/internal/handler/browse.go
+++ b/internal/handler/browse.go
@@ -12,7 +12,6 @@ import (
 	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
-	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@@ -202,116 +201,118 @@ func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.Buck
 	}
 	defer objectIDs.Close()
 
-	return h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
-}
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 
-type workerParams struct {
-	cnrID     cid.ID
-	objectIDs ResObjectSearch
-	basePath  string
-	errCh     chan error
-	objCh     chan ResponseObject
-	cancel    context.CancelFunc
-}
+	resp, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
+	if err != nil {
+		return nil, err
+	}
 
-func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) ([]ResponseObject, error) {
-	const initialSliceCapacity = 100
+	log := utils.GetReqLogOrDefault(ctx, h.log)
 
-	var wg sync.WaitGroup
-	var dirs sync.Map
-	log := h.log.With(
-		zap.String("cid", cnrID.EncodeToString()),
-		zap.String("path", basePath),
-	)
-	done := make(chan struct{})
-	objects := make([]ResponseObject, 0, initialSliceCapacity)
-	ctx, cancel := context.WithCancel(ctx)
-	p := workerParams{
-		cnrID:     cnrID,
-		objectIDs: objectIDs,
-		basePath:  basePath,
-		errCh:     make(chan error, 1),
-		objCh:     make(chan ResponseObject, 1),
-		cancel:    cancel,
-	}
-	defer cancel()
+	dirs := make(map[string]struct{})
+	objects := make([]ResponseObject, 0, 100)
+	for objExt := range resp {
+		if objExt.Error != nil {
+			log.Error("error", zap.Error(objExt.Error))
+			cancel()
+			continue
+		}
 
-	go func() {
-		for err := range p.errCh {
-			if err != nil {
-				log.Error(logs.FailedToHeadObject, zap.Error(err))
+		if objExt.Object.IsDir {
+			if _, ok := dirs[objExt.Object.FileName]; ok {
+				continue
 			}
+			dirs[objExt.Object.FileName] = struct{}{}
 		}
-		done <- struct{}{}
-	}()
 
-	go func() {
-		for obj := range p.objCh {
-			objects = append(objects, obj)
-		}
-		done <- struct{}{}
-	}()
+		objects = append(objects, objExt.Object)
+	}
 
+	return objects, nil
+}
+
+type ResponseObjectExtended struct {
+	Object ResponseObject
+	Error  error
+}
+
+func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (chan ResponseObjectExtended, error) {
 	pool, err := ants.NewPool(runtime.NumCPU())
 	if err != nil {
 		return nil, err
 	}
-	defer pool.Release()
-	err = objectIDs.Iterate(func(id oid.ID) bool {
-		wg.Add(1)
-		if err = pool.Submit(func() {
-			defer wg.Done()
-			h.headDirObject(ctx, id, &dirs, p)
-		}); err != nil {
-			p.errCh <- err
-		}
-		select {
-		case <-ctx.Done():
-			return true
-		default:
-			return false
+
+	res := make(chan ResponseObjectExtended)
+
+	go func() {
+		defer func() {
+			pool.Release()
+			close(res)
+		}()
+
+		log := utils.GetReqLogOrDefault(ctx, h.log).With(
+			zap.String("cid", cnrID.EncodeToString()),
+			zap.String("path", basePath),
+		)
+
+		var wg sync.WaitGroup
+		err = objectIDs.Iterate(func(id oid.ID) bool {
+			wg.Add(1)
+			err = pool.Submit(func() {
+				defer wg.Done()
+
+				var obj ResponseObjectExtended
+				obj.Object, obj.Error = h.headDirObject(ctx, cnrID, id, basePath)
+				res <- obj
+			})
+			if err != nil {
+				wg.Done()
+				log.Warn("FailedToSubmitTaskToPool", zap.Error(err))
+			}
+
+			select {
+			case <-ctx.Done():
+				return true
+			default:
+				return false
+			}
+		})
+		if err != nil {
+			log.Error("iterate", zap.Error(err))
 		}
-	})
-	wg.Wait()
-	close(p.errCh)
-	close(p.objCh)
-	<-done
-	<-done
 
-	if err != nil {
-		return nil, err
-	}
+		wg.Wait()
+	}()
 
-	return objects, nil
+	return res, nil
 }
 
-func (h *Handler) headDirObject(ctx context.Context, objID oid.ID, dirs *sync.Map, p workerParams) {
-	addr := newAddress(p.cnrID, objID)
+func (h *Handler) headDirObject(ctx context.Context, cnrID cid.ID, objID oid.ID, basePath string) (ResponseObject, error) {
+	addr := newAddress(cnrID, objID)
 	obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{
 		PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)},
 		Address: addr,
 	})
 	if err != nil {
-		p.errCh <- err
-		p.cancel()
-		return
+		return ResponseObject{}, err
 	}
 
 	attrs := loadAttributes(obj.Attributes())
 	attrs[attrOID] = objID.EncodeToString()
 	attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10)
 
-	dirname := getNextDir(attrs[object.AttributeFilePath], p.basePath)
+	dirname := getNextDir(attrs[object.AttributeFilePath], basePath)
 	if dirname == "" {
-		p.objCh <- newListObjectsResponseNative(attrs)
-	} else if _, ok := dirs.Load(dirname); !ok {
-		p.objCh <- ResponseObject{
-			FileName: dirname,
-			FilePath: p.basePath + dirname,
-			IsDir:    true,
-		}
-		dirs.Store(dirname, true)
+		return newListObjectsResponseNative(attrs), nil
 	}
+
+	return ResponseObject{
+		FileName: dirname,
+		FilePath: basePath + dirname,
+		IsDir:    true,
+	}, nil
 }
 
 type browseParams struct {

This 4 lines seems confusing to me. Can we rewrite this code to make it more strightforward? For example: ```diff diff --git a/internal/handler/browse.go b/internal/handler/browse.go index a2373d1..d3f84ae 100644 --- a/internal/handler/browse.go +++ b/internal/handler/browse.go @@ -12,7 +12,6 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -202,116 +201,118 @@ func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.Buck } defer objectIDs.Close() - return h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath) -} + ctx, cancel := context.WithCancel(ctx) + defer cancel() -type workerParams struct { - cnrID cid.ID - objectIDs ResObjectSearch - basePath string - errCh chan error - objCh chan ResponseObject - cancel context.CancelFunc -} + resp, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath) + if err != nil { + return nil, err + } -func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) ([]ResponseObject, error) { - const initialSliceCapacity = 100 + log := utils.GetReqLogOrDefault(ctx, h.log) - var wg sync.WaitGroup - var dirs sync.Map - log := h.log.With( - zap.String("cid", cnrID.EncodeToString()), - zap.String("path", basePath), - ) - done := make(chan struct{}) - objects := make([]ResponseObject, 0, initialSliceCapacity) - ctx, cancel := context.WithCancel(ctx) - p := workerParams{ - cnrID: cnrID, - objectIDs: objectIDs, - basePath: basePath, - errCh: make(chan error, 1), - objCh: make(chan ResponseObject, 1), - cancel: cancel, - } - defer cancel() + dirs := make(map[string]struct{}) + objects := make([]ResponseObject, 0, 100) + for objExt := range resp { + if objExt.Error != nil { + log.Error("error", 
zap.Error(objExt.Error)) + cancel() + continue + } - go func() { - for err := range p.errCh { - if err != nil { - log.Error(logs.FailedToHeadObject, zap.Error(err)) + if objExt.Object.IsDir { + if _, ok := dirs[objExt.Object.FileName]; ok { + continue } + dirs[objExt.Object.FileName] = struct{}{} } - done <- struct{}{} - }() - go func() { - for obj := range p.objCh { - objects = append(objects, obj) - } - done <- struct{}{} - }() + objects = append(objects, objExt.Object) + } + return objects, nil +} + +type ResponseObjectExtended struct { + Object ResponseObject + Error error +} + +func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (chan ResponseObjectExtended, error) { pool, err := ants.NewPool(runtime.NumCPU()) if err != nil { return nil, err } - defer pool.Release() - err = objectIDs.Iterate(func(id oid.ID) bool { - wg.Add(1) - if err = pool.Submit(func() { - defer wg.Done() - h.headDirObject(ctx, id, &dirs, p) - }); err != nil { - p.errCh <- err - } - select { - case <-ctx.Done(): - return true - default: - return false + + res := make(chan ResponseObjectExtended) + + go func() { + defer func() { + pool.Release() + close(res) + }() + + log := utils.GetReqLogOrDefault(ctx, h.log).With( + zap.String("cid", cnrID.EncodeToString()), + zap.String("path", basePath), + ) + + var wg sync.WaitGroup + err = objectIDs.Iterate(func(id oid.ID) bool { + wg.Add(1) + err = pool.Submit(func() { + defer wg.Done() + + var obj ResponseObjectExtended + obj.Object, obj.Error = h.headDirObject(ctx, cnrID, id, basePath) + res <- obj + }) + if err != nil { + wg.Done() + log.Warn("FailedToSubmitTaskToPool", zap.Error(err)) + } + + select { + case <-ctx.Done(): + return true + default: + return false + } + }) + if err != nil { + log.Error("iterate", zap.Error(err)) } - }) - wg.Wait() - close(p.errCh) - close(p.objCh) - <-done - <-done - if err != nil { - return nil, err - } + wg.Wait() + }() - return objects, nil + return res, 
nil } -func (h *Handler) headDirObject(ctx context.Context, objID oid.ID, dirs *sync.Map, p workerParams) { - addr := newAddress(p.cnrID, objID) +func (h *Handler) headDirObject(ctx context.Context, cnrID cid.ID, objID oid.ID, basePath string) (ResponseObject, error) { + addr := newAddress(cnrID, objID) obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{ PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)}, Address: addr, }) if err != nil { - p.errCh <- err - p.cancel() - return + return ResponseObject{}, err } attrs := loadAttributes(obj.Attributes()) attrs[attrOID] = objID.EncodeToString() attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10) - dirname := getNextDir(attrs[object.AttributeFilePath], p.basePath) + dirname := getNextDir(attrs[object.AttributeFilePath], basePath) if dirname == "" { - p.objCh <- newListObjectsResponseNative(attrs) - } else if _, ok := dirs.Load(dirname); !ok { - p.objCh <- ResponseObject{ - FileName: dirname, - FilePath: p.basePath + dirname, - IsDir: true, - } - dirs.Store(dirname, true) + return newListObjectsResponseNative(attrs), nil } + + return ResponseObject{ + FileName: dirname, + FilePath: basePath + dirname, + IsDir: true, + }, nil } type browseParams struct { ```
}
wg.Wait()
}()
return res, nil
}
// headDirObject performs a HEAD request for objID inside cnrID and converts
// the result into a ResponseObject. If the object's FilePath attribute points
// into a subdirectory relative to basePath, a synthetic directory entry is
// returned instead of the object itself.
func (h *Handler) headDirObject(ctx context.Context, cnrID cid.ID, objID oid.ID, basePath string) (ResponseObject, error) {
	obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{
		PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)},
		Address: newAddress(cnrID, objID),
	})
	if err != nil {
		return ResponseObject{}, err
	}

	attrs := loadAttributes(obj.Attributes())
	attrs[attrOID] = objID.EncodeToString()

	// Multipart objects carry their full logical size in a dedicated
	// attribute; fall back to the raw payload size otherwise.
	size, ok := attrs[attributeMultipartObjectSize]
	if !ok {
		size = strconv.FormatUint(obj.PayloadSize(), 10)
	}
	attrs[attrSize] = size

	if dirname := getNextDir(attrs[object.AttributeFilePath], basePath); dirname != "" {
		return ResponseObject{
			FileName: dirname,
			FilePath: basePath + dirname,
			IsDir:    true,
		}, nil
	}

	return newListObjectsResponseNative(attrs), nil
}
// browseParams bundles the inputs needed to render an index (browse) page
// for a container/bucket listing.
type browseParams struct {
	bucketInfo  *data.BucketInfo // resolved container info (name and CID)
	prefix      string           // directory prefix currently being listed
	isNative    bool             // true when listing via native FrostFS search instead of the S3 tree
	listObjects func(ctx context.Context, bucketName *data.BucketInfo, prefix string) (*GetObjectsResponse, error) // listing strategy (tree-based or native)
}
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
const S3Protocol = "s3"
const FrostfsProtocol = "frostfs"
ctx := utils.GetContextFromRequest(c) ctx := utils.GetContextFromRequest(c)
reqLog := utils.GetReqLogOrDefault(ctx, h.log) reqLog := utils.GetReqLogOrDefault(ctx, h.log)
log := reqLog.With(zap.String("bucket", bucketInfo.Name)) log := reqLog.With(
zap.String("bucket", p.bucketInfo.Name),
nodes, err := h.listObjects(ctx, bucketInfo, prefix) zap.String("container", p.bucketInfo.CID.EncodeToString()),
zap.String("prefix", p.prefix),
)
resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) logAndSendBucketError(c, log, err)
return return
} }
respObjects := make([]ResponseObject, len(nodes)) objects := resp.objects
sort.Slice(objects, func(i, j int) bool {
for i, node := range nodes { if objects[i].IsDir == objects[j].IsDir {
respObjects[i] = NewResponseObject(node) return objects[i].FileName < objects[j].FileName
}
sort.Slice(respObjects, func(i, j int) bool {
if respObjects[i].IsDir == respObjects[j].IsDir {
return respObjects[i].FileName < respObjects[j].FileName
} }
return respObjects[i].IsDir return objects[i].IsDir
}) })
indexTemplate := h.config.IndexPageTemplate()
tmpl, err := template.New("index").Funcs(template.FuncMap{ tmpl, err := template.New("index").Funcs(template.FuncMap{
"formatTimestamp": formatTimestamp, "formatSize": formatSize,
"formatSize": formatSize, "trimPrefix": trimPrefix,
"trimPrefix": trimPrefix, "urlencode": urlencode,
"urlencode": urlencode, "parentDir": parentDir,
"parentDir": parentDir, }).Parse(h.config.IndexPageTemplate())
}).Parse(indexTemplate)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) logAndSendBucketError(c, log, err)
return return
} }
bucketName := p.bucketInfo.Name
protocol := S3Protocol
if p.isNative {
bucketName = p.bucketInfo.CID.EncodeToString()
protocol = FrostfsProtocol
}
if err = tmpl.Execute(c, &BrowsePageData{ if err = tmpl.Execute(c, &BrowsePageData{
BucketName: bucketInfo.Name, Container: bucketName,
Prefix: prefix, Prefix: p.prefix,
Objects: respObjects, Objects: objects,
Protocol: protocol,
HasErrors: resp.hasErrors,
}); err != nil { }); err != nil {
logAndSendBucketError(c, log, err) logAndSendBucketError(c, log, err)
return return

View file

@ -23,13 +23,16 @@ import (
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format. // DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) { func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
test, _ := c.UserValue("oid").(string) oidURLParam := c.UserValue("oid").(string)
dkirillov marked this conversation as resolved Outdated

Why did we change oid to cid?

Why did we change `oid` to `cid`?

Oh. It seems I got it. Then we should rename h.byObjectName and h.byAddress so that they reflect reality.
And/or be able to handle urls like get/<bucket-name>/<oid>

Oh. It seems I got it. Then we should rename `h.byObjectName` and `h.byAddress` so that they reflect reality. And/or be able to handle urls like `get/<bucket-name>/<oid>`
var id oid.ID downloadQueryParam := c.QueryArgs().GetBool("download")
dkirillov marked this conversation as resolved Outdated

We usually use cnrID name. So let's write:

cnrIDStr, _ := c.UserValue("cid").(string)
var cnrID cid.ID
We usually use `cnrID` name. So let's write: ```golang cnrIDStr, _ := c.UserValue("cid").(string) var cnrID cid.ID ```
err := id.DecodeString(test)

What's the difference between checking oid before and cid now?

What's the difference between checking `oid` before and `cid` now?
if err != nil { switch {
h.byObjectName(c, h.receiveFile) case isObjectID(oidURLParam):

Why don't we check cid also?

Why don't we check `cid` also?

There may be also container identifier from NNS, and we want to resolve them too. So I check only oid, as it was before

There may be also container identifier from NNS, and we want to resolve them too. So I check only `oid`, as it was before

Then if we create object 5rnoerMhK3kSxDEUh2FrSQvSW86P2deahSifFLxFBF9v into bucket my-bucket and I wanted to get it by http-gw like curl http://<http-gw>/get/my-bucket/5rnoerMhK3kSxDEUh2FrSQvSW86P2deahSifFLxFBF9v I will get a 404. Is that ok?

Then if we create object `5rnoerMhK3kSxDEUh2FrSQvSW86P2deahSifFLxFBF9v` into bucket `my-bucket` and I wanted to get it by http-gw like `curl http://<http-gw>/get/my-bucket/5rnoerMhK3kSxDEUh2FrSQvSW86P2deahSifFLxFBF9v` I will get a `404`. Is that ok?
} else { h.byNativeAddress(c, h.receiveFile)
h.byAddress(c, h.receiveFile) case !isContainerRoot(oidURLParam) && (downloadQueryParam || !isDir(oidURLParam)):
h.byS3Path(c, h.receiveFile)
default:
h.browseIndex(c)
} }
} }
@ -45,7 +48,7 @@ func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
h.byAttribute(c, h.receiveFile) h.byAttribute(c, h.receiveFile)
} }
func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) { func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
filters := object.NewSearchFilters() filters := object.NewSearchFilters()
filters.AddRootFilter() filters.AddRootFilter()
filters.AddFilter(key, val, op) filters.AddFilter(key, val, op)
@ -54,7 +57,7 @@ func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op
PrmAuth: PrmAuth{ PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx), BearerToken: bearerToken(ctx),
}, },
Container: *cnrID, Container: cnrID,
Filters: filters, Filters: filters,
} }
@ -102,7 +105,7 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
return return
} }
resSearch, err := h.search(ctx, &bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) resSearch, err := h.search(ctx, bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
if err != nil { if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)

View file

@ -22,6 +22,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/panjf2000/ants/v2"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -165,6 +166,7 @@ type Handler struct {
containerResolver ContainerResolver containerResolver ContainerResolver
tree *tree.Tree tree *tree.Tree
cache *cache.BucketCache cache *cache.BucketCache
workerPool *ants.Pool
} }
type AppParams struct { type AppParams struct {
@ -175,7 +177,7 @@ type AppParams struct {
Cache *cache.BucketCache Cache *cache.BucketCache
} }
func New(params *AppParams, config Config, tree *tree.Tree) *Handler { func New(params *AppParams, config Config, tree *tree.Tree, workerPool *ants.Pool) *Handler {
return &Handler{ return &Handler{
log: params.Logger, log: params.Logger,
frostfs: params.FrostFS, frostfs: params.FrostFS,
@ -184,14 +186,15 @@ func New(params *AppParams, config Config, tree *tree.Tree) *Handler {
containerResolver: params.Resolver, containerResolver: params.Resolver,
tree: tree, tree: tree,
cache: params.Cache, cache: params.Cache,
workerPool: workerPool,
} }
} }
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that // byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it. // prepares request and object address to it.
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { func (h *Handler) byNativeAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
alexvanin marked this conversation as resolved Outdated

After internal demo, let's modify this code a bit.

We want to determine which browse function to use based on tree data.
If tree data is present, then use h.getDirObjectsS3,
If tree data is absent, then use h.getDirObjectsNative.

After internal demo, let's modify this code a bit. We want to determine which browse function to use based on tree data. If tree data is present, then use `h.getDirObjectsS3`, If tree data is absent, then use `h.getDirObjectsNative`.
idCnr, _ := c.UserValue("cid").(string) idCnr, _ := c.UserValue("cid").(string)
dkirillov marked this conversation as resolved Outdated

Please, write:

		idCnr, _ := c.UserValue("cid").(string)
		idObj, _ := url.PathUnescape(c.UserValue("oid").(string))
		log      := h.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
Please, write: ```golang idCnr, _ := c.UserValue("cid").(string) idObj, _ := url.PathUnescape(c.UserValue("oid").(string)) log := h.log.With(zap.String("cid", idCnr), zap.String("oid", idObj)) ```
idObj, _ := c.UserValue("oid").(string) idObj, _ := url.PathUnescape(c.UserValue("oid").(string))
ctx := utils.GetContextFromRequest(c) ctx := utils.GetContextFromRequest(c)
reqLog := utils.GetReqLogOrDefault(ctx, h.log) reqLog := utils.GetReqLogOrDefault(ctx, h.log)
@ -215,12 +218,11 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
f(ctx, *h.newRequest(c, log), addr) f(ctx, *h.newRequest(c, log), addr)
} }
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that // byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
dkirillov marked this conversation as resolved Outdated

We do the same GetObject request in f function. It's better to use HeadObject.

By the way, do we really need this? If oid is actual object id why do we list objects?

We do the same `GetObject` request in `f` function. It's better to use `HeadObject`. By the way, do we really need this? If `oid` is actual object id why do we list objects?

We only checked before that idObjstring is decodable to oid type, but I think we should also output index page if no object found. That's why I try to GetObject (Head is better, I agree)

We only checked before that `idObj`string is decodable to oid type, but I think we should also output index page if no object found. That's why I try to `GetObject` (Head is better, I agree)

I think we should also output index page if no object found

Could you explain why?

> I think we should also output index page if no object found Could you explain why?

Well, I thought the S3 index page worked similarly, but I forgot that I removed this feature in the final implementation. Apache also does not return an index page if page not found. So I agree, it shouldn't return index page in that case

Well, I thought the S3 index page worked similarly, but I forgot that I removed this feature in the final implementation. Apache also does not return an index page if page not found. So I agree, it shouldn't return index page in that case
// prepares request and object address to it. // resolves object address from S3-like path <bucket name>/<object key>.
func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { func (h *Handler) byS3Path(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
bucketname := c.UserValue("cid").(string) bucketname := c.UserValue("cid").(string)
key := c.UserValue("oid").(string) key := c.UserValue("oid").(string)
download := c.QueryArgs().GetBool("download")
ctx := utils.GetContextFromRequest(c) ctx := utils.GetContextFromRequest(c)
reqLog := utils.GetReqLogOrDefault(ctx, h.log) reqLog := utils.GetReqLogOrDefault(ctx, h.log)
@ -239,15 +241,6 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r
} }
foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, unescapedKey) foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, unescapedKey)
if h.config.IndexPageEnabled() && !download && string(c.Method()) != fasthttp.MethodHead {
if isDir(unescapedKey) || isContainerRoot(unescapedKey) {
if code := checkErrorType(err); code == fasthttp.StatusNotFound || code == fasthttp.StatusOK {
c.SetStatusCode(code)
h.browseObjects(c, bktInfo, unescapedKey)
return
}
}
}
if err != nil { if err != nil {
if errors.Is(err, tree.ErrNodeAccessDenied) { if errors.Is(err, tree.ErrNodeAccessDenied) {
response.Error(c, "Access Denied", fasthttp.StatusForbidden) response.Error(c, "Access Denied", fasthttp.StatusForbidden)
@ -267,7 +260,7 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r
f(ctx, *h.newRequest(c, log), addr) f(ctx, *h.newRequest(c, log), addr)
} }
// byAttribute is a wrapper similar to byAddress. // byAttribute is a wrapper similar to byNativeAddress.
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
scid, _ := c.UserValue("cid").(string) scid, _ := c.UserValue("cid").(string)
key, _ := c.UserValue("attr_key").(string) key, _ := c.UserValue("attr_key").(string)
@ -298,7 +291,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
return return
} }
res, err := h.search(ctx, &bktInfo.CID, key, val, object.MatchStringEqual) res, err := h.search(ctx, bktInfo.CID, key, val, object.MatchStringEqual)
if err != nil { if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
@ -395,24 +388,50 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
return bktInfo, err return bktInfo, err
} }
func (h *Handler) listObjects(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]map[string]string, error) { func (h *Handler) browseIndex(c *fasthttp.RequestCtx) {
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true) if !h.config.IndexPageEnabled() {
c.SetStatusCode(fasthttp.StatusNotFound)
return
}
cidURLParam := c.UserValue("cid").(string)
oidURLParam := c.UserValue("oid").(string)
ctx := utils.GetContextFromRequest(c)
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
log := reqLog.With(zap.String("cid", cidURLParam), zap.String("oid", oidURLParam))
unescapedKey, err := url.QueryUnescape(oidURLParam)
if err != nil { if err != nil {
return nil, err logAndSendBucketError(c, log, err)
return
} }
var objects = make([]map[string]string, 0, len(nodes)) bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log)
for _, node := range nodes { if err != nil {
meta := node.GetMeta() logAndSendBucketError(c, log, err)
if meta == nil { return
continue
}
var obj = make(map[string]string, len(meta))
for _, m := range meta {
obj[m.GetKey()] = string(m.GetValue())
}
objects = append(objects, obj)
} }
return objects, nil listFunc := h.getDirObjectsS3
dkirillov marked this conversation as resolved Outdated

Why do we need this if we have the same on line 433 ?

Why do we need this if we have the same on line 433 ?

And why we set status here rather than in h.browseObjects?

And why we set status here rather than in `h.browseObjects`?

By the way, why do we ever need to set success status?

By the way, why do we ever need to set success status?
isNativeList := false
err = h.tree.CheckSettingsNodeExist(ctx, bktInfo)
if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) {
// tree probe failed, try to use native
listFunc = h.getDirObjectsNative
isNativeList = true
} else {
logAndSendBucketError(c, log, err)
return
}
}
h.browseObjects(c, browseParams{
bucketInfo: bktInfo,
prefix: unescapedKey,
listObjects: listFunc,
isNative: isNativeList,
})
} }

View file

@ -26,6 +26,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.uber.org/zap" "go.uber.org/zap"
@ -57,10 +58,11 @@ func (c *configMock) IndexPageEnabled() bool {
return false return false
} }
func (c *configMock) IndexPageTemplatePath() string { func (c *configMock) IndexPageTemplate() string {
return "" return ""
} }
dkirillov marked this conversation as resolved Outdated

Please, add new line

Please, add new line
func (c *configMock) IndexPageTemplate() string {
func (c *configMock) IndexPageNativeTemplate() string {
return "" return ""
} }
@ -126,7 +128,11 @@ func prepareHandlerContext() (*handlerContext, error) {
treeMock := &treeClientMock{} treeMock := &treeClientMock{}
cfgMock := &configMock{} cfgMock := &configMock{}
handler := New(params, cfgMock, tree.NewTree(treeMock)) workerPool, err := ants.NewPool(1000)
if err != nil {
return nil, err
}
handler := New(params, cfgMock, tree.NewTree(treeMock), workerPool)
return &handlerContext{ return &handlerContext{
key: key, key: key,

View file

@ -107,9 +107,9 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
err := id.DecodeString(test) err := id.DecodeString(test)
if err != nil { if err != nil {
h.byObjectName(c, h.headObject) h.byS3Path(c, h.headObject)
} else { } else {
h.byAddress(c, h.headObject) h.byNativeAddress(c, h.headObject)
} }
} }

View file

@ -2,17 +2,16 @@ package handler
import ( import (
"context" "context"
"errors"
"strings" "strings"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.uber.org/zap" "go.uber.org/zap"
@ -46,19 +45,21 @@ func isDir(name string) bool {
return strings.HasSuffix(name, "/") return strings.HasSuffix(name, "/")
} }
// isObjectID reports whether s parses as a well-formed object ID.
func isObjectID(s string) bool {
	var id oid.ID
	err := id.DecodeString(s)
	return err == nil
}
func isContainerRoot(key string) bool { func isContainerRoot(key string) bool {
return key == "" return key == ""
} }
func checkErrorType(err error) int { func loadAttributes(attrs []object.Attribute) map[string]string {
switch { result := make(map[string]string)
case err == nil: for _, attr := range attrs {
return fasthttp.StatusOK result[attr.Key()] = attr.Value()
case errors.Is(err, tree.ErrNodeAccessDenied):
return fasthttp.StatusForbidden
default:
return fasthttp.StatusNotFound
} }
return result
} }
func isValidToken(s string) bool { func isValidToken(s string) bool {

View file

@ -31,7 +31,8 @@ const (
CouldNotStoreFileInFrostfs = "could not store file in frostfs" // Error in ../../uploader/upload.go CouldNotStoreFileInFrostfs = "could not store file in frostfs" // Error in ../../uploader/upload.go
AddAttributeToResultObject = "add attribute to result object" // Debug in ../../uploader/filter.go AddAttributeToResultObject = "add attribute to result object" // Debug in ../../uploader/filter.go
FailedToCreateResolver = "failed to create resolver" // Fatal in ../../app.go FailedToCreateResolver = "failed to create resolver" // Fatal in ../../app.go
FailedToReadIndexPageTemplate = "failed to read index page template, set default" // Warn in ../../app.go FailedToCreateWorkerPool = "failed to create worker pool" // Fatal in ../../app.go
FailedToReadIndexPageTemplate = "failed to read index page template" // Error in ../../app.go
SetCustomIndexPageTemplate = "set custom index page template" // Info in ../../app.go SetCustomIndexPageTemplate = "set custom index page template" // Info in ../../app.go
ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty" // Info in ../../app.go ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty" // Info in ../../app.go
MetricsAreDisabled = "metrics are disabled" // Warn in ../../app.go MetricsAreDisabled = "metrics are disabled" // Warn in ../../app.go
@ -71,6 +72,9 @@ const (
AddedStoragePeer = "added storage peer" // Info in ../../settings.go AddedStoragePeer = "added storage peer" // Info in ../../settings.go
CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go
CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go
FailedToSumbitTaskToPool = "failed to submit task to pool" // Error in ../handler/browse.go
FailedToHeadObject = "failed to head object" // Error in ../handler/browse.go
FailedToIterateOverResponse = "failed to iterate over search response" // Error in ../handler/browse.go
InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go

View file

@ -1,11 +1,20 @@
{{$bucketName := .BucketName}} {{$container := .Container}}
{{ $prefix := trimPrefix .Prefix }} {{ $prefix := trimPrefix .Prefix }}
<!DOCTYPE html> <!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8"/> <meta charset="UTF-8"/>
<title>Index of s3://{{$bucketName}}/{{if $prefix}}/{{$prefix}}/{{end}}</title> <title>Index of {{.Protocol}}://{{$container}}
/{{if $prefix}}/{{$prefix}}/{{end}}</title>
<style> <style>
.alert {
width: 80%;
box-sizing: border-box;
padding: 20px;
background-color: #f44336;
color: white;
margin-bottom: 15px;
}
table { table {
width: 80%; width: 80%;
border-collapse: collapse; border-collapse: collapse;
@ -23,15 +32,25 @@
th { th {
background-color: #c3bcbc; background-color: #c3bcbc;
} }
h1 {
font-size: 1.5em;
}
tr:nth-child(even) {background-color: #ebe7e7;} tr:nth-child(even) {background-color: #ebe7e7;}
</style> </style>
</head> </head>
<body> <body>
<h1>Index of s3://{{$bucketName}}/{{if $prefix}}{{$prefix}}/{{end}}</h1> <h1>Index of {{.Protocol}}://{{$container}}/{{if $prefix}}{{$prefix}}/{{end}}</h1>
{{ if .HasErrors }}
<div class="alert">
Errors occurred while processing the request. Perhaps some objects are missing
</div>
{{ end }}
<table> <table>
<thead> <thead>
<tr> <tr>
<th>Filename</th> <th>Filename</th>
<th>OID</th>
<th>Size</th> <th>Size</th>
<th>Created</th> <th>Created</th>
<th>Download</th> <th>Download</th>
@ -42,20 +61,22 @@
{{if $trimmedPrefix }} {{if $trimmedPrefix }}
<tr> <tr>
<td> <td>
⮐<a href="/get/{{$bucketName}}{{ urlencode $trimmedPrefix "" }}">..</a> ⮐<a href="/get/{{$container}}{{ urlencode $trimmedPrefix }}/">..</a>
</td> </td>
<td></td> <td></td>
<td></td> <td></td>
<td></td> <td></td>
<td></td>
</tr> </tr>
{{else}} {{else}}
<tr> <tr>
<td> <td>
⮐<a href="/get/{{ $bucketName }}/">..</a> ⮐<a href="/get/{{$container}}/">..</a>
</td> </td>
<td></td> <td></td>
<td></td> <td></td>
<td></td> <td></td>
<td></td>
</tr> </tr>
{{end}} {{end}}
{{range .Objects}} {{range .Objects}}
@ -63,21 +84,22 @@
<td> <td>
{{if .IsDir}} {{if .IsDir}}
dkirillov marked this conversation as resolved Outdated

Why do we change condition from if not .IsDir to if .Size?

Why do we change condition from `if not .IsDir` to `if .Size`?

Oh, .IsDir would be better to mark empty files with '0B' size

Oh, .IsDir would be better to mark empty files with '0B' size
🗀 🗀
<a href="/get/{{ $bucketName }}{{ urlencode $prefix .FileName }}/"> <a href="{{.GetURL}}/">
{{.FileName}}/ {{.FileName}}/
</a> </a>
{{else}} {{else}}
🗎 🗎
<a href="/get/{{ $bucketName }}{{ urlencode $prefix .FileName }}"> <a href="{{ .GetURL }}">
{{.FileName}} {{.FileName}}
</a> </a>
{{end}} {{end}}
</td> </td>
<td>{{.OID}}</td>
<td>{{if not .IsDir}}{{ formatSize .Size }}{{end}}</td> <td>{{if not .IsDir}}{{ formatSize .Size }}{{end}}</td>
<td>{{if not .IsDir}}{{ formatTimestamp .Created }}{{end}}</td> <td>{{ .Created }}</td>
<td> <td>
{{ if not .IsDir }} {{ if .OID }}
<a href="/get/{{ $bucketName}}{{ urlencode $prefix .FileName }}?download=true"> <a href="{{ .GetURL }}?download=true">
Link Link
</a> </a>
{{ end }} {{ end }}

View file

@ -30,6 +30,11 @@ type (
Meta map[string]string Meta map[string]string
} }
multiSystemNode struct {
// the first element is latest
nodes []*treeNode
}
GetNodesParams struct { GetNodesParams struct {
CnrID cid.ID CnrID cid.ID
BktInfo *data.BucketInfo BktInfo *data.BucketInfo
@ -50,18 +55,19 @@ var (
) )
const ( const (
FileNameKey = "FileName" FileNameKey = "FileName"
) settingsFileName = "bucket-settings"
const ( oidKV = "OID"
oidKV = "OID" uploadIDKV = "UploadId"
sizeKV = "Size"
// keys for delete marker nodes. // keys for delete marker nodes.
isDeleteMarkerKV = "IsDeleteMarker" isDeleteMarkerKV = "IsDeleteMarker"
sizeKV = "Size"
// versionTree -- ID of a tree with object versions. // versionTree -- ID of a tree with object versions.
versionTree = "version" versionTree = "version"
systemTree = "system"
separator = "/" separator = "/"
) )
@ -135,6 +141,45 @@ func newNodeVersionFromTreeNode(treeNode *treeNode) *api.NodeVersion {
return version return version
} }
// newMultiNode parses the raw tree node responses into a multiSystemNode,
// moving the node with the greatest timestamp to the front so that Latest()
// returns it. At least one node is required.
func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) {
	if len(nodes) == 0 {
		return nil, errors.New("multi node must have at least one node")
	}

	parsed := make([]*treeNode, len(nodes))
	latestIdx := 0
	var latestTS uint64

	for i, node := range nodes {
		tn, err := newTreeNode(node)
		if err != nil {
			return nil, fmt.Errorf("parse system node response: %w", err)
		}
		parsed[i] = tn

		if ts := getMaxTimestamp(node); ts > latestTS {
			latestTS = ts
			latestIdx = i
		}
	}

	// Keep the most recent node first.
	parsed[0], parsed[latestIdx] = parsed[latestIdx], parsed[0]

	return &multiSystemNode{nodes: parsed}, nil
}
// Latest returns the node with the greatest timestamp
// (placed first by newMultiNode).
func (m *multiSystemNode) Latest() *treeNode {
	return m.nodes[0]
}
// Old returns every node except the latest one.
func (m *multiSystemNode) Old() []*treeNode {
	return m.nodes[1:]
}
func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*api.NodeVersion, error) { func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*api.NodeVersion, error) {
nodes, err := c.GetVersions(ctx, cnrID, objectName) nodes, err := c.GetVersions(ctx, cnrID, objectName)
if err != nil { if err != nil {
@ -165,6 +210,55 @@ func (c *Tree) GetVersions(ctx context.Context, cnrID *cid.ID, objectName string
return c.service.GetNodes(ctx, p) return c.service.GetNodes(ctx, p)
} }
// CheckSettingsNodeExist probes the system tree for the bucket-settings node.
// It returns ErrNodeNotFound (propagated from getSystemNode) when the node is
// absent — callers use that to fall back to native container listing.
func (c *Tree) CheckSettingsNodeExist(ctx context.Context, bktInfo *data.BucketInfo) error {
	// The node itself is not needed here; only its existence matters,
	// so the error is returned directly instead of the if/return-nil dance.
	_, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
	return err
}
// getSystemNode fetches the system-tree node(s) stored under name for the
// given bucket and wraps them into a multiSystemNode. Nodes belonging to
// multipart uploads are filtered out; ErrNodeNotFound is returned when
// nothing remains after filtering.
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) {
	params := &GetNodesParams{
		CnrID:      bktInfo.CID,
		BktInfo:    bktInfo,
		TreeID:     systemTree,
		Path:       []string{name},
		LatestOnly: false,
		AllAttrs:   true,
	}

	found, err := c.service.GetNodes(ctx, params)
	if err != nil {
		return nil, err
	}

	if found = filterMultipartNodes(found); len(found) == 0 {
		return nil, ErrNodeNotFound
	}

	return newMultiNode(found)
}
// filterMultipartNodes drops nodes that belong to in-progress multipart
// uploads, identified by the presence of an UploadId meta key.
func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {
	isMultipart := func(n NodeResponse) bool {
		for _, meta := range n.GetMeta() {
			if meta.GetKey() == uploadIDKV {
				return true
			}
		}
		return false
	}

	filtered := make([]NodeResponse, 0, len(nodes))
	for _, n := range nodes {
		if !isMultipart(n) {
			filtered = append(filtered, n)
		}
	}
	return filtered
}
func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) { func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
var ( var (
maxCreationTime uint64 maxCreationTime uint64