From a375af7d98373a5d7177a04755bc87d95f1b77ef Mon Sep 17 00:00:00 2001 From: Roman Loginov Date: Tue, 28 Nov 2023 11:29:08 +0300 Subject: [PATCH] [#91] Add support namespaces Signed-off-by: Roman Loginov --- CHANGELOG.md | 3 +- cmd/http-gw/app.go | 66 ++++++++++++++++++++++++++--- cmd/http-gw/integration_test.go | 38 ++++++++++++++++- cmd/http-gw/settings.go | 10 +++++ config/config.env | 5 +++ config/config.yaml | 4 ++ docs/gate-configuration.md | 49 ++++++++++++++------- go.mod | 2 +- internal/cache/buckets.go | 10 +++-- internal/handler/handler.go | 9 +++- internal/handler/middleware/util.go | 26 ++++++++++++ resolver/resolver.go | 51 ++++++++++++++++++---- 12 files changed, 236 insertions(+), 37 deletions(-) create mode 100644 internal/handler/middleware/util.go diff --git a/CHANGELOG.md b/CHANGELOG.md index fd19460..e2dd2c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,8 @@ This document outlines major changes between releases. - Add `frostfs.buffer_max_size_for_put` config param - Add bucket/container caching - Disable homomorphic hash for PUT if it's disabled in container itself -- Add new `logger.destination` config param (#89) +- Add new `logger.destination` config param (#89) +- Add support namespaces (#91) ### Changed - Update prometheus to v1.15.0 (#35) diff --git a/cmd/http-gw/app.go b/cmd/http-gw/app.go index 3878277..1ad1f20 100644 --- a/cmd/http-gw/app.go +++ b/cmd/http-gw/app.go @@ -7,13 +7,16 @@ import ( "os" "os/signal" "runtime/debug" + "strings" "sync" "syscall" "time" + v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" @@ -34,6 +37,7 @@ import ( "github.com/spf13/viper" "github.com/valyala/fasthttp" "go.uber.org/zap" + "golang.org/x/exp/slices" ) type ( @@ -78,6 +82,8 @@ type ( zipCompression bool clientCut bool bufferMaxSizeForPut uint64 + namespaceHeader string + defaultNamespaces []string } ) @@ -209,6 +215,7 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) { resolveCfg := &resolver.Config{ FrostFS: resolver.NewFrostFSResolver(a.pool), RPCAddress: a.cfg.GetString(cfgRPCEndpoint), + Settings: a.settings, } order := a.cfg.GetStringSlice(cfgResolveOrder) @@ -477,6 +484,8 @@ func (a *app) updateSettings() { a.settings.setZipCompression(a.cfg.GetBool(cfgZipCompression)) a.settings.setClientCut(a.cfg.GetBool(cfgClientCut)) a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut)) + a.settings.setNamespaceHeader(a.cfg.GetString(cfgResolveNamespaceHeader)) + a.settings.setDefaultNamespaces(a.cfg.GetStringSlice(cfgResolveDefaultNamespaces)) } func (a *app) startServices() { @@ -510,15 +519,15 @@ func (a *app) configureRouter(handler *handler.Handler) { response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed) } - r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(handler.Upload)))) + r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.Upload))))) a.log.Info(logs.AddedPathUploadCid) - r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadByAddressOrBucketName)))) - r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(handler.HeadByAddressOrBucketName)))) + r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAddressOrBucketName))))) + r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAddressOrBucketName))))) a.log.Info(logs.AddedPathGetCidOid) - r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadByAttribute)))) - r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(handler.HeadByAttribute)))) + r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAttribute))))) + r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAttribute))))) a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal) - r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadZipped)))) + r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadZipped))))) a.log.Info(logs.AddedPathZipCidPrefix) a.webServer.Handler = r.Handler @@ -562,6 +571,18 @@ func (a *app) tracer(h fasthttp.RequestHandler) fasthttp.RequestHandler { } } +func (a *app) reqNamespace(h fasthttp.RequestHandler) fasthttp.RequestHandler { + return func(req *fasthttp.RequestCtx) { + appCtx := utils.GetContextFromRequest(req) + + nsBytes := req.Request.Header.Peek(a.settings.NamespaceHeader()) + appCtx = middleware.SetNamespace(appCtx, string(nsBytes)) + + utils.SetContextToRequest(appCtx, req) + h(req) + } +} + func (a *app) AppParams() *utils.AppParams { return &utils.AppParams{ Logger: a.log, @@ -669,3 +690,36 @@ func (a *app) setRuntimeParameters() { zap.Int64("old_value", previous)) } } + +func (s *appSettings) NamespaceHeader() string { + s.mu.RLock() + defer s.mu.RUnlock() + return s.namespaceHeader +} + +func (s *appSettings) setNamespaceHeader(nsHeader string) { + s.mu.Lock() + s.namespaceHeader = nsHeader + s.mu.Unlock() +} + +func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool) { + s.mu.RLock() + namespaces := s.defaultNamespaces + s.mu.RUnlock() + if slices.Contains(namespaces, ns) { + return v2container.SysAttributeZoneDefault, true + } + + return ns + ".ns", false +} + +func (s *appSettings) setDefaultNamespaces(namespaces []string) { + for i := range namespaces { // to be set namespaces in env variable as `HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root"` + namespaces[i] = strings.Trim(namespaces[i], "\"") + } + + s.mu.Lock() + s.defaultNamespaces = namespaces + s.mu.Unlock() +} diff --git a/cmd/http-gw/integration_test.go b/cmd/http-gw/integration_test.go index 76a8325..f76c3ce 100644 --- a/cmd/http-gw/integration_test.go +++ b/cmd/http-gw/integration_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" + "go.uber.org/zap/zapcore" ) type putResponse struct { @@ -68,6 +69,7 @@ func TestIntegration(t *testing.T) { t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, ownerID, CID, version) }) t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID, version) }) t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID, version) }) + t.Run("test namespaces "+version, func(t *testing.T) { checkNamespaces(ctx, t, clientPool, ownerID, CID, version) }) cancel() server.Wait() @@ -81,7 +83,7 @@ func runServer() (App, context.CancelFunc) { cancelCtx, cancel := context.WithCancel(context.Background()) v := getDefaultConfig() - l, lvl := newLogger(v) + l, lvl := newStdoutLogger(zapcore.DebugLevel) application := newApp(cancelCtx, WithConfig(v), WithLogger(l, lvl)) go application.Serve() @@ -338,6 +340,40 @@ func checkZip(t *testing.T, data []byte, length int64, names, contents []string) } } +func checkNamespaces(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID, version string) { + content := "content of file" + attributes := map[string]string{ + "some-attr": "some-get-value", + } + + id := putObject(ctx, t, clientPool, ownerID, CID, content, attributes) + + req, err := http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil) + require.NoError(t, err) + req.Header.Set(defaultNamespaceHeader, "") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + checkGetResponse(t, resp, content, attributes) + + req, err = http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil) + require.NoError(t, err) + req.Header.Set(defaultNamespaceHeader, "root") + + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + checkGetResponse(t, resp, content, attributes) + + req, err = http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil) + require.NoError(t, err) + req.Header.Set(defaultNamespaceHeader, "root2") + + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusNotFound, resp.StatusCode) + +} + func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container { req := testcontainers.ContainerRequest{ Image: image, diff --git a/cmd/http-gw/settings.go b/cmd/http-gw/settings.go index 6e633ba..24e4f37 100644 --- a/cmd/http-gw/settings.go +++ b/cmd/http-gw/settings.go @@ -49,6 +49,8 @@ const ( defaultBufferMaxSizeForPut = 1024 * 1024 // 1mb + defaultNamespaceHeader = "X-Frostfs-Namespace" + cfgServer = "server" cfgTLSEnabled = "tls.enabled" cfgTLSCertFile = "tls.cert_file" @@ -116,6 +118,10 @@ const ( cfgBucketsCacheLifetime = "cache.buckets.lifetime" cfgBucketsCacheSize = "cache.buckets.size" + // Bucket resolving options. + cfgResolveNamespaceHeader = "resolve_bucket.namespace_header" + cfgResolveDefaultNamespaces = "resolve_bucket.default_namespaces" + // Command line args. cmdHelp = "help" cmdVersion = "version" @@ -199,6 +205,10 @@ func settings() *viper.Viper { v.SetDefault(cfgPprofAddress, "localhost:8083") v.SetDefault(cfgPrometheusAddress, "localhost:8084") + // resolve bucket + v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader) + v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"}) + // Binding flags if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil { panic(err) diff --git a/config/config.env b/config/config.env index 739cb96..be42af9 100644 --- a/config/config.env +++ b/config/config.env @@ -109,3 +109,8 @@ HTTP_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576 # Cache which contains mapping of bucket name to bucket info HTTP_GW_CACHE_BUCKETS_LIFETIME=1m HTTP_GW_CACHE_BUCKETS_SIZE=1000 + +# Header to determine zone to resolve bucket name +HTTP_GW_RESOLVE_BUCKET_NAMESPACE_HEADER=X-Frostfs-Namespace +# Namespaces that should be handled as default +HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root" diff --git a/config/config.yaml b/config/config.yaml index 6ab9994..020b0dd 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -119,3 +119,7 @@ cache: buckets: lifetime: 1m size: 1000 + +resolve_bucket: + namespace_header: X-Frostfs-Namespace + default_namespaces: [ "", "root" ] \ No newline at end of file diff --git a/docs/gate-configuration.md b/docs/gate-configuration.md index 1b51848..fe4b50f 100644 --- a/docs/gate-configuration.md +++ b/docs/gate-configuration.md @@ -40,22 +40,23 @@ $ cat http.log # Structure -| Section | Description | -|-----------------|-------------------------------------------------------| -| no section | [General parameters](#general-section) | -| `wallet` | [Wallet configuration](#wallet-section) | -| `peers` | [Nodes configuration](#peers-section) | -| `logger` | [Logger configuration](#logger-section) | -| `web` | [Web configuration](#web-section) | -| `server` | [Server configuration](#server-section) | -| `upload-header` | [Upload header configuration](#upload-header-section) | -| `zip` | [ZIP configuration](#zip-section) | -| `pprof` | [Pprof configuration](#pprof-section) | -| `prometheus` | [Prometheus configuration](#prometheus-section) | -| `tracing` | [Tracing configuration](#tracing-section) | -| `runtime` | [Runtime configuration](#runtime-section) | -| `frostfs` | [Frostfs configuration](#frostfs-section) | -| `cache` | [Cache configuration](#cache-section) | +| Section | Description | +|------------------|----------------------------------------------------------------| +| no section | [General parameters](#general-section) | +| `wallet` | [Wallet configuration](#wallet-section) | +| `peers` | [Nodes configuration](#peers-section) | +| `logger` | [Logger configuration](#logger-section) | +| `web` | [Web configuration](#web-section) | +| `server` | [Server configuration](#server-section) | +| `upload-header` | [Upload header configuration](#upload-header-section) | +| `zip` | [ZIP configuration](#zip-section) | +| `pprof` | [Pprof configuration](#pprof-section) | +| `prometheus` | [Prometheus configuration](#prometheus-section) | +| `tracing` | [Tracing configuration](#tracing-section) | +| `runtime` | [Runtime configuration](#runtime-section) | +| `frostfs` | [Frostfs configuration](#frostfs-section) | +| `cache` | [Cache configuration](#cache-section) | +| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) | # General section @@ -315,3 +316,19 @@ size: 1000 |------------|------------|------------------|-------------------------------| | `lifetime` | `duration` | depends on cache | Lifetime of entries in cache. | | `size` | `int` | depends on cache | LRU cache size. | + + +# `resolve_bucket` section + +Bucket name resolving parameters from and to container ID. + +```yaml +resolve_bucket: + namespace_header: X-Frostfs-Namespace + default_namespaces: [ "", "root" ] +``` + +| Parameter | Type | SIGHUP reload | Default value | Description | +|----------------------|------------|---------------|-----------------------|--------------------------------------------------------------------------------------------------------------------------| +| `namespace_header` | `string` | yes | `X-Frostfs-Namespace` | Header to determine zone to resolve bucket name. | +| `default_namespaces` | `[]string` | yes | ["","root"] | Namespaces that should be handled as default. | \ No newline at end of file diff --git a/go.mod b/go.mod index 98abcb7..c398358 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/trace v1.16.0 go.uber.org/zap v1.24.0 + golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc google.golang.org/grpc v1.55.0 ) @@ -102,7 +103,6 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.9.0 // indirect - golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/sync v0.2.0 // indirect golang.org/x/sys v0.8.0 // indirect diff --git a/internal/cache/buckets.go b/internal/cache/buckets.go index abeda6a..f8e6d88 100644 --- a/internal/cache/buckets.go +++ b/internal/cache/buckets.go @@ -46,8 +46,8 @@ func NewBucketCache(config *Config) *BucketCache { } // Get returns a cached object. -func (o *BucketCache) Get(key string) *data.BucketInfo { - entry, err := o.cache.Get(key) +func (o *BucketCache) Get(ns, bktName string) *data.BucketInfo { + entry, err := o.cache.Get(formKey(ns, bktName)) if err != nil { return nil } @@ -64,5 +64,9 @@ func (o *BucketCache) Get(key string) *data.BucketInfo { // Put puts an object to cache. func (o *BucketCache) Put(bkt *data.BucketInfo) error { - return o.cache.Set(bkt.Name, bkt) + return o.cache.Set(formKey(bkt.Zone, bkt.Name), bkt) +} + +func formKey(ns, name string) string { + return name + "." + ns } diff --git a/internal/handler/handler.go b/internal/handler/handler.go index fa3c364..757b5be 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -10,6 +10,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" @@ -31,6 +32,7 @@ type Config interface { ZipCompression() bool ClientCut() bool BufferMaxSizeForPut() uint64 + NamespaceHeader() string } type Handler struct { @@ -190,7 +192,12 @@ func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*ci } func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) { - if bktInfo := h.cache.Get(containerName); bktInfo != nil { + ns, err := middleware.GetNamespace(ctx) + if err != nil { + return nil, err + } + + if bktInfo := h.cache.Get(ns, containerName); bktInfo != nil { return bktInfo, nil } diff --git a/internal/handler/middleware/util.go b/internal/handler/middleware/util.go new file mode 100644 index 0000000..284513a --- /dev/null +++ b/internal/handler/middleware/util.go @@ -0,0 +1,26 @@ +package middleware + +import ( + "context" + "fmt" +) + +// keyWrapper is wrapper for context keys. +type keyWrapper string + +const nsKey = keyWrapper("namespace") + +// GetNamespace extract namespace from context. +func GetNamespace(ctx context.Context) (string, error) { + ns, ok := ctx.Value(nsKey).(string) + if !ok { + return "", fmt.Errorf("couldn't get namespace from context") + } + + return ns, nil +} + +// SetNamespace sets namespace in the context. +func SetNamespace(ctx context.Context, ns string) context.Context { + return context.WithValue(ctx, nsKey, ns) +} diff --git a/resolver/resolver.go b/resolver/resolver.go index e6707e2..e7615d4 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns" @@ -28,9 +29,14 @@ type FrostFS interface { SystemDNS(context.Context) (string, error) } +type Settings interface { + FormContainerZone(ns string) (zone string, isDefault bool) +} + type Config struct { FrostFS FrostFS RPCAddress string + Settings Settings } type ContainerResolver struct { @@ -135,29 +141,43 @@ func (r *ContainerResolver) equals(resolverNames []string) bool { func newResolver(name string, cfg *Config) (*Resolver, error) { switch name { case DNSResolver: - return NewDNSResolver(cfg.FrostFS) + return NewDNSResolver(cfg.FrostFS, cfg.Settings) case NNSResolver: - return NewNNSResolver(cfg.RPCAddress) + return NewNNSResolver(cfg.RPCAddress, cfg.Settings) default: return nil, fmt.Errorf("unknown resolver: %s", name) } } -func NewDNSResolver(frostFS FrostFS) (*Resolver, error) { +func NewDNSResolver(frostFS FrostFS, settings Settings) (*Resolver, error) { if frostFS == nil { return nil, fmt.Errorf("pool must not be nil for DNS resolver") } + if settings == nil { + return nil, fmt.Errorf("resolver settings must not be nil for DNS resolver") + } var dns ns.DNS resolveFunc := func(ctx context.Context, name string) (*cid.ID, error) { - domain, err := frostFS.SystemDNS(ctx) + var err error + + namespace, err := middleware.GetNamespace(ctx) if err != nil { - return nil, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err) + return nil, err } - domain = name + "." + domain + zone, isDefault := settings.FormContainerZone(namespace) + if isDefault { + zone, err = frostFS.SystemDNS(ctx) + if err != nil { + return nil, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err) + } + } + + domain := name + "." + zone cnrID, err := dns.ResolveContainerName(domain) + if err != nil { return nil, fmt.Errorf("couldn't resolve container '%s' as '%s': %w", name, domain, err) } @@ -170,17 +190,32 @@ func NewDNSResolver(frostFS FrostFS) (*Resolver, error) { }, nil } -func NewNNSResolver(rpcAddress string) (*Resolver, error) { +func NewNNSResolver(rpcAddress string, settings Settings) (*Resolver, error) { + if rpcAddress == "" { + return nil, fmt.Errorf("rpc address must not be empty for NNS resolver") + } + if settings == nil { + return nil, fmt.Errorf("resolver settings must not be nil for NNS resolver") + } + var nns ns.NNS if err := nns.Dial(rpcAddress); err != nil { return nil, fmt.Errorf("could not dial nns: %w", err) } - resolveFunc := func(_ context.Context, name string) (*cid.ID, error) { + resolveFunc := func(ctx context.Context, name string) (*cid.ID, error) { var d container.Domain d.SetName(name) + namespace, err := middleware.GetNamespace(ctx) + if err != nil { + return nil, err + } + + zone, _ := settings.FormContainerZone(namespace) + d.SetZone(zone) + cnrID, err := nns.ResolveContainerDomain(d) if err != nil { return nil, fmt.Errorf("couldn't resolve container '%s': %w", name, err)