From e18e5b4fc57041a10dfdb6649e0d513e952d71be Mon Sep 17 00:00:00 2001 From: Nikita Zinkevich Date: Thu, 10 Oct 2024 11:59:53 +0300 Subject: [PATCH] [#151] index page: Add browse via native protocol Signed-off-by: Nikita Zinkevich --- cmd/http-gw/app.go | 20 +- cmd/http-gw/settings.go | 5 +- docs/gate-configuration.md | 30 +-- go.mod | 1 + go.sum | 2 + internal/handler/browse.go | 301 ++++++++++++++++++++++++++----- internal/handler/download.go | 17 +- internal/handler/handler.go | 60 +++--- internal/handler/handler_test.go | 12 +- internal/handler/head.go | 4 +- internal/handler/utils.go | 9 + internal/logs/logs.go | 6 +- internal/templates/index.gotmpl | 42 ++++- 13 files changed, 394 insertions(+), 115 deletions(-) diff --git a/cmd/http-gw/app.go b/cmd/http-gw/app.go index 84379d4..8123a24 100644 --- a/cmd/http-gw/app.go +++ b/cmd/http-gw/app.go @@ -40,6 +40,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/panjf2000/ants/v2" "github.com/spf13/viper" "github.com/valyala/fasthttp" "go.opentelemetry.io/otel/trace" @@ -96,6 +97,7 @@ type ( clientCut bool returnIndexPage bool indexPageTemplate string + workerPoolSize int bufferMaxSizeForPut uint64 namespaceHeader string defaultNamespaces []string @@ -184,6 +186,7 @@ func (a *app) initAppSettings() { a.settings = &appSettings{ reconnectInterval: fetchReconnectInterval(a.cfg), dialerSource: getDialerSource(a.log, a.cfg), + workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize), } a.settings.update(a.cfg, a.log) } @@ -490,7 +493,13 @@ func (a *app) setHealthStatus() { } func (a *app) Serve() { - handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool))) + workerPool := a.initWorkerPool() + defer func() { + workerPool.Release() + close(a.webDone) + }() + + handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool) // Configure router. a.configureRouter(handler) @@ -532,8 +541,14 @@ LOOP: a.metrics.Shutdown() a.stopServices() a.shutdownTracing() +} - close(a.webDone) +func (a *app) initWorkerPool() *ants.Pool { + workerPool, err := ants.NewPool(a.settings.workerPoolSize) + if err != nil { + a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err)) + } + return workerPool } func (a *app) shutdownTracing() { @@ -609,6 +624,7 @@ func (a *app) stopServices() { svc.ShutDown(ctx) } } + func (a *app) configureRouter(handler *handler.Handler) { r := router.New() r.RedirectTrailingSlash = true diff --git a/cmd/http-gw/settings.go b/cmd/http-gw/settings.go index f464fbc..bc36077 100644 --- a/cmd/http-gw/settings.go +++ b/cmd/http-gw/settings.go @@ -70,6 +70,7 @@ const ( cfgIndexPageEnabled = "index_page.enabled" cfgIndexPageTemplatePath = "index_page.template_path" + cfgWorkerPoolSize = "worker_pool_size" // Web. cfgWebReadBufferSize = "web.read_buffer_size" @@ -227,9 +228,6 @@ func settings() *viper.Viper { // pool: v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold) - v.SetDefault(cfgIndexPageEnabled, false) - v.SetDefault(cfgIndexPageTemplatePath, "") - // frostfs: v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut) @@ -241,6 +239,7 @@ func settings() *viper.Viper { v.SetDefault(cfgWebStreamRequestBody, true) v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize) + v.SetDefault(cfgWorkerPoolSize, 1000) // upload header v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false) diff --git a/docs/gate-configuration.md b/docs/gate-configuration.md index be4b30b..b30c22e 100644 --- a/docs/gate-configuration.md +++ b/docs/gate-configuration.md @@ -75,18 +75,21 @@ request_timeout: 5s rebalance_timer: 30s pool_error_threshold: 100 reconnect_interval: 1m +worker_pool_size: 1000 + ``` -| Parameter | Type | SIGHUP reload | Default value | Description | -|------------------------|------------|---------------|---------------|-------------------------------------------------------------------------------------------------| -| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. | -| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. | -| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. | -| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. | -| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. | -| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. | -| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. | -| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. | +| Parameter | Type | SIGHUP reload | Default value | Description | +|------------------------|------------|---------------|---------------|------------------------------------------------------------------------------------| +| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. | +| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. | +| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. | +| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. | +| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. | +| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. | +| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. | +| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. | +| `worker_pool_size` | `int` | no | `1000` | Maximum worker count in handler's worker pool. | # `wallet` section @@ -352,7 +355,12 @@ resolve_bucket: # `index_page` section -Parameters for index HTML-page output with S3-bucket or S3-subdir content for `Get object` request +Parameters for index HTML-page output. Activates if `GetObject` request returns `not found`. Two +index page modes available: + +* `s3` mode uses tree service for listing objects, +* `native` sends requests to nodes via native protocol. + If request pass S3-bucket name instead of CID, `s3` mode will be used, otherwise `native`. ```yaml index_page: diff --git a/go.mod b/go.mod index efb79eb..add46be 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/docker/go-units v0.4.0 github.com/fasthttp/router v1.4.1 github.com/nspcc-dev/neo-go v0.106.2 + github.com/panjf2000/ants/v2 v2.5.0 github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_model v0.5.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index ce02ee6..922e4a1 100644 --- a/go.sum +++ b/go.sum @@ -682,6 +682,8 @@ github.com/opencontainers/selinux v1.6.0/go.mod h1:VVGKuOLlE7v4PJyT6h7mNWvq1rzqi github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3ogry1nUQF8Evvo= github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8= github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI= +github.com/panjf2000/ants/v2 v2.5.0 h1:1rWGWSnxCsQBga+nQbA4/iY6VMeNoOIAM0ZWh9u3q2Q= +github.com/panjf2000/ants/v2 v2.5.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU= diff --git a/internal/handler/browse.go b/internal/handler/browse.go index c89d8c5..b7aacfa 100644 --- a/internal/handler/browse.go +++ b/internal/handler/browse.go @@ -1,15 +1,21 @@ package handler import ( + "context" "html/template" "net/url" "sort" "strconv" "strings" + "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/docker/go-units" "github.com/valyala/fasthttp" "go.uber.org/zap" @@ -25,19 +31,68 @@ const ( type ( BrowsePageData struct { - BucketName, - Prefix string - Objects []ResponseObject + HasErrors bool + Container string + Prefix string + Protocol string + Objects []ResponseObject } ResponseObject struct { OID string Created string FileName string + FilePath string Size string IsDir bool + GetURL string } ) +func newListObjectsResponseS3(attrs map[string]string) ResponseObject { + return ResponseObject{ + Created: formatTimestamp(attrs[attrCreated]), + OID: attrs[attrOID], + FileName: attrs[attrFileName], + Size: attrs[attrSize], + IsDir: attrs[attrOID] == "", + } +} + +func newListObjectsResponseNative(attrs map[string]string) ResponseObject { + filename := lastPathElement(attrs[object.AttributeFilePath]) + if filename == "" { + filename = attrs[attrFileName] + } + return ResponseObject{ + OID: attrs[attrOID], + Created: formatTimestamp(attrs[object.AttributeTimestamp] + "000"), + FileName: filename, + FilePath: attrs[object.AttributeFilePath], + Size: attrs[attrSize], + IsDir: false, + } +} + +func getNextDir(filepath, prefix string) string { + restPath := strings.Replace(filepath, prefix, "", 1) + index := strings.Index(restPath, "/") + if index == -1 { + return "" + } + return restPath[:index] +} + +func lastPathElement(path string) string { + if path == "" { + return path + } + index := strings.LastIndex(path, "/") + if index == len(path)-1 { + index = strings.LastIndex(path[:index], "/") + } + return path[index+1:] +} + func parseTimestamp(tstamp string) (time.Time, error) { millis, err := strconv.ParseInt(tstamp, 10, 64) if err != nil { @@ -47,16 +102,6 @@ func parseTimestamp(tstamp string) (time.Time, error) { return time.UnixMilli(millis), nil } -func NewResponseObject(nodes map[string]string) ResponseObject { - return ResponseObject{ - OID: nodes[attrOID], - Created: nodes[attrCreated], - FileName: nodes[attrFileName], - Size: nodes[attrSize], - IsDir: nodes[attrOID] == "", - } -} - func formatTimestamp(strdate string) string { date, err := parseTimestamp(strdate) if err != nil || date.IsZero() { @@ -94,12 +139,9 @@ func trimPrefix(encPrefix string) string { return prefix[:slashIndex] } -func urlencode(prefix, filename string) string { +func urlencode(path string) string { var res strings.Builder - path := filename - if prefix != "" { - path = strings.Join([]string{prefix, filename}, "/") - } + prefixParts := strings.Split(path, "/") for _, prefixPart := range prefixParts { prefixPart = "/" + url.PathEscape(prefixPart) @@ -112,46 +154,221 @@ func urlencode(prefix, filename string) string { return res.String() } -func (h *Handler) browseObjects(c *fasthttp.RequestCtx, bucketInfo *data.BucketInfo, prefix string) { +type GetObjectsResponse struct { + objects []ResponseObject + hasErrors bool +} + +func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) { + nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true) + if err != nil { + return nil, err + } + + var result = &GetObjectsResponse{ + objects: make([]ResponseObject, 0, len(nodes)), + hasErrors: false, + } + for _, node := range nodes { + meta := node.GetMeta() + if meta == nil { + continue + } + var attrs = make(map[string]string, len(meta)) + for _, m := range meta { + attrs[m.GetKey()] = string(m.GetValue()) + } + obj := newListObjectsResponseS3(attrs) + obj.FilePath = prefix + obj.FileName + obj.GetURL = "/get/" + bucketInfo.Name + urlencode(obj.FilePath) + result.objects = append(result.objects, obj) + } + + return result, nil +} + +func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) { + var basePath string + if ind := strings.LastIndex(prefix, "/"); ind != -1 { + basePath = prefix[:ind+1] + } + + filters := object.NewSearchFilters() + filters.AddRootFilter() + if prefix != "" { + filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix) + } + + prm := PrmObjectSearch{ + PrmAuth: PrmAuth{ + BearerToken: bearerToken(ctx), + }, + Container: bucketInfo.CID, + Filters: filters, + } + objectIDs, err := h.frostfs.SearchObjects(ctx, prm) + if err != nil { + return nil, err + } + defer objectIDs.Close() + + resp, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath) + if err != nil { + return nil, err + } + + log := utils.GetReqLogOrDefault(ctx, h.log) + dirs := make(map[string]struct{}) + result := &GetObjectsResponse{ + objects: make([]ResponseObject, 0, 100), + } + for objExt := range resp { + if objExt.Error != nil { + log.Error(logs.FailedToHeadObject, zap.Error(objExt.Error)) + result.hasErrors = true + continue + } + if objExt.Object.IsDir { + if _, ok := dirs[objExt.Object.FileName]; ok { + continue + } + objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + urlencode(objExt.Object.FilePath) + dirs[objExt.Object.FileName] = struct{}{} + } else { + objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + "/" + objExt.Object.OID + } + result.objects = append(result.objects, objExt.Object) + } + return result, nil +} + +type ResponseObjectExtended struct { + Object ResponseObject + Error error +} + +func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (<-chan ResponseObjectExtended, error) { + res := make(chan ResponseObjectExtended) + + go func() { + defer close(res) + log := utils.GetReqLogOrDefault(ctx, h.log).With( + zap.String("cid", cnrID.EncodeToString()), + zap.String("path", basePath), + ) + var wg sync.WaitGroup + err := objectIDs.Iterate(func(id oid.ID) bool { + wg.Add(1) + err := h.workerPool.Submit(func() { + defer wg.Done() + var obj ResponseObjectExtended + obj.Object, obj.Error = h.headDirObject(ctx, cnrID, id, basePath) + res <- obj + }) + if err != nil { + wg.Done() + log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err)) + } + select { + case <-ctx.Done(): + return true + default: + return false + } + }) + if err != nil { + log.Error(logs.FailedToIterateOverResponse, zap.Error(err)) + } + wg.Wait() + }() + + return res, nil +} + +func (h *Handler) headDirObject(ctx context.Context, cnrID cid.ID, objID oid.ID, basePath string) (ResponseObject, error) { + addr := newAddress(cnrID, objID) + obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{ + PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)}, + Address: addr, + }) + if err != nil { + return ResponseObject{}, err + } + + attrs := loadAttributes(obj.Attributes()) + attrs[attrOID] = objID.EncodeToString() + if multipartSize, ok := attrs[attributeMultipartObjectSize]; ok { + attrs[attrSize] = multipartSize + } else { + attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10) + } + + dirname := getNextDir(attrs[object.AttributeFilePath], basePath) + if dirname == "" { + return newListObjectsResponseNative(attrs), nil + } + + return ResponseObject{ + FileName: dirname, + FilePath: basePath + dirname, + IsDir: true, + }, nil +} + +type browseParams struct { + bucketInfo *data.BucketInfo + prefix string + isNative bool + listObjects func(ctx context.Context, bucketName *data.BucketInfo, prefix string) (*GetObjectsResponse, error) +} + +func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) { + const S3Protocol = "s3" + const FrostfsProtocol = "frostfs" + ctx := utils.GetContextFromRequest(c) reqLog := utils.GetReqLogOrDefault(ctx, h.log) - log := reqLog.With(zap.String("bucket", bucketInfo.Name)) - - nodes, err := h.listObjects(ctx, bucketInfo, prefix) + log := reqLog.With( + zap.String("bucket", p.bucketInfo.Name), + zap.String("container", p.bucketInfo.CID.EncodeToString()), + zap.String("prefix", p.prefix), + ) + resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix) if err != nil { logAndSendBucketError(c, log, err) return } - respObjects := make([]ResponseObject, len(nodes)) - - for i, node := range nodes { - respObjects[i] = NewResponseObject(node) - } - - sort.Slice(respObjects, func(i, j int) bool { - if respObjects[i].IsDir == respObjects[j].IsDir { - return respObjects[i].FileName < respObjects[j].FileName + objects := resp.objects + sort.Slice(objects, func(i, j int) bool { + if objects[i].IsDir == objects[j].IsDir { + return objects[i].FileName < objects[j].FileName } - return respObjects[i].IsDir + return objects[i].IsDir }) - indexTemplate := h.config.IndexPageTemplate() tmpl, err := template.New("index").Funcs(template.FuncMap{ - "formatTimestamp": formatTimestamp, - "formatSize": formatSize, - "trimPrefix": trimPrefix, - "urlencode": urlencode, - "parentDir": parentDir, - }).Parse(indexTemplate) + "formatSize": formatSize, + "trimPrefix": trimPrefix, + "urlencode": urlencode, + "parentDir": parentDir, + }).Parse(h.config.IndexPageTemplate()) if err != nil { logAndSendBucketError(c, log, err) return } + bucketName := p.bucketInfo.Name + protocol := S3Protocol + if p.isNative { + bucketName = p.bucketInfo.CID.EncodeToString() + protocol = FrostfsProtocol + } if err = tmpl.Execute(c, &BrowsePageData{ - BucketName: bucketInfo.Name, - Prefix: prefix, - Objects: respObjects, + Container: bucketName, + Prefix: p.prefix, + Objects: objects, + Protocol: protocol, + HasErrors: resp.hasErrors, }); err != nil { logAndSendBucketError(c, log, err) return diff --git a/internal/handler/download.go b/internal/handler/download.go index 19380d4..3e04541 100644 --- a/internal/handler/download.go +++ b/internal/handler/download.go @@ -23,13 +23,12 @@ import ( // DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format. func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) { - test, _ := c.UserValue("oid").(string) - var id oid.ID - err := id.DecodeString(test) - if err != nil { - h.byObjectName(c, h.receiveFile) + cnrIDStr, _ := c.UserValue("cid").(string) + var cnrID cid.ID + if err := cnrID.DecodeString(cnrIDStr); err != nil { + h.byS3Path(c, h.receiveFile) } else { - h.byAddress(c, h.receiveFile) + h.byNativeAddress(c, h.receiveFile) } } @@ -45,7 +44,7 @@ func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) { h.byAttribute(c, h.receiveFile) } -func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) { +func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) { filters := object.NewSearchFilters() filters.AddRootFilter() filters.AddFilter(key, val, op) @@ -54,7 +53,7 @@ func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op PrmAuth: PrmAuth{ BearerToken: bearerToken(ctx), }, - Container: *cnrID, + Container: cnrID, Filters: filters, } @@ -102,7 +101,7 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) { return } - resSearch, err := h.search(ctx, &bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) + resSearch, err := h.search(ctx, bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) if err != nil { log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 62d0897..3247e47 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -22,6 +22,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "github.com/panjf2000/ants/v2" "github.com/valyala/fasthttp" "go.uber.org/zap" ) @@ -165,6 +166,7 @@ type Handler struct { containerResolver ContainerResolver tree *tree.Tree cache *cache.BucketCache + workerPool *ants.Pool } type AppParams struct { @@ -175,7 +177,7 @@ type AppParams struct { Cache *cache.BucketCache } -func New(params *AppParams, config Config, tree *tree.Tree) *Handler { +func New(params *AppParams, config Config, tree *tree.Tree, workerPool *ants.Pool) *Handler { return &Handler{ log: params.Logger, frostfs: params.FrostFS, @@ -184,14 +186,15 @@ func New(params *AppParams, config Config, tree *tree.Tree) *Handler { containerResolver: params.Resolver, tree: tree, cache: params.Cache, + workerPool: workerPool, } } -// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that +// byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that // prepares request and object address to it. -func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { +func (h *Handler) byNativeAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { idCnr, _ := c.UserValue("cid").(string) - idObj, _ := c.UserValue("oid").(string) + idObj, _ := url.PathUnescape(c.UserValue("oid").(string)) ctx := utils.GetContextFromRequest(c) reqLog := utils.GetReqLogOrDefault(ctx, h.log) @@ -205,6 +208,16 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ objID := new(oid.ID) if err = objID.DecodeString(idObj); err != nil { + if h.config.IndexPageEnabled() { + c.SetStatusCode(fasthttp.StatusNotFound) + h.browseObjects(c, browseParams{ + bucketInfo: bktInfo, + prefix: idObj, + listObjects: h.getDirObjectsNative, + isNative: true, + }) + return + } log.Error(logs.WrongObjectID, zap.Error(err)) response.Error(c, "wrong object id", fasthttp.StatusBadRequest) return @@ -215,9 +228,9 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ f(ctx, *h.newRequest(c, log), addr) } -// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that -// prepares request and object address to it. -func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { +// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that +// resolves object address from S3-like path /. +func (h *Handler) byS3Path(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { bucketname := c.UserValue("cid").(string) key := c.UserValue("oid").(string) download := c.QueryArgs().GetBool("download") @@ -243,7 +256,12 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r if isDir(unescapedKey) || isContainerRoot(unescapedKey) { if code := checkErrorType(err); code == fasthttp.StatusNotFound || code == fasthttp.StatusOK { c.SetStatusCode(code) - h.browseObjects(c, bktInfo, unescapedKey) + h.browseObjects(c, browseParams{ + bucketInfo: bktInfo, + prefix: unescapedKey, + listObjects: h.getDirObjectsS3, + isNative: false, + }) return } } @@ -267,7 +285,7 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r f(ctx, *h.newRequest(c, log), addr) } -// byAttribute is a wrapper similar to byAddress. +// byAttribute is a wrapper similar to byNativeAddress. func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { scid, _ := c.UserValue("cid").(string) key, _ := c.UserValue("attr_key").(string) @@ -298,7 +316,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re return } - res, err := h.search(ctx, &bktInfo.CID, key, val, object.MatchStringEqual) + res, err := h.search(ctx, bktInfo.CID, key, val, object.MatchStringEqual) if err != nil { log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) @@ -394,25 +412,3 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket return bktInfo, err } - -func (h *Handler) listObjects(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]map[string]string, error) { - nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true) - if err != nil { - return nil, err - } - - var objects = make([]map[string]string, 0, len(nodes)) - for _, node := range nodes { - meta := node.GetMeta() - if meta == nil { - continue - } - var obj = make(map[string]string, len(meta)) - for _, m := range meta { - obj[m.GetKey()] = string(m.GetValue()) - } - objects = append(objects, obj) - } - - return objects, nil -} diff --git a/internal/handler/handler_test.go b/internal/handler/handler_test.go index 4fe9153..34668a5 100644 --- a/internal/handler/handler_test.go +++ b/internal/handler/handler_test.go @@ -26,6 +26,7 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" "github.com/valyala/fasthttp" "go.uber.org/zap" @@ -57,10 +58,11 @@ func (c *configMock) IndexPageEnabled() bool { return false } -func (c *configMock) IndexPageTemplatePath() string { +func (c *configMock) IndexPageTemplate() string { return "" } -func (c *configMock) IndexPageTemplate() string { + +func (c *configMock) IndexPageNativeTemplate() string { return "" } @@ -126,7 +128,11 @@ func prepareHandlerContext() (*handlerContext, error) { treeMock := &treeClientMock{} cfgMock := &configMock{} - handler := New(params, cfgMock, tree.NewTree(treeMock)) + workerPool, err := ants.NewPool(1000) + if err != nil { + return nil, err + } + handler := New(params, cfgMock, tree.NewTree(treeMock), workerPool) return &handlerContext{ key: key, diff --git a/internal/handler/head.go b/internal/handler/head.go index f0a1e94..ccd6a91 100644 --- a/internal/handler/head.go +++ b/internal/handler/head.go @@ -107,9 +107,9 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) { err := id.DecodeString(test) if err != nil { - h.byObjectName(c, h.headObject) + h.byS3Path(c, h.headObject) } else { - h.byAddress(c, h.headObject) + h.byNativeAddress(c, h.headObject) } } diff --git a/internal/handler/utils.go b/internal/handler/utils.go index a944b67..42665a9 100644 --- a/internal/handler/utils.go +++ b/internal/handler/utils.go @@ -13,6 +13,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/valyala/fasthttp" "go.uber.org/zap" @@ -61,6 +62,14 @@ func checkErrorType(err error) int { } } +func loadAttributes(attrs []object.Attribute) map[string]string { + result := make(map[string]string) + for _, attr := range attrs { + result[attr.Key()] = attr.Value() + } + return result +} + func isValidToken(s string) bool { for _, c := range s { if c <= ' ' || c > 127 { diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 409f87d..4dfa21f 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -31,7 +31,8 @@ const ( CouldNotStoreFileInFrostfs = "could not store file in frostfs" // Error in ../../uploader/upload.go AddAttributeToResultObject = "add attribute to result object" // Debug in ../../uploader/filter.go FailedToCreateResolver = "failed to create resolver" // Fatal in ../../app.go - FailedToReadIndexPageTemplate = "failed to read index page template, set default" // Warn in ../../app.go + FailedToCreateWorkerPool = "failed to create worker pool" // Fatal in ../../app.go + FailedToReadIndexPageTemplate = "failed to read index page template" // Error in ../../app.go SetCustomIndexPageTemplate = "set custom index page template" // Info in ../../app.go ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty" // Info in ../../app.go MetricsAreDisabled = "metrics are disabled" // Warn in ../../app.go @@ -71,6 +72,9 @@ const ( AddedStoragePeer = "added storage peer" // Info in ../../settings.go CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go + FailedToSumbitTaskToPool = "failed to submit task to pool" // Error in ../handler/browse.go + FailedToHeadObject = "failed to head object" // Error in ../handler/browse.go + FailedToIterateOverResponse = "failed to iterate over search response" // Error in ../handler/browse.go InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go diff --git a/internal/templates/index.gotmpl b/internal/templates/index.gotmpl index ea66a62..85250b1 100644 --- a/internal/templates/index.gotmpl +++ b/internal/templates/index.gotmpl @@ -1,11 +1,20 @@ -{{$bucketName := .BucketName}} +{{$container := .Container}} {{ $prefix := trimPrefix .Prefix }} - Index of s3://{{$bucketName}}/{{if $prefix}}/{{$prefix}}/{{end}} + Index of {{.Protocol}}://{{$container}} + /{{if $prefix}}/{{$prefix}}/{{end}} -

Index of s3://{{$bucketName}}/{{if $prefix}}{{$prefix}}/{{end}}

+

Index of {{.Protocol}}://{{$container}}/{{if $prefix}}{{$prefix}}/{{end}}

+{{ if .HasErrors }} +
+ Errors occured when attempt to browse folder. Some objects may missing +
+{{ end }} + @@ -42,20 +61,22 @@ {{if $trimmedPrefix }} + {{else}} + {{end}} {{range .Objects}} @@ -63,21 +84,22 @@ + - +
FilenameOID Size Created Download
- ⮐.. + ⮐..
- ⮐.. + ⮐..
{{if .IsDir}} 🗀 - + {{.FileName}}/ {{else}} 🗎 - + {{.FileName}} {{end}} {{.OID}} {{if not .IsDir}}{{ formatSize .Size }}{{end}}{{if not .IsDir}}{{ formatTimestamp .Created }}{{end}}{{ .Created }} - {{ if not .IsDir }} - + {{ if .OID }} + Link {{ end }}