WIP: [#XX] Support virtual-hosted-style access to container #37
6 changed files with 135 additions and 12 deletions
45
app.go
45
app.go
|
@ -15,6 +15,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
|
hostRouter "git.frostfs.info/TrueCloudLab/frostfs-http-gw/router"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
@ -338,8 +339,11 @@ func (a *app) Serve(ctx context.Context) {
|
||||||
uploadRoutes := uploader.New(ctx, a.AppParams(), a.settings.Uploader)
|
uploadRoutes := uploader.New(ctx, a.AppParams(), a.settings.Uploader)
|
||||||
downloadRoutes := downloader.New(ctx, a.AppParams(), a.settings.Downloader)
|
downloadRoutes := downloader.New(ctx, a.AppParams(), a.settings.Downloader)
|
||||||
|
|
||||||
|
domains := a.cfg.GetStringSlice(cfgListenDomains)
|
||||||
|
a.log.Info("fetch domains, prepare to use API", zap.Strings("domains", domains))
|
||||||
|
|
||||||
// Configure router.
|
// Configure router.
|
||||||
a.configureRouter(uploadRoutes, downloadRoutes)
|
a.configureRouter(domains, uploadRoutes, downloadRoutes)
|
||||||
|
|
||||||
a.startServices()
|
a.startServices()
|
||||||
a.initServers(ctx)
|
a.initServers(ctx)
|
||||||
|
@ -436,7 +440,42 @@ func (a *app) stopServices() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) configureRouter(uploadRoutes *uploader.Uploader, downloadRoutes *downloader.Downloader) {
|
func (a *app) configureRouter(domains []string, uploadRoutes *uploader.Uploader, downloadRoutes *downloader.Downloader) {
|
||||||
|
hr := hostRouter.NewHostBucketRouter("cid")
|
||||||
|
hr.Default(a.defaultRouter(uploadRoutes, downloadRoutes))
|
||||||
|
|
||||||
|
bucketRouter := a.bucketRouter(uploadRoutes, downloadRoutes)
|
||||||
|
for _, domain := range domains {
|
||||||
|
hr.Map(domain, bucketRouter)
|
||||||
|
}
|
||||||
|
|
||||||
|
a.webServer.Handler = hr.Handler
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) bucketRouter(uploadRoutes *uploader.Uploader, downloadRoutes *downloader.Downloader) *router.Router {
|
||||||
|
r := router.New()
|
||||||
|
r.RedirectTrailingSlash = true
|
||||||
|
r.NotFound = func(r *fasthttp.RequestCtx) {
|
||||||
|
response.Error(r, "Not found", fasthttp.StatusNotFound)
|
||||||
|
}
|
||||||
|
r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
|
||||||
|
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
||||||
|
}
|
||||||
|
r.POST("/upload", a.logger(uploadRoutes.Upload))
|
||||||
|
a.log.Info("added path /upload")
|
||||||
|
r.GET("/get/{oid}", a.logger(downloadRoutes.DownloadByAddress))
|
||||||
|
r.HEAD("/get/{oid}", a.logger(downloadRoutes.HeadByAddress))
|
||||||
|
a.log.Info("added path /get/{oid}")
|
||||||
|
r.GET("/get_by_attribute/{attr_key}/{attr_val:*}", a.logger(downloadRoutes.DownloadByAttribute))
|
||||||
|
r.HEAD("/get_by_attribute/{attr_key}/{attr_val:*}", a.logger(downloadRoutes.HeadByAttribute))
|
||||||
|
a.log.Info("added path /get_by_attribute/{attr_key}/{attr_val:*}")
|
||||||
|
r.GET("/zip/{prefix:*}", a.logger(downloadRoutes.DownloadZipped))
|
||||||
|
a.log.Info("added path /zip/{prefix}")
|
||||||
|
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) defaultRouter(uploadRoutes *uploader.Uploader, downloadRoutes *downloader.Downloader) *router.Router {
|
||||||
r := router.New()
|
r := router.New()
|
||||||
r.RedirectTrailingSlash = true
|
r.RedirectTrailingSlash = true
|
||||||
r.NotFound = func(r *fasthttp.RequestCtx) {
|
r.NotFound = func(r *fasthttp.RequestCtx) {
|
||||||
|
@ -456,7 +495,7 @@ func (a *app) configureRouter(uploadRoutes *uploader.Uploader, downloadRoutes *d
|
||||||
r.GET("/zip/{cid}/{prefix:*}", a.logger(downloadRoutes.DownloadZipped))
|
r.GET("/zip/{cid}/{prefix:*}", a.logger(downloadRoutes.DownloadZipped))
|
||||||
a.log.Info("added path /zip/{cid}/{prefix}")
|
a.log.Info("added path /zip/{cid}/{prefix}")
|
||||||
|
|
||||||
a.webServer.Handler = r.Handler
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||||
|
|
|
@ -17,6 +17,9 @@ HTTP_GW_PROMETHEUS_ADDRESS=localhost:8084
|
||||||
# Log level.
|
# Log level.
|
||||||
HTTP_GW_LOGGER_LEVEL=debug
|
HTTP_GW_LOGGER_LEVEL=debug
|
||||||
|
|
||||||
|
# Domains to be able to use virtual-hosted-style access to bucket/container.
|
||||||
|
HTTP_GW_LISTEN_DOMAINS=httpdev.frostfs.devenv
|
||||||
|
|
||||||
HTTP_GW_SERVER_0_ADDRESS=0.0.0.0:443
|
HTTP_GW_SERVER_0_ADDRESS=0.0.0.0:443
|
||||||
HTTP_GW_SERVER_0_TLS_ENABLED=false
|
HTTP_GW_SERVER_0_TLS_ENABLED=false
|
||||||
HTTP_GW_SERVER_0_TLS_CERT_FILE=/path/to/tls/cert
|
HTTP_GW_SERVER_0_TLS_CERT_FILE=/path/to/tls/cert
|
||||||
|
|
|
@ -13,6 +13,10 @@ prometheus:
|
||||||
logger:
|
logger:
|
||||||
level: debug # Log level.
|
level: debug # Log level.
|
||||||
|
|
||||||
|
# Domains to be able to use virtual-hosted-style access to bucket/container.
|
||||||
|
listen_domains:
|
||||||
|
- httpdev.frostfs.devenv
|
||||||
|
|
||||||
server:
|
server:
|
||||||
- address: 0.0.0.0:8080
|
- address: 0.0.0.0:8080
|
||||||
tls:
|
tls:
|
||||||
|
|
|
@ -57,6 +57,10 @@ $ cat http.log
|
||||||
# General section
|
# General section
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
|
listen_domains:
|
||||||
|
- httpdev.frostfs.devenv
|
||||||
|
- httpdev2.frostfs.devenv
|
||||||
|
|
||||||
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
|
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
|
||||||
resolve_order:
|
resolve_order:
|
||||||
- nns
|
- nns
|
||||||
|
@ -69,15 +73,16 @@ rebalance_timer: 30s
|
||||||
pool_error_threshold: 100
|
pool_error_threshold: 100
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|------------------------|------------|---------------|----------------|------------------------------------------------------------------------------------|
|
|------------------------|------------|---------------|---------------|------------------------------------------------------------------------------------|
|
||||||
| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. |
|
| `listen_domains` | `[]string` | | | Domains to be able to use virtual-hosted-style access to bucket/container. |
|
||||||
| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. |
|
| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. |
|
||||||
| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. |
|
| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. |
|
||||||
| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. |
|
| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. |
|
||||||
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. |
|
||||||
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
||||||
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
||||||
|
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
||||||
|
|
||||||
# `wallet` section
|
# `wallet` section
|
||||||
|
|
||||||
|
|
69
router/router.go
Normal file
69
router/router.go
Normal file
|
@ -0,0 +1,69 @@
|
||||||
|
package router
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/fasthttp/router"
|
||||||
|
"github.com/valyala/fasthttp"
|
||||||
|
)
|
||||||
|
|
||||||
|
type HostBucketRouter struct {
|
||||||
|
routes map[string]*router.Router
|
||||||
|
bktParam string
|
||||||
|
defaultRouter *router.Router
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHostBucketRouter(bktParam string) HostBucketRouter {
|
||||||
|
return HostBucketRouter{
|
||||||
|
routes: make(map[string]*router.Router),
|
||||||
|
bktParam: bktParam,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hr *HostBucketRouter) Default(router *router.Router) {
|
||||||
|
hr.defaultRouter = router
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hr HostBucketRouter) Map(host string, h *router.Router) {
|
||||||
|
hr.routes[strings.ToLower(host)] = h
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hr HostBucketRouter) Handler(ctx *fasthttp.RequestCtx) {
|
||||||
|
bucket, domain := getBucketDomain(getHost(ctx))
|
||||||
|
_ = bucket
|
||||||
|
domainRouter, ok := hr.routes[strings.ToLower(domain)]
|
||||||
|
if !ok {
|
||||||
|
domainRouter = hr.defaultRouter
|
||||||
|
if domainRouter == nil {
|
||||||
|
ctx.Error(fasthttp.StatusMessage(fasthttp.StatusNotFound), fasthttp.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if bucket != "" {
|
||||||
|
ctx.SetUserValue(hr.bktParam, bucket)
|
||||||
|
}
|
||||||
|
|
||||||
|
domainRouter.Handler(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getBucketDomain(host string) (bucket string, domain string) {
|
||||||
|
parts := strings.Split(host, ".")
|
||||||
|
if len(parts) > 1 {
|
||||||
|
return parts[0], strings.Join(parts[1:], ".")
|
||||||
|
}
|
||||||
|
return "", host
|
||||||
|
}
|
||||||
|
|
||||||
|
// getHost tries its best to return the request host.
|
||||||
|
// According to section 14.23 of RFC 2616 the Host header
|
||||||
|
// can include the port number if the default value of 80 is not used.
|
||||||
|
func getHost(r *fasthttp.RequestCtx) string {
|
||||||
|
host := string(r.Host())
|
||||||
|
|
||||||
|
if i := strings.Index(host, ":"); i != -1 {
|
||||||
|
host = host[:i]
|
||||||
|
}
|
||||||
|
|
||||||
|
return host
|
||||||
|
}
|
|
@ -57,6 +57,9 @@ const (
|
||||||
// Logger.
|
// Logger.
|
||||||
cfgLoggerLevel = "logger.level"
|
cfgLoggerLevel = "logger.level"
|
||||||
|
|
||||||
|
// Domains for host based bucket/container access
|
||||||
|
cfgListenDomains = "listen_domains"
|
||||||
|
|
||||||
// Wallet.
|
// Wallet.
|
||||||
cfgWalletPassphrase = "wallet.passphrase"
|
cfgWalletPassphrase = "wallet.passphrase"
|
||||||
cfgWalletPath = "wallet.path"
|
cfgWalletPath = "wallet.path"
|
||||||
|
|
Loading…
Reference in a new issue