From 43764772aa6f90fbeaf99cd6bccdebc157b5330a Mon Sep 17 00:00:00 2001 From: Nikita Zinkevich Date: Thu, 10 Oct 2024 11:59:53 +0300 Subject: [PATCH] [#151] index page: Add browse via native protocol Signed-off-by: Nikita Zinkevich --- cmd/http-gw/app.go | 20 ++- cmd/http-gw/settings.go | 6 +- config/config.env | 9 + config/config.yaml | 3 + docs/gate-configuration.md | 30 ++-- go.mod | 1 + go.sum | 2 + internal/handler/browse.go | 300 ++++++++++++++++++++++++++----- internal/handler/download.go | 23 +-- internal/handler/handler.go | 87 +++++---- internal/handler/handler_test.go | 12 +- internal/handler/head.go | 4 +- internal/handler/utils.go | 21 +-- internal/logs/logs.go | 6 +- internal/templates/index.gotmpl | 42 +++-- tree/tree.go | 104 ++++++++++- 16 files changed, 537 insertions(+), 133 deletions(-) diff --git a/cmd/http-gw/app.go b/cmd/http-gw/app.go index 853977c..0dd53a6 100644 --- a/cmd/http-gw/app.go +++ b/cmd/http-gw/app.go @@ -40,6 +40,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/panjf2000/ants/v2" "github.com/spf13/viper" "github.com/valyala/fasthttp" "go.opentelemetry.io/otel/trace" @@ -89,6 +90,7 @@ type ( appSettings struct { reconnectInterval time.Duration dialerSource *internalnet.DialerSource + workerPoolSize int mu sync.RWMutex defaultTimestamp bool @@ -184,6 +186,7 @@ func (a *app) initAppSettings() { a.settings = &appSettings{ reconnectInterval: fetchReconnectInterval(a.cfg), dialerSource: getDialerSource(a.log, a.cfg), + workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize), } a.settings.update(a.cfg, a.log) } @@ -490,7 +493,13 @@ func (a *app) setHealthStatus() { } func (a *app) Serve() { - handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool))) + workerPool := a.initWorkerPool() + defer func() { + workerPool.Release() + close(a.webDone) + }() + + handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool) // Configure router. a.configureRouter(handler) @@ -532,8 +541,14 @@ LOOP: a.metrics.Shutdown() a.stopServices() a.shutdownTracing() +} - close(a.webDone) +func (a *app) initWorkerPool() *ants.Pool { + workerPool, err := ants.NewPool(a.settings.workerPoolSize) + if err != nil { + a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err)) + } + return workerPool } func (a *app) shutdownTracing() { @@ -609,6 +624,7 @@ func (a *app) stopServices() { svc.ShutDown(ctx) } } + func (a *app) configureRouter(handler *handler.Handler) { r := router.New() r.RedirectTrailingSlash = true diff --git a/cmd/http-gw/settings.go b/cmd/http-gw/settings.go index 4f1712b..316c500 100644 --- a/cmd/http-gw/settings.go +++ b/cmd/http-gw/settings.go @@ -71,6 +71,8 @@ const ( cfgIndexPageEnabled = "index_page.enabled" cfgIndexPageTemplatePath = "index_page.template_path" + cfgWorkerPoolSize = "worker_pool_size" + // Web. cfgWebReadBufferSize = "web.read_buffer_size" cfgWebWriteBufferSize = "web.write_buffer_size" @@ -228,9 +230,6 @@ func settings() *viper.Viper { // pool: v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold) - v.SetDefault(cfgIndexPageEnabled, false) - v.SetDefault(cfgIndexPageTemplatePath, "") - // frostfs: v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut) @@ -242,6 +241,7 @@ func settings() *viper.Viper { v.SetDefault(cfgWebStreamRequestBody, true) v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize) + v.SetDefault(cfgWorkerPoolSize, 1000) // upload header v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false) diff --git a/config/config.env b/config/config.env index db54c92..fd51392 100644 --- a/config/config.env +++ b/config/config.env @@ -150,3 +150,12 @@ HTTP_GW_MULTINET_FALLBACK_DELAY=300ms # List of subnets and IP addresses to use as source for those subnets HTTP_GW_MULTINET_SUBNETS_1_MASK=1.2.3.4/24 HTTP_GW_MULTINET_SUBNETS_1_SOURCE_IPS=1.2.3.4 1.2.3.5 + +# Number of workers in handler's worker pool +HTTP_GW_WORKER_POOL_SIZE=1000 + +# Index page +# Enable index page support +HTTP_GW_INDEX_PAGE_ENABLED=false +# Index page template path +HTTP_GW_INDEX_PAGE_TEMPLATE_PATH=internal/handler/templates/index.gotmpl \ No newline at end of file diff --git a/config/config.yaml b/config/config.yaml index 6c89e78..ef5c529 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -113,6 +113,9 @@ request_timeout: 5s # Timeout to check node health during rebalance. rebalance_timer: 30s # Interval to check nodes health. pool_error_threshold: 100 # The number of errors on connection after which node is considered as unhealthy. +# Number of workers in handler's worker pool +worker_pool_size: 1000 + # Enable index page to see objects list for specified container and prefix index_page: enabled: false diff --git a/docs/gate-configuration.md b/docs/gate-configuration.md index 5b5b018..c6cb617 100644 --- a/docs/gate-configuration.md +++ b/docs/gate-configuration.md @@ -75,18 +75,21 @@ request_timeout: 5s rebalance_timer: 30s pool_error_threshold: 100 reconnect_interval: 1m +worker_pool_size: 1000 + ``` -| Parameter | Type | SIGHUP reload | Default value | Description | -|------------------------|------------|---------------|---------------|-------------------------------------------------------------------------------------------------| -| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. | -| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. | -| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. | -| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. | -| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. | -| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. | -| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. | -| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. | +| Parameter | Type | SIGHUP reload | Default value | Description | +|------------------------|------------|---------------|---------------|------------------------------------------------------------------------------------| +| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. | +| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. | +| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. | +| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. | +| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. | +| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. | +| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. | +| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. | +| `worker_pool_size` | `int` | no | `1000` | Maximum worker count in handler's worker pool. | # `wallet` section @@ -374,7 +377,12 @@ resolve_bucket: # `index_page` section -Parameters for index HTML-page output with S3-bucket or S3-subdir content for `Get object` request +Parameters for index HTML-page output. Activates if `GetObject` request returns `not found`. Two +index page modes available: + +* `s3` mode uses tree service for listing objects, +* `native` sends requests to nodes via native protocol. + If request pass S3-bucket name instead of CID, `s3` mode will be used, otherwise `native`. ```yaml index_page: diff --git a/go.mod b/go.mod index ca8f4fe..a2f41d8 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/docker/go-units v0.4.0 github.com/fasthttp/router v1.4.1 github.com/nspcc-dev/neo-go v0.106.2 + github.com/panjf2000/ants/v2 v2.5.0 github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_model v0.5.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index 197f21c..a7a5be4 100644 --- a/go.sum +++ b/go.sum @@ -682,6 +682,8 @@ github.com/opencontainers/selinux v1.6.0/go.mod h1:VVGKuOLlE7v4PJyT6h7mNWvq1rzqi github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3ogry1nUQF8Evvo= github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8= github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI= +github.com/panjf2000/ants/v2 v2.5.0 h1:1rWGWSnxCsQBga+nQbA4/iY6VMeNoOIAM0ZWh9u3q2Q= +github.com/panjf2000/ants/v2 v2.5.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU= diff --git a/internal/handler/browse.go b/internal/handler/browse.go index c89d8c5..b24a569 100644 --- a/internal/handler/browse.go +++ b/internal/handler/browse.go @@ -1,15 +1,21 @@ package handler import ( + "context" "html/template" "net/url" "sort" "strconv" "strings" + "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/docker/go-units" "github.com/valyala/fasthttp" "go.uber.org/zap" @@ -25,19 +31,68 @@ const ( type ( BrowsePageData struct { - BucketName, - Prefix string - Objects []ResponseObject + HasErrors bool + Container string + Prefix string + Protocol string + Objects []ResponseObject } ResponseObject struct { OID string Created string FileName string + FilePath string Size string IsDir bool + GetURL string } ) +func newListObjectsResponseS3(attrs map[string]string) ResponseObject { + return ResponseObject{ + Created: formatTimestamp(attrs[attrCreated]), + OID: attrs[attrOID], + FileName: attrs[attrFileName], + Size: attrs[attrSize], + IsDir: attrs[attrOID] == "", + } +} + +func newListObjectsResponseNative(attrs map[string]string) ResponseObject { + filename := lastPathElement(attrs[object.AttributeFilePath]) + if filename == "" { + filename = attrs[attrFileName] + } + return ResponseObject{ + OID: attrs[attrOID], + Created: formatTimestamp(attrs[object.AttributeTimestamp] + "000"), + FileName: filename, + FilePath: attrs[object.AttributeFilePath], + Size: attrs[attrSize], + IsDir: false, + } +} + +func getNextDir(filepath, prefix string) string { + restPath := strings.Replace(filepath, prefix, "", 1) + index := strings.Index(restPath, "/") + if index == -1 { + return "" + } + return restPath[:index] +} + +func lastPathElement(path string) string { + if path == "" { + return path + } + index := strings.LastIndex(path, "/") + if index == len(path)-1 { + index = strings.LastIndex(path[:index], "/") + } + return path[index+1:] +} + func parseTimestamp(tstamp string) (time.Time, error) { millis, err := strconv.ParseInt(tstamp, 10, 64) if err != nil { @@ -47,16 +102,6 @@ func parseTimestamp(tstamp string) (time.Time, error) { return time.UnixMilli(millis), nil } -func NewResponseObject(nodes map[string]string) ResponseObject { - return ResponseObject{ - OID: nodes[attrOID], - Created: nodes[attrCreated], - FileName: nodes[attrFileName], - Size: nodes[attrSize], - IsDir: nodes[attrOID] == "", - } -} - func formatTimestamp(strdate string) string { date, err := parseTimestamp(strdate) if err != nil || date.IsZero() { @@ -94,12 +139,9 @@ func trimPrefix(encPrefix string) string { return prefix[:slashIndex] } -func urlencode(prefix, filename string) string { +func urlencode(path string) string { var res strings.Builder - path := filename - if prefix != "" { - path = strings.Join([]string{prefix, filename}, "/") - } + prefixParts := strings.Split(path, "/") for _, prefixPart := range prefixParts { prefixPart = "/" + url.PathEscape(prefixPart) @@ -112,46 +154,220 @@ func urlencode(prefix, filename string) string { return res.String() } -func (h *Handler) browseObjects(c *fasthttp.RequestCtx, bucketInfo *data.BucketInfo, prefix string) { +type GetObjectsResponse struct { + objects []ResponseObject + hasErrors bool +} + +func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) { + nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true) + if err != nil { + return nil, err + } + + result := &GetObjectsResponse{ + objects: make([]ResponseObject, 0, len(nodes)), + } + for _, node := range nodes { + meta := node.GetMeta() + if meta == nil { + continue + } + var attrs = make(map[string]string, len(meta)) + for _, m := range meta { + attrs[m.GetKey()] = string(m.GetValue()) + } + obj := newListObjectsResponseS3(attrs) + obj.FilePath = prefix + obj.FileName + obj.GetURL = "/get/" + bucketInfo.Name + urlencode(obj.FilePath) + result.objects = append(result.objects, obj) + } + + return result, nil +} + +func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) { + var basePath string + if ind := strings.LastIndex(prefix, "/"); ind != -1 { + basePath = prefix[:ind+1] + } + + filters := object.NewSearchFilters() + filters.AddRootFilter() + if prefix != "" { + filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix) + } + + prm := PrmObjectSearch{ + PrmAuth: PrmAuth{ + BearerToken: bearerToken(ctx), + }, + Container: bucketInfo.CID, + Filters: filters, + } + objectIDs, err := h.frostfs.SearchObjects(ctx, prm) + if err != nil { + return nil, err + } + defer objectIDs.Close() + + resp, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath) + if err != nil { + return nil, err + } + + log := utils.GetReqLogOrDefault(ctx, h.log) + dirs := make(map[string]struct{}) + result := &GetObjectsResponse{ + objects: make([]ResponseObject, 0, 100), + } + for objExt := range resp { + if objExt.Error != nil { + log.Error(logs.FailedToHeadObject, zap.Error(objExt.Error)) + result.hasErrors = true + continue + } + if objExt.Object.IsDir { + if _, ok := dirs[objExt.Object.FileName]; ok { + continue + } + objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + urlencode(objExt.Object.FilePath) + dirs[objExt.Object.FileName] = struct{}{} + } else { + objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + "/" + objExt.Object.OID + } + result.objects = append(result.objects, objExt.Object) + } + return result, nil +} + +type ResponseObjectExtended struct { + Object ResponseObject + Error error +} + +func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (<-chan ResponseObjectExtended, error) { + res := make(chan ResponseObjectExtended) + + go func() { + defer close(res) + log := utils.GetReqLogOrDefault(ctx, h.log).With( + zap.String("cid", cnrID.EncodeToString()), + zap.String("path", basePath), + ) + var wg sync.WaitGroup + err := objectIDs.Iterate(func(id oid.ID) bool { + wg.Add(1) + err := h.workerPool.Submit(func() { + defer wg.Done() + var obj ResponseObjectExtended + obj.Object, obj.Error = h.headDirObject(ctx, cnrID, id, basePath) + res <- obj + }) + if err != nil { + wg.Done() + log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err)) + } + select { + case <-ctx.Done(): + return true + default: + return false + } + }) + if err != nil { + log.Error(logs.FailedToIterateOverResponse, zap.Error(err)) + } + wg.Wait() + }() + + return res, nil +} + +func (h *Handler) headDirObject(ctx context.Context, cnrID cid.ID, objID oid.ID, basePath string) (ResponseObject, error) { + addr := newAddress(cnrID, objID) + obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{ + PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)}, + Address: addr, + }) + if err != nil { + return ResponseObject{}, err + } + + attrs := loadAttributes(obj.Attributes()) + attrs[attrOID] = objID.EncodeToString() + if multipartSize, ok := attrs[attributeMultipartObjectSize]; ok { + attrs[attrSize] = multipartSize + } else { + attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10) + } + + dirname := getNextDir(attrs[object.AttributeFilePath], basePath) + if dirname == "" { + return newListObjectsResponseNative(attrs), nil + } + + return ResponseObject{ + FileName: dirname, + FilePath: basePath + dirname, + IsDir: true, + }, nil +} + +type browseParams struct { + bucketInfo *data.BucketInfo + prefix string + isNative bool + listObjects func(ctx context.Context, bucketName *data.BucketInfo, prefix string) (*GetObjectsResponse, error) +} + +func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) { + const S3Protocol = "s3" + const FrostfsProtocol = "frostfs" + ctx := utils.GetContextFromRequest(c) reqLog := utils.GetReqLogOrDefault(ctx, h.log) - log := reqLog.With(zap.String("bucket", bucketInfo.Name)) - - nodes, err := h.listObjects(ctx, bucketInfo, prefix) + log := reqLog.With( + zap.String("bucket", p.bucketInfo.Name), + zap.String("container", p.bucketInfo.CID.EncodeToString()), + zap.String("prefix", p.prefix), + ) + resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix) if err != nil { logAndSendBucketError(c, log, err) return } - respObjects := make([]ResponseObject, len(nodes)) - - for i, node := range nodes { - respObjects[i] = NewResponseObject(node) - } - - sort.Slice(respObjects, func(i, j int) bool { - if respObjects[i].IsDir == respObjects[j].IsDir { - return respObjects[i].FileName < respObjects[j].FileName + objects := resp.objects + sort.Slice(objects, func(i, j int) bool { + if objects[i].IsDir == objects[j].IsDir { + return objects[i].FileName < objects[j].FileName } - return respObjects[i].IsDir + return objects[i].IsDir }) - indexTemplate := h.config.IndexPageTemplate() tmpl, err := template.New("index").Funcs(template.FuncMap{ - "formatTimestamp": formatTimestamp, - "formatSize": formatSize, - "trimPrefix": trimPrefix, - "urlencode": urlencode, - "parentDir": parentDir, - }).Parse(indexTemplate) + "formatSize": formatSize, + "trimPrefix": trimPrefix, + "urlencode": urlencode, + "parentDir": parentDir, + }).Parse(h.config.IndexPageTemplate()) if err != nil { logAndSendBucketError(c, log, err) return } + bucketName := p.bucketInfo.Name + protocol := S3Protocol + if p.isNative { + bucketName = p.bucketInfo.CID.EncodeToString() + protocol = FrostfsProtocol + } if err = tmpl.Execute(c, &BrowsePageData{ - BucketName: bucketInfo.Name, - Prefix: prefix, - Objects: respObjects, + Container: bucketName, + Prefix: p.prefix, + Objects: objects, + Protocol: protocol, + HasErrors: resp.hasErrors, }); err != nil { logAndSendBucketError(c, log, err) return diff --git a/internal/handler/download.go b/internal/handler/download.go index 19380d4..cd4e55a 100644 --- a/internal/handler/download.go +++ b/internal/handler/download.go @@ -23,13 +23,16 @@ import ( // DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format. func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) { - test, _ := c.UserValue("oid").(string) - var id oid.ID - err := id.DecodeString(test) - if err != nil { - h.byObjectName(c, h.receiveFile) - } else { - h.byAddress(c, h.receiveFile) + oidURLParam := c.UserValue("oid").(string) + downloadQueryParam := c.QueryArgs().GetBool("download") + + switch { + case isObjectID(oidURLParam): + h.byNativeAddress(c, h.receiveFile) + case !isContainerRoot(oidURLParam) && (downloadQueryParam || !isDir(oidURLParam)): + h.byS3Path(c, h.receiveFile) + default: + h.browseIndex(c) } } @@ -45,7 +48,7 @@ func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) { h.byAttribute(c, h.receiveFile) } -func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) { +func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) { filters := object.NewSearchFilters() filters.AddRootFilter() filters.AddFilter(key, val, op) @@ -54,7 +57,7 @@ func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op PrmAuth: PrmAuth{ BearerToken: bearerToken(ctx), }, - Container: *cnrID, + Container: cnrID, Filters: filters, } @@ -102,7 +105,7 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) { return } - resSearch, err := h.search(ctx, &bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) + resSearch, err := h.search(ctx, bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) if err != nil { log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 62d0897..9ed7f99 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -22,6 +22,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "github.com/panjf2000/ants/v2" "github.com/valyala/fasthttp" "go.uber.org/zap" ) @@ -165,6 +166,7 @@ type Handler struct { containerResolver ContainerResolver tree *tree.Tree cache *cache.BucketCache + workerPool *ants.Pool } type AppParams struct { @@ -175,7 +177,7 @@ type AppParams struct { Cache *cache.BucketCache } -func New(params *AppParams, config Config, tree *tree.Tree) *Handler { +func New(params *AppParams, config Config, tree *tree.Tree, workerPool *ants.Pool) *Handler { return &Handler{ log: params.Logger, frostfs: params.FrostFS, @@ -184,14 +186,15 @@ func New(params *AppParams, config Config, tree *tree.Tree) *Handler { containerResolver: params.Resolver, tree: tree, cache: params.Cache, + workerPool: workerPool, } } -// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that +// byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that // prepares request and object address to it. -func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { +func (h *Handler) byNativeAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { idCnr, _ := c.UserValue("cid").(string) - idObj, _ := c.UserValue("oid").(string) + idObj, _ := url.PathUnescape(c.UserValue("oid").(string)) ctx := utils.GetContextFromRequest(c) reqLog := utils.GetReqLogOrDefault(ctx, h.log) @@ -215,12 +218,11 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ f(ctx, *h.newRequest(c, log), addr) } -// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that -// prepares request and object address to it. -func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { +// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that +// resolves object address from S3-like path /. +func (h *Handler) byS3Path(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { bucketname := c.UserValue("cid").(string) key := c.UserValue("oid").(string) - download := c.QueryArgs().GetBool("download") ctx := utils.GetContextFromRequest(c) reqLog := utils.GetReqLogOrDefault(ctx, h.log) @@ -239,15 +241,6 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r } foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, unescapedKey) - if h.config.IndexPageEnabled() && !download && string(c.Method()) != fasthttp.MethodHead { - if isDir(unescapedKey) || isContainerRoot(unescapedKey) { - if code := checkErrorType(err); code == fasthttp.StatusNotFound || code == fasthttp.StatusOK { - c.SetStatusCode(code) - h.browseObjects(c, bktInfo, unescapedKey) - return - } - } - } if err != nil { if errors.Is(err, tree.ErrNodeAccessDenied) { response.Error(c, "Access Denied", fasthttp.StatusForbidden) @@ -267,7 +260,7 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r f(ctx, *h.newRequest(c, log), addr) } -// byAttribute is a wrapper similar to byAddress. +// byAttribute is a wrapper similar to byNativeAddress. func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { scid, _ := c.UserValue("cid").(string) key, _ := c.UserValue("attr_key").(string) @@ -298,7 +291,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re return } - res, err := h.search(ctx, &bktInfo.CID, key, val, object.MatchStringEqual) + res, err := h.search(ctx, bktInfo.CID, key, val, object.MatchStringEqual) if err != nil { log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) @@ -395,24 +388,50 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket return bktInfo, err } -func (h *Handler) listObjects(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]map[string]string, error) { - nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true) +func (h *Handler) browseIndex(c *fasthttp.RequestCtx) { + if !h.config.IndexPageEnabled() { + c.SetStatusCode(fasthttp.StatusNotFound) + return + } + + cidURLParam := c.UserValue("cid").(string) + oidURLParam := c.UserValue("oid").(string) + + ctx := utils.GetContextFromRequest(c) + reqLog := utils.GetReqLogOrDefault(ctx, h.log) + log := reqLog.With(zap.String("cid", cidURLParam), zap.String("oid", oidURLParam)) + + unescapedKey, err := url.QueryUnescape(oidURLParam) if err != nil { - return nil, err + logAndSendBucketError(c, log, err) + return } - var objects = make([]map[string]string, 0, len(nodes)) - for _, node := range nodes { - meta := node.GetMeta() - if meta == nil { - continue - } - var obj = make(map[string]string, len(meta)) - for _, m := range meta { - obj[m.GetKey()] = string(m.GetValue()) - } - objects = append(objects, obj) + bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log) + if err != nil { + logAndSendBucketError(c, log, err) + return } - return objects, nil + listFunc := h.getDirObjectsS3 + isNativeList := false + + err = h.tree.CheckSettingsNodeExist(ctx, bktInfo) + if err != nil { + if errors.Is(err, tree.ErrNodeNotFound) { + // tree probe failed, try to use native + listFunc = h.getDirObjectsNative + isNativeList = true + } else { + logAndSendBucketError(c, log, err) + return + } + } + + h.browseObjects(c, browseParams{ + bucketInfo: bktInfo, + prefix: unescapedKey, + listObjects: listFunc, + isNative: isNativeList, + }) } diff --git a/internal/handler/handler_test.go b/internal/handler/handler_test.go index 4fe9153..34668a5 100644 --- a/internal/handler/handler_test.go +++ b/internal/handler/handler_test.go @@ -26,6 +26,7 @@ import ( oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" "github.com/valyala/fasthttp" "go.uber.org/zap" @@ -57,10 +58,11 @@ func (c *configMock) IndexPageEnabled() bool { return false } -func (c *configMock) IndexPageTemplatePath() string { +func (c *configMock) IndexPageTemplate() string { return "" } -func (c *configMock) IndexPageTemplate() string { + +func (c *configMock) IndexPageNativeTemplate() string { return "" } @@ -126,7 +128,11 @@ func prepareHandlerContext() (*handlerContext, error) { treeMock := &treeClientMock{} cfgMock := &configMock{} - handler := New(params, cfgMock, tree.NewTree(treeMock)) + workerPool, err := ants.NewPool(1000) + if err != nil { + return nil, err + } + handler := New(params, cfgMock, tree.NewTree(treeMock), workerPool) return &handlerContext{ key: key, diff --git a/internal/handler/head.go b/internal/handler/head.go index f0a1e94..ccd6a91 100644 --- a/internal/handler/head.go +++ b/internal/handler/head.go @@ -107,9 +107,9 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) { err := id.DecodeString(test) if err != nil { - h.byObjectName(c, h.headObject) + h.byS3Path(c, h.headObject) } else { - h.byAddress(c, h.headObject) + h.byNativeAddress(c, h.headObject) } } diff --git a/internal/handler/utils.go b/internal/handler/utils.go index a944b67..b537d64 100644 --- a/internal/handler/utils.go +++ b/internal/handler/utils.go @@ -2,17 +2,16 @@ package handler import ( "context" - "errors" "strings" "time" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/valyala/fasthttp" "go.uber.org/zap" @@ -46,19 +45,21 @@ func isDir(name string) bool { return strings.HasSuffix(name, "/") } +func isObjectID(s string) bool { + var objID oid.ID + return objID.DecodeString(s) == nil +} + func isContainerRoot(key string) bool { return key == "" } -func checkErrorType(err error) int { - switch { - case err == nil: - return fasthttp.StatusOK - case errors.Is(err, tree.ErrNodeAccessDenied): - return fasthttp.StatusForbidden - default: - return fasthttp.StatusNotFound +func loadAttributes(attrs []object.Attribute) map[string]string { + result := make(map[string]string) + for _, attr := range attrs { + result[attr.Key()] = attr.Value() } + return result } func isValidToken(s string) bool { diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 409f87d..4dfa21f 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -31,7 +31,8 @@ const ( CouldNotStoreFileInFrostfs = "could not store file in frostfs" // Error in ../../uploader/upload.go AddAttributeToResultObject = "add attribute to result object" // Debug in ../../uploader/filter.go FailedToCreateResolver = "failed to create resolver" // Fatal in ../../app.go - FailedToReadIndexPageTemplate = "failed to read index page template, set default" // Warn in ../../app.go + FailedToCreateWorkerPool = "failed to create worker pool" // Fatal in ../../app.go + FailedToReadIndexPageTemplate = "failed to read index page template" // Error in ../../app.go SetCustomIndexPageTemplate = "set custom index page template" // Info in ../../app.go ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty" // Info in ../../app.go MetricsAreDisabled = "metrics are disabled" // Warn in ../../app.go @@ -71,6 +72,9 @@ const ( AddedStoragePeer = "added storage peer" // Info in ../../settings.go CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go + FailedToSumbitTaskToPool = "failed to submit task to pool" // Error in ../handler/browse.go + FailedToHeadObject = "failed to head object" // Error in ../handler/browse.go + FailedToIterateOverResponse = "failed to iterate over search response" // Error in ../handler/browse.go InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go diff --git a/internal/templates/index.gotmpl b/internal/templates/index.gotmpl index ea66a62..b14cc06 100644 --- a/internal/templates/index.gotmpl +++ b/internal/templates/index.gotmpl @@ -1,11 +1,20 @@ -{{$bucketName := .BucketName}} +{{$container := .Container}} {{ $prefix := trimPrefix .Prefix }} - Index of s3://{{$bucketName}}/{{if $prefix}}/{{$prefix}}/{{end}} + Index of {{.Protocol}}://{{$container}} + /{{if $prefix}}/{{$prefix}}/{{end}} -

Index of s3://{{$bucketName}}/{{if $prefix}}{{$prefix}}/{{end}}

+

Index of {{.Protocol}}://{{$container}}/{{if $prefix}}{{$prefix}}/{{end}}

+{{ if .HasErrors }} +
+ Errors occurred while processing the request. Perhaps some objects are missing +
+{{ end }} + @@ -42,20 +61,22 @@ {{if $trimmedPrefix }} + {{else}} + {{end}} {{range .Objects}} @@ -63,21 +84,22 @@ + - +
FilenameOID Size Created Download
- ⮐.. + ⮐..
- ⮐.. + ⮐..
{{if .IsDir}} 🗀 - + {{.FileName}}/ {{else}} 🗎 - + {{.FileName}} {{end}} {{.OID}} {{if not .IsDir}}{{ formatSize .Size }}{{end}}{{if not .IsDir}}{{ formatTimestamp .Created }}{{end}}{{ .Created }} - {{ if not .IsDir }} - + {{ if .OID }} + Link {{ end }} diff --git a/tree/tree.go b/tree/tree.go index 162f41f..40209a5 100644 --- a/tree/tree.go +++ b/tree/tree.go @@ -30,6 +30,11 @@ type ( Meta map[string]string } + multiSystemNode struct { + // the first element is latest + nodes []*treeNode + } + GetNodesParams struct { CnrID cid.ID BktInfo *data.BucketInfo @@ -50,18 +55,19 @@ var ( ) const ( - FileNameKey = "FileName" -) + FileNameKey = "FileName" + settingsFileName = "bucket-settings" -const ( - oidKV = "OID" + oidKV = "OID" + uploadIDKV = "UploadId" + sizeKV = "Size" // keys for delete marker nodes. isDeleteMarkerKV = "IsDeleteMarker" - sizeKV = "Size" // versionTree -- ID of a tree with object versions. versionTree = "version" + systemTree = "system" separator = "/" ) @@ -135,6 +141,45 @@ func newNodeVersionFromTreeNode(treeNode *treeNode) *api.NodeVersion { return version } +func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) { + var ( + err error + index int + maxTimestamp uint64 + ) + + if len(nodes) == 0 { + return nil, errors.New("multi node must have at least one node") + } + + treeNodes := make([]*treeNode, len(nodes)) + + for i, node := range nodes { + if treeNodes[i], err = newTreeNode(node); err != nil { + return nil, fmt.Errorf("parse system node response: %w", err) + } + + if timestamp := getMaxTimestamp(node); timestamp > maxTimestamp { + index = i + maxTimestamp = timestamp + } + } + + treeNodes[0], treeNodes[index] = treeNodes[index], treeNodes[0] + + return &multiSystemNode{ + nodes: treeNodes, + }, nil +} + +func (m *multiSystemNode) Latest() *treeNode { + return m.nodes[0] +} + +func (m *multiSystemNode) Old() []*treeNode { + return m.nodes[1:] +} + func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*api.NodeVersion, error) { nodes, err := c.GetVersions(ctx, cnrID, objectName) if err != nil { @@ -165,6 +210,55 @@ func (c *Tree) GetVersions(ctx context.Context, cnrID *cid.ID, objectName string return c.service.GetNodes(ctx, p) } +func (c *Tree) CheckSettingsNodeExist(ctx context.Context, bktInfo *data.BucketInfo) error { + _, err := c.getSystemNode(ctx, bktInfo, settingsFileName) + if err != nil { + return err + } + + return nil +} + +func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name string) (*multiSystemNode, error) { + p := &GetNodesParams{ + CnrID: bktInfo.CID, + BktInfo: bktInfo, + TreeID: systemTree, + Path: []string{name}, + LatestOnly: false, + AllAttrs: true, + } + nodes, err := c.service.GetNodes(ctx, p) + if err != nil { + return nil, err + } + + nodes = filterMultipartNodes(nodes) + + if len(nodes) == 0 { + return nil, ErrNodeNotFound + } + + return newMultiNode(nodes) +} + +func filterMultipartNodes(nodes []NodeResponse) []NodeResponse { + res := make([]NodeResponse, 0, len(nodes)) + +LOOP: + for _, node := range nodes { + for _, meta := range node.GetMeta() { + if meta.GetKey() == uploadIDKV { + continue LOOP + } + } + + res = append(res, node) + } + + return res +} + func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) { var ( maxCreationTime uint64