From d2587b21afcf206e00c6d15affd73e6f49dc7da8 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Mon, 14 Nov 2022 12:37:47 +0300 Subject: [PATCH] [#747] Reload policies on SIGHUP Signed-off-by: Denis Kirillov --- api/handler/api.go | 6 +- api/handler/handlers_test.go | 18 ++++ api/handler/put.go | 4 +- cmd/s3-gw/app.go | 162 +++++++++++++++++++++++------------ 4 files changed, 131 insertions(+), 59 deletions(-) diff --git a/api/handler/api.go b/api/handler/api.go index a31546681..f7c49680c 100644 --- a/api/handler/api.go +++ b/api/handler/api.go @@ -32,9 +32,9 @@ type ( CopiesNumber uint32 } - PlacementPolicy struct { - Default netmap.PlacementPolicy - RegionMap map[string]netmap.PlacementPolicy + PlacementPolicy interface { + Default() netmap.PlacementPolicy + Get(string) (netmap.PlacementPolicy, bool) } ) diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go index 83a15a063..ab2e1b3c6 100644 --- a/api/handler/handlers_test.go +++ b/api/handler/handlers_test.go @@ -19,6 +19,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/layer" "github.com/nspcc-dev/neofs-s3-gw/api/resolver" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" @@ -50,6 +51,18 @@ func (hc *handlerContext) Context() context.Context { return hc.context } +type placementPolicyMock struct { + defaultPolicy netmap.PlacementPolicy +} + +func (p *placementPolicyMock) Default() netmap.PlacementPolicy { + return p.defaultPolicy +} + +func (p *placementPolicyMock) Get(string) (netmap.PlacementPolicy, bool) { + return netmap.PlacementPolicy{}, false +} + func prepareHandlerContext(t *testing.T) *handlerContext { key, err := keys.NewPrivateKey() require.NoError(t, err) @@ -72,11 +85,16 @@ func prepareHandlerContext(t *testing.T) *handlerContext { TreeService: layer.NewTreeService(), } + var pp netmap.PlacementPolicy + err = pp.DecodeString("REP 1") + require.NoError(t, err) + h := &handler{ log: l, obj: layer.NewLayer(l, tp, layerCfg), cfg: &Config{ TLSEnabled: true, + Policy: &placementPolicyMock{defaultPolicy: pp}, }, } diff --git a/api/handler/put.go b/api/handler/put.go index fc231dd7f..ee50b1b1f 100644 --- a/api/handler/put.go +++ b/api/handler/put.go @@ -748,13 +748,13 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) { } func (h handler) setPolicy(prm *layer.CreateBucketParams, locationConstraint string, userPolicies []*accessbox.ContainerPolicy) { - prm.Policy = h.cfg.Policy.Default + prm.Policy = h.cfg.Policy.Default() if locationConstraint == "" { return } - if policy, ok := h.cfg.Policy.RegionMap[locationConstraint]; ok { + if policy, ok := h.cfg.Policy.Get(locationConstraint); ok { prm.Policy = policy prm.LocationConstraint = locationConstraint } diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 8ccb3f979..df504e82f 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -58,7 +58,8 @@ type ( } appSettings struct { - LogLevel zap.AtomicLevel + logLevel zap.AtomicLevel + policies *placementPolicy } Logger struct { @@ -86,6 +87,12 @@ type ( SetHealth(int32) Unregister() } + + placementPolicy struct { + mu sync.RWMutex + defaultPolicy netmap.PlacementPolicy + regionMap map[string]netmap.PlacementPolicy + } ) func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { @@ -105,7 +112,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { wrkDone: make(chan struct{}, 1), maxClients: newMaxClients(v), - settings: &appSettings{LogLevel: log.lvl}, + settings: newAppSettings(log, v), } app.init(ctx) @@ -114,7 +121,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { } func (a *App) init(ctx context.Context) { - a.initHandlers(ctx) + a.initAPI(ctx) a.initMetrics() a.initTLSProvider() } @@ -160,16 +167,30 @@ func (a *App) initLayer(ctx context.Context) { } } -func (a *App) initHandlers(ctx context.Context) { - a.initLayer(ctx) - - var err error - handlerOptions := getHandlerOptions(a.cfg, a.log) - - a.api, err = handler.New(a.log, a.obj, a.nc, handlerOptions) +func newAppSettings(log *Logger, v *viper.Viper) *appSettings { + policies, err := newPlacementPolicy(getDefaultPolicyValue(v), v.GetString(cfgPolicyRegionMapFile)) if err != nil { - a.log.Fatal("could not initialize API handler", zap.Error(err)) + log.logger.Fatal("failed to create new policy mapping", zap.Error(err)) } + + return &appSettings{ + logLevel: log.lvl, + policies: policies, + } +} + +func getDefaultPolicyValue(v *viper.Viper) string { + defaultPolicyStr := handler.DefaultPolicy + if v.IsSet(cfgPolicyDefault) { + defaultPolicyStr = v.GetString(cfgPolicyDefault) + } + + return defaultPolicyStr +} + +func (a *App) initAPI(ctx context.Context) { + a.initLayer(ctx) + a.initHandler() } func (a *App) initMetrics() { @@ -282,6 +303,63 @@ func getPool(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.P return p, key } +func newPlacementPolicy(defaultPolicy string, regionPolicyFilepath string) (*placementPolicy, error) { + policies := &placementPolicy{ + regionMap: make(map[string]netmap.PlacementPolicy), + } + + return policies, policies.update(defaultPolicy, regionPolicyFilepath) +} + +func (p *placementPolicy) Default() netmap.PlacementPolicy { + p.mu.RLock() + defer p.mu.RUnlock() + return p.defaultPolicy +} + +func (p *placementPolicy) Get(name string) (netmap.PlacementPolicy, bool) { + p.mu.RLock() + policy, ok := p.regionMap[name] + p.mu.RUnlock() + + return policy, ok +} + +func (p *placementPolicy) update(defaultPolicy string, regionPolicyFilepath string) error { + var defaultPlacementPolicy netmap.PlacementPolicy + if err := defaultPlacementPolicy.DecodeString(defaultPolicy); err != nil { + return fmt.Errorf("parse default policy '%s': %w", defaultPolicy, err) + } + + regionPolicyMap, err := readRegionMap(regionPolicyFilepath) + if err != nil { + return fmt.Errorf("read region map file: %w", err) + } + + regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap)) + for region, policy := range regionPolicyMap { + var pp netmap.PlacementPolicy + if err = pp.DecodeString(policy); err == nil { + regionMap[region] = pp + continue + } + + if err = pp.UnmarshalJSON([]byte(policy)); err == nil { + regionMap[region] = pp + continue + } + + return fmt.Errorf("parse region '%s' to policy mapping: %w", region, err) + } + + p.mu.Lock() + p.defaultPolicy = defaultPlacementPolicy + p.regionMap = regionMap + p.mu.Unlock() + + return nil +} + func newAppMetrics(logger *zap.Logger, provider GateMetricsCollector, enabled bool) *appMetrics { if !enabled { logger.Warn("metrics are disabled") @@ -499,7 +577,11 @@ func (a *App) updateSettings() { if lvl, err := getLogLevel(a.cfg); err != nil { a.log.Warn("log level won't be updated", zap.Error(err)) } else { - a.settings.LogLevel.SetLevel(lvl) + a.settings.logLevel.SetLevel(lvl) + } + + if err := a.settings.policies.update(getDefaultPolicyValue(a.cfg), a.cfg.GetString(cfgPolicyRegionMapFile)); err != nil { + a.log.Warn("policies won't be updated", zap.Error(err)) } } @@ -603,68 +685,40 @@ func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config { return cacheCfg } -func getHandlerOptions(v *viper.Viper, l *zap.Logger) *handler.Config { +func (a *App) initHandler() { cfg := &handler.Config{ - Policy: handler.PlacementPolicy{ - RegionMap: make(map[string]netmap.PlacementPolicy), - }, + Policy: a.settings.policies, DefaultMaxAge: handler.DefaultMaxAge, - NotificatorEnabled: v.GetBool(cfgEnableNATS), - TLSEnabled: v.IsSet(cfgTLSKeyFile) && v.IsSet(cfgTLSCertFile), + NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS), + TLSEnabled: a.cfg.IsSet(cfgTLSKeyFile) && a.cfg.IsSet(cfgTLSCertFile), CopiesNumber: handler.DefaultCopiesNumber, } - defaultPolicyStr := handler.DefaultPolicy - if v.IsSet(cfgPolicyDefault) { - defaultPolicyStr = v.GetString(cfgPolicyDefault) - } - - if err := cfg.Policy.Default.DecodeString(defaultPolicyStr); err != nil { - l.Fatal("couldn't parse container default policy", zap.Error(err)) - } - - regionPolicyMap, err := parseRegionMap(v) - if err != nil { - l.Fatal("couldn't parse region mapping policy file", zap.Error(err)) - } - - for region, policy := range regionPolicyMap { - var pp netmap.PlacementPolicy - if err = pp.DecodeString(policy); err == nil { - cfg.Policy.RegionMap[region] = pp - continue - } - - if err = pp.UnmarshalJSON([]byte(policy)); err == nil { - cfg.Policy.RegionMap[region] = pp - continue - } - - l.Fatal("couldn't parse region mapping policy", zap.String("name", region), zap.Error(err)) - } - - if v.IsSet(cfgDefaultMaxAge) { - defaultMaxAge := v.GetInt(cfgDefaultMaxAge) + if a.cfg.IsSet(cfgDefaultMaxAge) { + defaultMaxAge := a.cfg.GetInt(cfgDefaultMaxAge) if defaultMaxAge <= 0 && defaultMaxAge != -1 { - l.Fatal("invalid defaultMaxAge", + a.log.Fatal("invalid defaultMaxAge", zap.String("parameter", cfgDefaultMaxAge), zap.String("value in config", strconv.Itoa(defaultMaxAge))) } cfg.DefaultMaxAge = defaultMaxAge } - if val := v.GetUint32(cfgSetCopiesNumber); val > 0 { + if val := a.cfg.GetUint32(cfgSetCopiesNumber); val > 0 { cfg.CopiesNumber = val } - return cfg + var err error + a.api, err = handler.New(a.log, a.obj, a.nc, cfg) + if err != nil { + a.log.Fatal("could not initialize API handler", zap.Error(err)) + } } -func parseRegionMap(v *viper.Viper) (map[string]string, error) { +func readRegionMap(filePath string) (map[string]string, error) { regionMap := make(map[string]string) - filePath := v.GetString(cfgPolicyRegionMapFile) if filePath == "" { return regionMap, nil }