[#91] Add support namespaces #96
12 changed files with 236 additions and 37 deletions
|
@ -18,6 +18,7 @@ This document outlines major changes between releases.
|
||||||
- Add bucket/container caching
|
- Add bucket/container caching
|
||||||
- Disable homomorphic hash for PUT if it's disabled in container itself
|
- Disable homomorphic hash for PUT if it's disabled in container itself
|
||||||
- Add new `logger.destination` config param (#89)
|
- Add new `logger.destination` config param (#89)
|
||||||
|
- Add support namespaces (#91)
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- Update prometheus to v1.15.0 (#35)
|
- Update prometheus to v1.15.0 (#35)
|
||||||
|
|
|
@ -7,13 +7,16 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
|
@ -34,6 +37,7 @@ import (
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/exp/slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
@ -78,6 +82,8 @@ type (
|
||||||
zipCompression bool
|
zipCompression bool
|
||||||
clientCut bool
|
clientCut bool
|
||||||
bufferMaxSizeForPut uint64
|
bufferMaxSizeForPut uint64
|
||||||
|
namespaceHeader string
|
||||||
|
defaultNamespaces []string
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -209,6 +215,7 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) {
|
||||||
resolveCfg := &resolver.Config{
|
resolveCfg := &resolver.Config{
|
||||||
FrostFS: resolver.NewFrostFSResolver(a.pool),
|
FrostFS: resolver.NewFrostFSResolver(a.pool),
|
||||||
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
||||||
|
Settings: a.settings,
|
||||||
}
|
}
|
||||||
|
|
||||||
order := a.cfg.GetStringSlice(cfgResolveOrder)
|
order := a.cfg.GetStringSlice(cfgResolveOrder)
|
||||||
|
@ -477,6 +484,8 @@ func (a *app) updateSettings() {
|
||||||
a.settings.setZipCompression(a.cfg.GetBool(cfgZipCompression))
|
a.settings.setZipCompression(a.cfg.GetBool(cfgZipCompression))
|
||||||
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
|
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
|
||||||
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
|
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
|
||||||
|
a.settings.setNamespaceHeader(a.cfg.GetString(cfgResolveNamespaceHeader))
|
||||||
|
a.settings.setDefaultNamespaces(a.cfg.GetStringSlice(cfgResolveDefaultNamespaces))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) startServices() {
|
func (a *app) startServices() {
|
||||||
|
@ -510,15 +519,15 @@ func (a *app) configureRouter(handler *handler.Handler) {
|
||||||
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(handler.Upload))))
|
r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.Upload)))))
|
||||||
a.log.Info(logs.AddedPathUploadCid)
|
a.log.Info(logs.AddedPathUploadCid)
|
||||||
r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadByAddressOrBucketName))))
|
r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAddressOrBucketName)))))
|
||||||
r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(handler.HeadByAddressOrBucketName))))
|
r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAddressOrBucketName)))))
|
||||||
a.log.Info(logs.AddedPathGetCidOid)
|
a.log.Info(logs.AddedPathGetCidOid)
|
||||||
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadByAttribute))))
|
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAttribute)))))
|
||||||
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(handler.HeadByAttribute))))
|
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAttribute)))))
|
||||||
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
|
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
|
||||||
r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadZipped))))
|
r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadZipped)))))
|
||||||
a.log.Info(logs.AddedPathZipCidPrefix)
|
a.log.Info(logs.AddedPathZipCidPrefix)
|
||||||
|
|
||||||
a.webServer.Handler = r.Handler
|
a.webServer.Handler = r.Handler
|
||||||
|
@ -562,6 +571,18 @@ func (a *app) tracer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *app) reqNamespace(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||||
|
return func(req *fasthttp.RequestCtx) {
|
||||||
|
appCtx := utils.GetContextFromRequest(req)
|
||||||
|
|
||||||
|
nsBytes := req.Request.Header.Peek(a.settings.NamespaceHeader())
|
||||||
|
appCtx = middleware.SetNamespace(appCtx, string(nsBytes))
|
||||||
|
|
||||||
|
utils.SetContextToRequest(appCtx, req)
|
||||||
|
h(req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (a *app) AppParams() *utils.AppParams {
|
func (a *app) AppParams() *utils.AppParams {
|
||||||
return &utils.AppParams{
|
return &utils.AppParams{
|
||||||
Logger: a.log,
|
Logger: a.log,
|
||||||
|
@ -669,3 +690,36 @@ func (a *app) setRuntimeParameters() {
|
||||||
zap.Int64("old_value", previous))
|
zap.Int64("old_value", previous))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) NamespaceHeader() string {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.namespaceHeader
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) setNamespaceHeader(nsHeader string) {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.namespaceHeader = nsHeader
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool) {
|
||||||
|
s.mu.RLock()
|
||||||
|
namespaces := s.defaultNamespaces
|
||||||
|
s.mu.RUnlock()
|
||||||
|
if slices.Contains(namespaces, ns) {
|
||||||
|
return v2container.SysAttributeZoneDefault, true
|
||||||
|
}
|
||||||
|
|
||||||
|
return ns + ".ns", false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) setDefaultNamespaces(namespaces []string) {
|
||||||
|
for i := range namespaces { // to be set namespaces in env variable as `HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root"`
|
||||||
|
namespaces[i] = strings.Trim(namespaces[i], "\"")
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
s.defaultNamespaces = namespaces
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/testcontainers/testcontainers-go"
|
"github.com/testcontainers/testcontainers-go"
|
||||||
"github.com/testcontainers/testcontainers-go/wait"
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
)
|
)
|
||||||
|
|
||||||
type putResponse struct {
|
type putResponse struct {
|
||||||
|
@ -68,6 +69,7 @@ func TestIntegration(t *testing.T) {
|
||||||
t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, ownerID, CID, version) })
|
t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, ownerID, CID, version) })
|
||||||
t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID, version) })
|
t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID, version) })
|
||||||
t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID, version) })
|
t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID, version) })
|
||||||
|
t.Run("test namespaces "+version, func(t *testing.T) { checkNamespaces(ctx, t, clientPool, ownerID, CID, version) })
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
server.Wait()
|
server.Wait()
|
||||||
|
@ -81,7 +83,7 @@ func runServer() (App, context.CancelFunc) {
|
||||||
cancelCtx, cancel := context.WithCancel(context.Background())
|
cancelCtx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
v := getDefaultConfig()
|
v := getDefaultConfig()
|
||||||
l, lvl := newLogger(v)
|
l, lvl := newStdoutLogger(zapcore.DebugLevel)
|
||||||
application := newApp(cancelCtx, WithConfig(v), WithLogger(l, lvl))
|
application := newApp(cancelCtx, WithConfig(v), WithLogger(l, lvl))
|
||||||
go application.Serve()
|
go application.Serve()
|
||||||
|
|
||||||
|
@ -338,6 +340,40 @@ func checkZip(t *testing.T, data []byte, length int64, names, contents []string)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkNamespaces(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID, version string) {
|
||||||
|
content := "content of file"
|
||||||
|
attributes := map[string]string{
|
||||||
|
"some-attr": "some-get-value",
|
||||||
|
}
|
||||||
|
|
||||||
|
id := putObject(ctx, t, clientPool, ownerID, CID, content, attributes)
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
req.Header.Set(defaultNamespaceHeader, "")
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
checkGetResponse(t, resp, content, attributes)
|
||||||
|
|
||||||
|
req, err = http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
req.Header.Set(defaultNamespaceHeader, "root")
|
||||||
|
|
||||||
|
resp, err = http.DefaultClient.Do(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
checkGetResponse(t, resp, content, attributes)
|
||||||
|
|
||||||
|
req, err = http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
req.Header.Set(defaultNamespaceHeader, "root2")
|
||||||
|
|
||||||
|
resp, err = http.DefaultClient.Do(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, http.StatusNotFound, resp.StatusCode)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container {
|
func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container {
|
||||||
req := testcontainers.ContainerRequest{
|
req := testcontainers.ContainerRequest{
|
||||||
Image: image,
|
Image: image,
|
||||||
|
|
|
@ -49,6 +49,8 @@ const (
|
||||||
|
|
||||||
defaultBufferMaxSizeForPut = 1024 * 1024 // 1mb
|
defaultBufferMaxSizeForPut = 1024 * 1024 // 1mb
|
||||||
|
|
||||||
|
defaultNamespaceHeader = "X-Frostfs-Namespace"
|
||||||
|
|
||||||
cfgServer = "server"
|
cfgServer = "server"
|
||||||
cfgTLSEnabled = "tls.enabled"
|
cfgTLSEnabled = "tls.enabled"
|
||||||
cfgTLSCertFile = "tls.cert_file"
|
cfgTLSCertFile = "tls.cert_file"
|
||||||
|
@ -116,6 +118,10 @@ const (
|
||||||
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
|
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
|
||||||
cfgBucketsCacheSize = "cache.buckets.size"
|
cfgBucketsCacheSize = "cache.buckets.size"
|
||||||
|
|
||||||
|
// Bucket resolving options.
|
||||||
|
cfgResolveNamespaceHeader = "resolve_bucket.namespace_header"
|
||||||
|
cfgResolveDefaultNamespaces = "resolve_bucket.default_namespaces"
|
||||||
|
|
||||||
// Command line args.
|
// Command line args.
|
||||||
cmdHelp = "help"
|
cmdHelp = "help"
|
||||||
cmdVersion = "version"
|
cmdVersion = "version"
|
||||||
|
@ -199,6 +205,10 @@ func settings() *viper.Viper {
|
||||||
v.SetDefault(cfgPprofAddress, "localhost:8083")
|
v.SetDefault(cfgPprofAddress, "localhost:8083")
|
||||||
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
|
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
|
||||||
|
|
||||||
|
// resolve bucket
|
||||||
|
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
|
||||||
|
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
|
||||||
|
|
||||||
// Binding flags
|
// Binding flags
|
||||||
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
|
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|
|
@ -109,3 +109,8 @@ HTTP_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576
|
||||||
# Cache which contains mapping of bucket name to bucket info
|
# Cache which contains mapping of bucket name to bucket info
|
||||||
HTTP_GW_CACHE_BUCKETS_LIFETIME=1m
|
HTTP_GW_CACHE_BUCKETS_LIFETIME=1m
|
||||||
HTTP_GW_CACHE_BUCKETS_SIZE=1000
|
HTTP_GW_CACHE_BUCKETS_SIZE=1000
|
||||||
|
|
||||||
|
# Header to determine zone to resolve bucket name
|
||||||
|
HTTP_GW_RESOLVE_BUCKET_NAMESPACE_HEADER=X-Frostfs-Namespace
|
||||||
|
# Namespaces that should be handled as default
|
||||||
|
HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root"
|
||||||
|
|
|
@ -119,3 +119,7 @@ cache:
|
||||||
buckets:
|
buckets:
|
||||||
lifetime: 1m
|
lifetime: 1m
|
||||||
size: 1000
|
size: 1000
|
||||||
|
|
||||||
|
resolve_bucket:
|
||||||
|
namespace_header: X-Frostfs-Namespace
|
||||||
|
default_namespaces: [ "", "root" ]
|
|
@ -41,7 +41,7 @@ $ cat http.log
|
||||||
# Structure
|
# Structure
|
||||||
|
|
||||||
| Section | Description |
|
| Section | Description |
|
||||||
|-----------------|-------------------------------------------------------|
|
|------------------|----------------------------------------------------------------|
|
||||||
| no section | [General parameters](#general-section) |
|
| no section | [General parameters](#general-section) |
|
||||||
| `wallet` | [Wallet configuration](#wallet-section) |
|
| `wallet` | [Wallet configuration](#wallet-section) |
|
||||||
| `peers` | [Nodes configuration](#peers-section) |
|
| `peers` | [Nodes configuration](#peers-section) |
|
||||||
|
@ -56,6 +56,7 @@ $ cat http.log
|
||||||
| `runtime` | [Runtime configuration](#runtime-section) |
|
| `runtime` | [Runtime configuration](#runtime-section) |
|
||||||
| `frostfs` | [Frostfs configuration](#frostfs-section) |
|
| `frostfs` | [Frostfs configuration](#frostfs-section) |
|
||||||
| `cache` | [Cache configuration](#cache-section) |
|
| `cache` | [Cache configuration](#cache-section) |
|
||||||
|
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
|
||||||
|
|
||||||
|
|
||||||
# General section
|
# General section
|
||||||
|
@ -315,3 +316,19 @@ size: 1000
|
||||||
|------------|------------|------------------|-------------------------------|
|
|------------|------------|------------------|-------------------------------|
|
||||||
| `lifetime` | `duration` | depends on cache | Lifetime of entries in cache. |
|
| `lifetime` | `duration` | depends on cache | Lifetime of entries in cache. |
|
||||||
| `size` | `int` | depends on cache | LRU cache size. |
|
| `size` | `int` | depends on cache | LRU cache size. |
|
||||||
|
|
||||||
|
|
||||||
|
# `resolve_bucket` section
|
||||||
|
|
||||||
|
Bucket name resolving parameters from and to container ID.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
resolve_bucket:
|
||||||
|
namespace_header: X-Frostfs-Namespace
|
||||||
|
default_namespaces: [ "", "root" ]
|
||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
|----------------------|------------|---------------|-----------------------|--------------------------------------------------------------------------------------------------------------------------|
|
||||||
|
| `namespace_header` | `string` | yes | `X-Frostfs-Namespace` | Header to determine zone to resolve bucket name. |
|
||||||
|
| `default_namespaces` | `[]string` | yes | ["","root"] | Namespaces that should be handled as default. |
|
2
go.mod
2
go.mod
|
@ -21,6 +21,7 @@ require (
|
||||||
go.opentelemetry.io/otel v1.16.0
|
go.opentelemetry.io/otel v1.16.0
|
||||||
go.opentelemetry.io/otel/trace v1.16.0
|
go.opentelemetry.io/otel/trace v1.16.0
|
||||||
go.uber.org/zap v1.24.0
|
go.uber.org/zap v1.24.0
|
||||||
|
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc
|
||||||
google.golang.org/grpc v1.55.0
|
google.golang.org/grpc v1.55.0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -102,7 +103,6 @@ require (
|
||||||
go.uber.org/atomic v1.10.0 // indirect
|
go.uber.org/atomic v1.10.0 // indirect
|
||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
golang.org/x/crypto v0.9.0 // indirect
|
golang.org/x/crypto v0.9.0 // indirect
|
||||||
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
|
|
||||||
golang.org/x/net v0.10.0 // indirect
|
golang.org/x/net v0.10.0 // indirect
|
||||||
golang.org/x/sync v0.2.0 // indirect
|
golang.org/x/sync v0.2.0 // indirect
|
||||||
golang.org/x/sys v0.8.0 // indirect
|
golang.org/x/sys v0.8.0 // indirect
|
||||||
|
|
10
internal/cache/buckets.go
vendored
10
internal/cache/buckets.go
vendored
|
@ -46,8 +46,8 @@ func NewBucketCache(config *Config) *BucketCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns a cached object.
|
// Get returns a cached object.
|
||||||
func (o *BucketCache) Get(key string) *data.BucketInfo {
|
func (o *BucketCache) Get(ns, bktName string) *data.BucketInfo {
|
||||||
entry, err := o.cache.Get(key)
|
entry, err := o.cache.Get(formKey(ns, bktName))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -64,5 +64,9 @@ func (o *BucketCache) Get(key string) *data.BucketInfo {
|
||||||
|
|
||||||
// Put puts an object to cache.
|
// Put puts an object to cache.
|
||||||
func (o *BucketCache) Put(bkt *data.BucketInfo) error {
|
func (o *BucketCache) Put(bkt *data.BucketInfo) error {
|
||||||
return o.cache.Set(bkt.Name, bkt)
|
return o.cache.Set(formKey(bkt.Zone, bkt.Name), bkt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func formKey(ns, name string) string {
|
||||||
|
return name + "." + ns
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
|
@ -31,6 +32,7 @@ type Config interface {
|
||||||
ZipCompression() bool
|
ZipCompression() bool
|
||||||
ClientCut() bool
|
ClientCut() bool
|
||||||
BufferMaxSizeForPut() uint64
|
BufferMaxSizeForPut() uint64
|
||||||
|
NamespaceHeader() string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
|
@ -190,7 +192,12 @@ func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*ci
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
|
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
|
||||||
if bktInfo := h.cache.Get(containerName); bktInfo != nil {
|
ns, err := middleware.GetNamespace(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if bktInfo := h.cache.Get(ns, containerName); bktInfo != nil {
|
||||||
return bktInfo, nil
|
return bktInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
26
internal/handler/middleware/util.go
Normal file
26
internal/handler/middleware/util.go
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
package middleware
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// keyWrapper is wrapper for context keys.
|
||||||
|
type keyWrapper string
|
||||||
|
|
||||||
|
const nsKey = keyWrapper("namespace")
|
||||||
|
|
||||||
|
// GetNamespace extract namespace from context.
|
||||||
|
func GetNamespace(ctx context.Context) (string, error) {
|
||||||
|
ns, ok := ctx.Value(nsKey).(string)
|
||||||
|
if !ok {
|
||||||
|
return "", fmt.Errorf("couldn't get namespace from context")
|
||||||
|
}
|
||||||
|
|
||||||
|
return ns, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetNamespace sets namespace in the context.
|
||||||
|
func SetNamespace(ctx context.Context, ns string) context.Context {
|
||||||
|
return context.WithValue(ctx, nsKey, ns)
|
||||||
|
}
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
|
||||||
|
@ -28,9 +29,14 @@ type FrostFS interface {
|
||||||
SystemDNS(context.Context) (string, error)
|
SystemDNS(context.Context) (string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Settings interface {
|
||||||
|
FormContainerZone(ns string) (zone string, isDefault bool)
|
||||||
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
FrostFS FrostFS
|
FrostFS FrostFS
|
||||||
RPCAddress string
|
RPCAddress string
|
||||||
|
Settings Settings
|
||||||
}
|
}
|
||||||
|
|
||||||
type ContainerResolver struct {
|
type ContainerResolver struct {
|
||||||
|
@ -135,29 +141,43 @@ func (r *ContainerResolver) equals(resolverNames []string) bool {
|
||||||
func newResolver(name string, cfg *Config) (*Resolver, error) {
|
func newResolver(name string, cfg *Config) (*Resolver, error) {
|
||||||
switch name {
|
switch name {
|
||||||
case DNSResolver:
|
case DNSResolver:
|
||||||
return NewDNSResolver(cfg.FrostFS)
|
return NewDNSResolver(cfg.FrostFS, cfg.Settings)
|
||||||
case NNSResolver:
|
case NNSResolver:
|
||||||
return NewNNSResolver(cfg.RPCAddress)
|
return NewNNSResolver(cfg.RPCAddress, cfg.Settings)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unknown resolver: %s", name)
|
return nil, fmt.Errorf("unknown resolver: %s", name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDNSResolver(frostFS FrostFS) (*Resolver, error) {
|
func NewDNSResolver(frostFS FrostFS, settings Settings) (*Resolver, error) {
|
||||||
if frostFS == nil {
|
if frostFS == nil {
|
||||||
return nil, fmt.Errorf("pool must not be nil for DNS resolver")
|
return nil, fmt.Errorf("pool must not be nil for DNS resolver")
|
||||||
}
|
}
|
||||||
|
if settings == nil {
|
||||||
|
return nil, fmt.Errorf("resolver settings must not be nil for DNS resolver")
|
||||||
|
}
|
||||||
|
|
||||||
var dns ns.DNS
|
var dns ns.DNS
|
||||||
|
|
||||||
resolveFunc := func(ctx context.Context, name string) (*cid.ID, error) {
|
resolveFunc := func(ctx context.Context, name string) (*cid.ID, error) {
|
||||||
domain, err := frostFS.SystemDNS(ctx)
|
var err error
|
||||||
|
|
||||||
|
namespace, err := middleware.GetNamespace(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
zone, isDefault := settings.FormContainerZone(namespace)
|
||||||
|
if isDefault {
|
||||||
|
zone, err = frostFS.SystemDNS(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err)
|
return nil, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
domain = name + "." + domain
|
domain := name + "." + zone
|
||||||
cnrID, err := dns.ResolveContainerName(domain)
|
cnrID, err := dns.ResolveContainerName(domain)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't resolve container '%s' as '%s': %w", name, domain, err)
|
return nil, fmt.Errorf("couldn't resolve container '%s' as '%s': %w", name, domain, err)
|
||||||
}
|
}
|
||||||
|
@ -170,17 +190,32 @@ func NewDNSResolver(frostFS FrostFS) (*Resolver, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNNSResolver(rpcAddress string) (*Resolver, error) {
|
func NewNNSResolver(rpcAddress string, settings Settings) (*Resolver, error) {
|
||||||
|
if rpcAddress == "" {
|
||||||
|
return nil, fmt.Errorf("rpc address must not be empty for NNS resolver")
|
||||||
|
}
|
||||||
|
if settings == nil {
|
||||||
|
return nil, fmt.Errorf("resolver settings must not be nil for NNS resolver")
|
||||||
|
}
|
||||||
|
|
||||||
var nns ns.NNS
|
var nns ns.NNS
|
||||||
|
|
||||||
if err := nns.Dial(rpcAddress); err != nil {
|
if err := nns.Dial(rpcAddress); err != nil {
|
||||||
return nil, fmt.Errorf("could not dial nns: %w", err)
|
return nil, fmt.Errorf("could not dial nns: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resolveFunc := func(_ context.Context, name string) (*cid.ID, error) {
|
resolveFunc := func(ctx context.Context, name string) (*cid.ID, error) {
|
||||||
var d container.Domain
|
var d container.Domain
|
||||||
d.SetName(name)
|
d.SetName(name)
|
||||||
|
|
||||||
|
namespace, err := middleware.GetNamespace(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
zone, _ := settings.FormContainerZone(namespace)
|
||||||
|
d.SetZone(zone)
|
||||||
|
|
||||||
cnrID, err := nns.ResolveContainerDomain(d)
|
cnrID, err := nns.ResolveContainerDomain(d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't resolve container '%s': %w", name, err)
|
return nil, fmt.Errorf("couldn't resolve container '%s': %w", name, err)
|
||||||
|
|
Loading…
Reference in a new issue