[#747] Reload policies on SIGHUP
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
f5fe9a9b4b
commit
d2587b21af
4 changed files with 131 additions and 59 deletions
|
@ -32,9 +32,9 @@ type (
|
|||
CopiesNumber uint32
|
||||
}
|
||||
|
||||
PlacementPolicy struct {
|
||||
Default netmap.PlacementPolicy
|
||||
RegionMap map[string]netmap.PlacementPolicy
|
||||
PlacementPolicy interface {
|
||||
Default() netmap.PlacementPolicy
|
||||
Get(string) (netmap.PlacementPolicy, bool)
|
||||
}
|
||||
)
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/resolver"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||
|
@ -50,6 +51,18 @@ func (hc *handlerContext) Context() context.Context {
|
|||
return hc.context
|
||||
}
|
||||
|
||||
type placementPolicyMock struct {
|
||||
defaultPolicy netmap.PlacementPolicy
|
||||
}
|
||||
|
||||
func (p *placementPolicyMock) Default() netmap.PlacementPolicy {
|
||||
return p.defaultPolicy
|
||||
}
|
||||
|
||||
func (p *placementPolicyMock) Get(string) (netmap.PlacementPolicy, bool) {
|
||||
return netmap.PlacementPolicy{}, false
|
||||
}
|
||||
|
||||
func prepareHandlerContext(t *testing.T) *handlerContext {
|
||||
key, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
@ -72,11 +85,16 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
|
|||
TreeService: layer.NewTreeService(),
|
||||
}
|
||||
|
||||
var pp netmap.PlacementPolicy
|
||||
err = pp.DecodeString("REP 1")
|
||||
require.NoError(t, err)
|
||||
|
||||
h := &handler{
|
||||
log: l,
|
||||
obj: layer.NewLayer(l, tp, layerCfg),
|
||||
cfg: &Config{
|
||||
TLSEnabled: true,
|
||||
Policy: &placementPolicyMock{defaultPolicy: pp},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -748,13 +748,13 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func (h handler) setPolicy(prm *layer.CreateBucketParams, locationConstraint string, userPolicies []*accessbox.ContainerPolicy) {
|
||||
prm.Policy = h.cfg.Policy.Default
|
||||
prm.Policy = h.cfg.Policy.Default()
|
||||
|
||||
if locationConstraint == "" {
|
||||
return
|
||||
}
|
||||
|
||||
if policy, ok := h.cfg.Policy.RegionMap[locationConstraint]; ok {
|
||||
if policy, ok := h.cfg.Policy.Get(locationConstraint); ok {
|
||||
prm.Policy = policy
|
||||
prm.LocationConstraint = locationConstraint
|
||||
}
|
||||
|
|
162
cmd/s3-gw/app.go
162
cmd/s3-gw/app.go
|
@ -58,7 +58,8 @@ type (
|
|||
}
|
||||
|
||||
appSettings struct {
|
||||
LogLevel zap.AtomicLevel
|
||||
logLevel zap.AtomicLevel
|
||||
policies *placementPolicy
|
||||
}
|
||||
|
||||
Logger struct {
|
||||
|
@ -86,6 +87,12 @@ type (
|
|||
SetHealth(int32)
|
||||
Unregister()
|
||||
}
|
||||
|
||||
placementPolicy struct {
|
||||
mu sync.RWMutex
|
||||
defaultPolicy netmap.PlacementPolicy
|
||||
regionMap map[string]netmap.PlacementPolicy
|
||||
}
|
||||
)
|
||||
|
||||
func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
||||
|
@ -105,7 +112,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
|||
wrkDone: make(chan struct{}, 1),
|
||||
|
||||
maxClients: newMaxClients(v),
|
||||
settings: &appSettings{LogLevel: log.lvl},
|
||||
settings: newAppSettings(log, v),
|
||||
}
|
||||
|
||||
app.init(ctx)
|
||||
|
@ -114,7 +121,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
|||
}
|
||||
|
||||
func (a *App) init(ctx context.Context) {
|
||||
a.initHandlers(ctx)
|
||||
a.initAPI(ctx)
|
||||
a.initMetrics()
|
||||
a.initTLSProvider()
|
||||
}
|
||||
|
@ -160,16 +167,30 @@ func (a *App) initLayer(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
func (a *App) initHandlers(ctx context.Context) {
|
||||
a.initLayer(ctx)
|
||||
|
||||
var err error
|
||||
handlerOptions := getHandlerOptions(a.cfg, a.log)
|
||||
|
||||
a.api, err = handler.New(a.log, a.obj, a.nc, handlerOptions)
|
||||
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||
policies, err := newPlacementPolicy(getDefaultPolicyValue(v), v.GetString(cfgPolicyRegionMapFile))
|
||||
if err != nil {
|
||||
a.log.Fatal("could not initialize API handler", zap.Error(err))
|
||||
log.logger.Fatal("failed to create new policy mapping", zap.Error(err))
|
||||
}
|
||||
|
||||
return &appSettings{
|
||||
logLevel: log.lvl,
|
||||
policies: policies,
|
||||
}
|
||||
}
|
||||
|
||||
func getDefaultPolicyValue(v *viper.Viper) string {
|
||||
defaultPolicyStr := handler.DefaultPolicy
|
||||
if v.IsSet(cfgPolicyDefault) {
|
||||
defaultPolicyStr = v.GetString(cfgPolicyDefault)
|
||||
}
|
||||
|
||||
return defaultPolicyStr
|
||||
}
|
||||
|
||||
func (a *App) initAPI(ctx context.Context) {
|
||||
a.initLayer(ctx)
|
||||
a.initHandler()
|
||||
}
|
||||
|
||||
func (a *App) initMetrics() {
|
||||
|
@ -282,6 +303,63 @@ func getPool(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.P
|
|||
return p, key
|
||||
}
|
||||
|
||||
func newPlacementPolicy(defaultPolicy string, regionPolicyFilepath string) (*placementPolicy, error) {
|
||||
policies := &placementPolicy{
|
||||
regionMap: make(map[string]netmap.PlacementPolicy),
|
||||
}
|
||||
|
||||
return policies, policies.update(defaultPolicy, regionPolicyFilepath)
|
||||
}
|
||||
|
||||
func (p *placementPolicy) Default() netmap.PlacementPolicy {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return p.defaultPolicy
|
||||
}
|
||||
|
||||
func (p *placementPolicy) Get(name string) (netmap.PlacementPolicy, bool) {
|
||||
p.mu.RLock()
|
||||
policy, ok := p.regionMap[name]
|
||||
p.mu.RUnlock()
|
||||
|
||||
return policy, ok
|
||||
}
|
||||
|
||||
func (p *placementPolicy) update(defaultPolicy string, regionPolicyFilepath string) error {
|
||||
var defaultPlacementPolicy netmap.PlacementPolicy
|
||||
if err := defaultPlacementPolicy.DecodeString(defaultPolicy); err != nil {
|
||||
return fmt.Errorf("parse default policy '%s': %w", defaultPolicy, err)
|
||||
}
|
||||
|
||||
regionPolicyMap, err := readRegionMap(regionPolicyFilepath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read region map file: %w", err)
|
||||
}
|
||||
|
||||
regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap))
|
||||
for region, policy := range regionPolicyMap {
|
||||
var pp netmap.PlacementPolicy
|
||||
if err = pp.DecodeString(policy); err == nil {
|
||||
regionMap[region] = pp
|
||||
continue
|
||||
}
|
||||
|
||||
if err = pp.UnmarshalJSON([]byte(policy)); err == nil {
|
||||
regionMap[region] = pp
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf("parse region '%s' to policy mapping: %w", region, err)
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
p.defaultPolicy = defaultPlacementPolicy
|
||||
p.regionMap = regionMap
|
||||
p.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func newAppMetrics(logger *zap.Logger, provider GateMetricsCollector, enabled bool) *appMetrics {
|
||||
if !enabled {
|
||||
logger.Warn("metrics are disabled")
|
||||
|
@ -499,7 +577,11 @@ func (a *App) updateSettings() {
|
|||
if lvl, err := getLogLevel(a.cfg); err != nil {
|
||||
a.log.Warn("log level won't be updated", zap.Error(err))
|
||||
} else {
|
||||
a.settings.LogLevel.SetLevel(lvl)
|
||||
a.settings.logLevel.SetLevel(lvl)
|
||||
}
|
||||
|
||||
if err := a.settings.policies.update(getDefaultPolicyValue(a.cfg), a.cfg.GetString(cfgPolicyRegionMapFile)); err != nil {
|
||||
a.log.Warn("policies won't be updated", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -603,68 +685,40 @@ func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
|
|||
return cacheCfg
|
||||
}
|
||||
|
||||
func getHandlerOptions(v *viper.Viper, l *zap.Logger) *handler.Config {
|
||||
func (a *App) initHandler() {
|
||||
cfg := &handler.Config{
|
||||
Policy: handler.PlacementPolicy{
|
||||
RegionMap: make(map[string]netmap.PlacementPolicy),
|
||||
},
|
||||
Policy: a.settings.policies,
|
||||
DefaultMaxAge: handler.DefaultMaxAge,
|
||||
NotificatorEnabled: v.GetBool(cfgEnableNATS),
|
||||
TLSEnabled: v.IsSet(cfgTLSKeyFile) && v.IsSet(cfgTLSCertFile),
|
||||
NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS),
|
||||
TLSEnabled: a.cfg.IsSet(cfgTLSKeyFile) && a.cfg.IsSet(cfgTLSCertFile),
|
||||
CopiesNumber: handler.DefaultCopiesNumber,
|
||||
}
|
||||
|
||||
defaultPolicyStr := handler.DefaultPolicy
|
||||
if v.IsSet(cfgPolicyDefault) {
|
||||
defaultPolicyStr = v.GetString(cfgPolicyDefault)
|
||||
}
|
||||
|
||||
if err := cfg.Policy.Default.DecodeString(defaultPolicyStr); err != nil {
|
||||
l.Fatal("couldn't parse container default policy", zap.Error(err))
|
||||
}
|
||||
|
||||
regionPolicyMap, err := parseRegionMap(v)
|
||||
if err != nil {
|
||||
l.Fatal("couldn't parse region mapping policy file", zap.Error(err))
|
||||
}
|
||||
|
||||
for region, policy := range regionPolicyMap {
|
||||
var pp netmap.PlacementPolicy
|
||||
if err = pp.DecodeString(policy); err == nil {
|
||||
cfg.Policy.RegionMap[region] = pp
|
||||
continue
|
||||
}
|
||||
|
||||
if err = pp.UnmarshalJSON([]byte(policy)); err == nil {
|
||||
cfg.Policy.RegionMap[region] = pp
|
||||
continue
|
||||
}
|
||||
|
||||
l.Fatal("couldn't parse region mapping policy", zap.String("name", region), zap.Error(err))
|
||||
}
|
||||
|
||||
if v.IsSet(cfgDefaultMaxAge) {
|
||||
defaultMaxAge := v.GetInt(cfgDefaultMaxAge)
|
||||
if a.cfg.IsSet(cfgDefaultMaxAge) {
|
||||
defaultMaxAge := a.cfg.GetInt(cfgDefaultMaxAge)
|
||||
|
||||
if defaultMaxAge <= 0 && defaultMaxAge != -1 {
|
||||
l.Fatal("invalid defaultMaxAge",
|
||||
a.log.Fatal("invalid defaultMaxAge",
|
||||
zap.String("parameter", cfgDefaultMaxAge),
|
||||
zap.String("value in config", strconv.Itoa(defaultMaxAge)))
|
||||
}
|
||||
cfg.DefaultMaxAge = defaultMaxAge
|
||||
}
|
||||
|
||||
if val := v.GetUint32(cfgSetCopiesNumber); val > 0 {
|
||||
if val := a.cfg.GetUint32(cfgSetCopiesNumber); val > 0 {
|
||||
cfg.CopiesNumber = val
|
||||
}
|
||||
|
||||
return cfg
|
||||
var err error
|
||||
a.api, err = handler.New(a.log, a.obj, a.nc, cfg)
|
||||
if err != nil {
|
||||
a.log.Fatal("could not initialize API handler", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func parseRegionMap(v *viper.Viper) (map[string]string, error) {
|
||||
func readRegionMap(filePath string) (map[string]string, error) {
|
||||
regionMap := make(map[string]string)
|
||||
|
||||
filePath := v.GetString(cfgPolicyRegionMapFile)
|
||||
if filePath == "" {
|
||||
return regionMap, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue