Denis Kirillov
b59aa06637
Some checks failed
/ Builds (1.19) (pull_request) Successful in 3m0s
/ Builds (1.20) (pull_request) Successful in 2m50s
/ DCO (pull_request) Failing after 1m9s
/ Vulncheck (pull_request) Successful in 1m20s
/ Lint (pull_request) Successful in 7m50s
/ Tests (1.19) (pull_request) Successful in 3m1s
/ Tests (1.20) (pull_request) Successful in 3m21s
Flag allows to skip checking `Content-Encoding` for `aws-chunked` value Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
831 lines
23 KiB
Go
831 lines
23 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/wallet"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/xml"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
|
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/go-chi/chi/v5/middleware"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"github.com/spf13/viper"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
type (
|
|
// App is the main application structure.
|
|
App struct {
|
|
ctr auth.Center
|
|
log *zap.Logger
|
|
cfg *viper.Viper
|
|
pool *pool.Pool
|
|
treePool *treepool.Pool
|
|
key *keys.PrivateKey
|
|
nc *notifications.Controller
|
|
obj layer.Client
|
|
api api.Handler
|
|
|
|
servers []Server
|
|
|
|
metrics *metrics.AppMetrics
|
|
bucketResolver *resolver.BucketResolver
|
|
services []*Service
|
|
settings *appSettings
|
|
|
|
webDone chan struct{}
|
|
wrkDone chan struct{}
|
|
}
|
|
|
|
appSettings struct {
|
|
logLevel zap.AtomicLevel
|
|
policies *placementPolicy
|
|
xmlDecoder *xml.DecoderProvider
|
|
maxClient maxClientsConfig
|
|
bypassContentEncodingInChunks atomic.Bool
|
|
}
|
|
|
|
maxClientsConfig struct {
|
|
deadline time.Duration
|
|
count int
|
|
}
|
|
|
|
Logger struct {
|
|
logger *zap.Logger
|
|
lvl zap.AtomicLevel
|
|
}
|
|
|
|
placementPolicy struct {
|
|
mu sync.RWMutex
|
|
defaultPolicy netmap.PlacementPolicy
|
|
regionMap map[string]netmap.PlacementPolicy
|
|
copiesNumbers map[string][]uint32
|
|
defaultCopiesNumbers []uint32
|
|
}
|
|
)
|
|
|
|
func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
|
objPool, treePool, key := getPools(ctx, log.logger, v)
|
|
|
|
// prepare auth center
|
|
ctr := auth.New(frostfs.NewAuthmateFrostFS(objPool), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, log.logger))
|
|
|
|
app := &App{
|
|
ctr: ctr,
|
|
log: log.logger,
|
|
cfg: v,
|
|
pool: objPool,
|
|
treePool: treePool,
|
|
key: key,
|
|
|
|
webDone: make(chan struct{}, 1),
|
|
wrkDone: make(chan struct{}, 1),
|
|
|
|
settings: newAppSettings(log, v),
|
|
}
|
|
|
|
app.init(ctx)
|
|
|
|
return app
|
|
}
|
|
|
|
func (a *App) init(ctx context.Context) {
|
|
a.initAPI(ctx)
|
|
a.initMetrics()
|
|
a.initServers(ctx)
|
|
a.initTracing(ctx)
|
|
}
|
|
|
|
func (a *App) initLayer(ctx context.Context) {
|
|
a.initResolver()
|
|
|
|
// prepare random key for anonymous requests
|
|
randomKey, err := keys.NewPrivateKey()
|
|
if err != nil {
|
|
a.log.Fatal("couldn't generate random key", zap.Error(err))
|
|
}
|
|
|
|
layerCfg := &layer.Config{
|
|
Caches: getCacheOptions(a.cfg, a.log),
|
|
AnonKey: layer.AnonymousKey{
|
|
Key: randomKey,
|
|
},
|
|
Resolver: a.bucketResolver,
|
|
TreeService: tree.NewTree(services.NewPoolWrapper(a.treePool), a.log),
|
|
}
|
|
|
|
// prepare object layer
|
|
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool), layerCfg)
|
|
|
|
if a.cfg.GetBool(cfgEnableNATS) {
|
|
nopts := getNotificationsOptions(a.cfg, a.log)
|
|
a.nc, err = notifications.NewController(nopts, a.log)
|
|
if err != nil {
|
|
a.log.Fatal("failed to enable notifications", zap.Error(err))
|
|
}
|
|
|
|
if err = a.obj.Initialize(ctx, a.nc); err != nil {
|
|
a.log.Fatal("couldn't initialize layer", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
|
policies, err := newPlacementPolicy(log.logger, v)
|
|
if err != nil {
|
|
log.logger.Fatal("failed to create new policy mapping", zap.Error(err))
|
|
}
|
|
|
|
settings := &appSettings{
|
|
logLevel: log.lvl,
|
|
policies: policies,
|
|
xmlDecoder: xml.NewDecoderProvider(v.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload)),
|
|
maxClient: newMaxClients(v),
|
|
}
|
|
|
|
settings.setBypassContentEncodingInChunks(v.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
|
|
|
|
return settings
|
|
}
|
|
|
|
func (s *appSettings) BypassContentEncodingInChunks() bool {
|
|
return s.bypassContentEncodingInChunks.Load()
|
|
}
|
|
|
|
func (s *appSettings) setBypassContentEncodingInChunks(bypass bool) {
|
|
s.bypassContentEncodingInChunks.Store(bypass)
|
|
}
|
|
|
|
func getDefaultPolicyValue(v *viper.Viper) string {
|
|
defaultPolicyStr := handler.DefaultPolicy
|
|
if v.IsSet(cfgPolicyDefault) {
|
|
defaultPolicyStr = v.GetString(cfgPolicyDefault)
|
|
}
|
|
|
|
return defaultPolicyStr
|
|
}
|
|
|
|
func (a *App) initAPI(ctx context.Context) {
|
|
a.initLayer(ctx)
|
|
a.initHandler()
|
|
}
|
|
|
|
func (a *App) initMetrics() {
|
|
a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
|
|
a.metrics.State().SetHealth(metrics.HealthStatusStarting)
|
|
}
|
|
|
|
func (a *App) initResolver() {
|
|
var err error
|
|
a.bucketResolver, err = resolver.NewBucketResolver(a.getResolverConfig())
|
|
if err != nil {
|
|
a.log.Fatal("failed to create resolver", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (a *App) getResolverConfig() ([]string, *resolver.Config) {
|
|
resolveCfg := &resolver.Config{
|
|
FrostFS: frostfs.NewResolverFrostFS(a.pool),
|
|
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
|
}
|
|
|
|
order := a.cfg.GetStringSlice(cfgResolveOrder)
|
|
if resolveCfg.RPCAddress == "" {
|
|
order = remove(order, resolver.NNSResolver)
|
|
a.log.Warn(fmt.Sprintf("resolver '%s' won't be used since '%s' isn't provided", resolver.NNSResolver, cfgRPCEndpoint))
|
|
}
|
|
|
|
if len(order) == 0 {
|
|
a.log.Info("container resolver will be disabled because of resolvers 'resolver_order' is empty")
|
|
}
|
|
|
|
return order, resolveCfg
|
|
}
|
|
|
|
func (a *App) initTracing(ctx context.Context) {
|
|
instanceID := ""
|
|
if len(a.servers) > 0 {
|
|
instanceID = a.servers[0].Address()
|
|
}
|
|
cfg := tracing.Config{
|
|
Enabled: a.cfg.GetBool(cfgTracingEnabled),
|
|
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
|
|
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
|
|
Service: "frostfs-s3-gw",
|
|
InstanceID: instanceID,
|
|
Version: version.Version,
|
|
}
|
|
updated, err := tracing.Setup(ctx, cfg)
|
|
if err != nil {
|
|
a.log.Warn("failed to initialize tracing", zap.Error(err))
|
|
}
|
|
if updated {
|
|
a.log.Info("tracing config updated")
|
|
}
|
|
}
|
|
|
|
func (a *App) shutdownTracing() {
|
|
const tracingShutdownTimeout = 5 * time.Second
|
|
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
|
|
defer cancel()
|
|
|
|
if err := tracing.Shutdown(shdnCtx); err != nil {
|
|
a.log.Warn("failed to shutdown tracing", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func newMaxClients(cfg *viper.Viper) maxClientsConfig {
|
|
config := maxClientsConfig{}
|
|
|
|
config.count = cfg.GetInt(cfgMaxClientsCount)
|
|
if config.count <= 0 {
|
|
config.count = defaultMaxClientsCount
|
|
}
|
|
|
|
config.deadline = cfg.GetDuration(cfgMaxClientsDeadline)
|
|
if config.deadline <= 0 {
|
|
config.deadline = defaultMaxClientsDeadline
|
|
}
|
|
|
|
return config
|
|
}
|
|
|
|
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
|
|
var prm pool.InitParameters
|
|
var prmTree treepool.InitParameters
|
|
|
|
password := wallet.GetPassword(cfg, cfgWalletPassphrase)
|
|
key, err := wallet.GetKeyFromPath(cfg.GetString(cfgWalletPath), cfg.GetString(cfgWalletAddress), password)
|
|
if err != nil {
|
|
logger.Fatal("could not load FrostFS private key", zap.Error(err))
|
|
}
|
|
|
|
prm.SetKey(&key.PrivateKey)
|
|
prmTree.SetKey(key)
|
|
logger.Info("using credentials", zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
|
|
|
|
for _, peer := range fetchPeers(logger, cfg) {
|
|
prm.AddNode(peer)
|
|
prmTree.AddNode(peer)
|
|
}
|
|
|
|
connTimeout := cfg.GetDuration(cfgConnectTimeout)
|
|
if connTimeout <= 0 {
|
|
connTimeout = defaultConnectTimeout
|
|
}
|
|
prm.SetNodeDialTimeout(connTimeout)
|
|
prmTree.SetNodeDialTimeout(connTimeout)
|
|
|
|
streamTimeout := cfg.GetDuration(cfgStreamTimeout)
|
|
if streamTimeout <= 0 {
|
|
streamTimeout = defaultStreamTimeout
|
|
}
|
|
prm.SetNodeStreamTimeout(streamTimeout)
|
|
prmTree.SetNodeStreamTimeout(streamTimeout)
|
|
|
|
healthCheckTimeout := cfg.GetDuration(cfgHealthcheckTimeout)
|
|
if healthCheckTimeout <= 0 {
|
|
healthCheckTimeout = defaultHealthcheckTimeout
|
|
}
|
|
prm.SetHealthcheckTimeout(healthCheckTimeout)
|
|
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
|
|
|
|
rebalanceInterval := cfg.GetDuration(cfgRebalanceInterval)
|
|
if rebalanceInterval <= 0 {
|
|
rebalanceInterval = defaultRebalanceInterval
|
|
}
|
|
prm.SetClientRebalanceInterval(rebalanceInterval)
|
|
prmTree.SetClientRebalanceInterval(rebalanceInterval)
|
|
|
|
errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
|
|
if errorThreshold <= 0 {
|
|
errorThreshold = defaultPoolErrorThreshold
|
|
}
|
|
prm.SetErrorThreshold(errorThreshold)
|
|
prm.SetLogger(logger)
|
|
prmTree.SetLogger(logger)
|
|
|
|
var apiGRPCDialOpts []grpc.DialOption
|
|
var treeGRPCDialOpts []grpc.DialOption
|
|
if cfg.GetBool(cfgTracingEnabled) {
|
|
interceptors := []grpc.DialOption{
|
|
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
|
grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
|
|
}
|
|
treeGRPCDialOpts = append(treeGRPCDialOpts, interceptors...)
|
|
apiGRPCDialOpts = append(apiGRPCDialOpts, interceptors...)
|
|
}
|
|
prm.SetGRPCDialOptions(apiGRPCDialOpts...)
|
|
prmTree.SetGRPCDialOptions(treeGRPCDialOpts...)
|
|
|
|
p, err := pool.NewPool(prm)
|
|
if err != nil {
|
|
logger.Fatal("failed to create connection pool", zap.Error(err))
|
|
}
|
|
|
|
if err = p.Dial(ctx); err != nil {
|
|
logger.Fatal("failed to dial connection pool", zap.Error(err))
|
|
}
|
|
|
|
treePool, err := treepool.NewPool(prmTree)
|
|
if err != nil {
|
|
logger.Fatal("failed to create tree pool", zap.Error(err))
|
|
}
|
|
if err = treePool.Dial(ctx); err != nil {
|
|
logger.Fatal("failed to dial tree pool", zap.Error(err))
|
|
}
|
|
|
|
return p, treePool, key
|
|
}
|
|
|
|
func newPlacementPolicy(l *zap.Logger, v *viper.Viper) (*placementPolicy, error) {
|
|
policies := &placementPolicy{
|
|
regionMap: make(map[string]netmap.PlacementPolicy),
|
|
defaultCopiesNumbers: []uint32{handler.DefaultCopiesNumber},
|
|
}
|
|
|
|
policies.updateCopiesNumbers(l, v)
|
|
policies.updateDefaultCopiesNumbers(l, v)
|
|
|
|
return policies, policies.updatePolicy(getDefaultPolicyValue(v), v.GetString(cfgPolicyRegionMapFile))
|
|
}
|
|
|
|
func (p *placementPolicy) DefaultPlacementPolicy() netmap.PlacementPolicy {
|
|
p.mu.RLock()
|
|
defer p.mu.RUnlock()
|
|
return p.defaultPolicy
|
|
}
|
|
|
|
func (p *placementPolicy) PlacementPolicy(name string) (netmap.PlacementPolicy, bool) {
|
|
p.mu.RLock()
|
|
policy, ok := p.regionMap[name]
|
|
p.mu.RUnlock()
|
|
|
|
return policy, ok
|
|
}
|
|
|
|
func (p *placementPolicy) CopiesNumbers(locationConstraint string) ([]uint32, bool) {
|
|
p.mu.RLock()
|
|
copiesNumbers, ok := p.copiesNumbers[locationConstraint]
|
|
p.mu.RUnlock()
|
|
|
|
return copiesNumbers, ok
|
|
}
|
|
|
|
func (p *placementPolicy) DefaultCopiesNumbers() []uint32 {
|
|
p.mu.RLock()
|
|
defer p.mu.RUnlock()
|
|
return p.defaultCopiesNumbers
|
|
}
|
|
|
|
func (p *placementPolicy) update(l *zap.Logger, v *viper.Viper) {
|
|
if err := p.updatePolicy(getDefaultPolicyValue(v), v.GetString(cfgPolicyRegionMapFile)); err != nil {
|
|
l.Warn("policies won't be updated", zap.Error(err))
|
|
}
|
|
|
|
p.updateCopiesNumbers(l, v)
|
|
p.updateDefaultCopiesNumbers(l, v)
|
|
}
|
|
|
|
func (p *placementPolicy) updatePolicy(defaultPolicy string, regionPolicyFilepath string) error {
|
|
var defaultPlacementPolicy netmap.PlacementPolicy
|
|
if err := defaultPlacementPolicy.DecodeString(defaultPolicy); err != nil {
|
|
return fmt.Errorf("parse default policy '%s': %w", defaultPolicy, err)
|
|
}
|
|
|
|
regionPolicyMap, err := readRegionMap(regionPolicyFilepath)
|
|
if err != nil {
|
|
return fmt.Errorf("read region map file: %w", err)
|
|
}
|
|
|
|
regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap))
|
|
for region, policy := range regionPolicyMap {
|
|
var pp netmap.PlacementPolicy
|
|
if err = pp.DecodeString(policy); err == nil {
|
|
regionMap[region] = pp
|
|
continue
|
|
}
|
|
|
|
if err = pp.UnmarshalJSON([]byte(policy)); err == nil {
|
|
regionMap[region] = pp
|
|
continue
|
|
}
|
|
|
|
return fmt.Errorf("parse region '%s' to policy mapping: %w", region, err)
|
|
}
|
|
|
|
p.mu.Lock()
|
|
p.defaultPolicy = defaultPlacementPolicy
|
|
p.regionMap = regionMap
|
|
p.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *placementPolicy) updateCopiesNumbers(l *zap.Logger, v *viper.Viper) {
|
|
if newCopiesNumbers, err := fetchCopiesNumbers(l, v); err != nil {
|
|
l.Warn("copies numbers won't be updated", zap.Error(err))
|
|
} else {
|
|
p.mu.Lock()
|
|
p.copiesNumbers = newCopiesNumbers
|
|
p.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
func (p *placementPolicy) updateDefaultCopiesNumbers(l *zap.Logger, v *viper.Viper) {
|
|
configuredValues, err := fetchDefaultCopiesNumbers(v)
|
|
|
|
if err == nil {
|
|
p.mu.Lock()
|
|
p.defaultCopiesNumbers = configuredValues
|
|
p.mu.Unlock()
|
|
l.Info("default copies numbers", zap.Uint32s("vector", p.defaultCopiesNumbers))
|
|
return
|
|
}
|
|
|
|
l.Error("cannot parse default copies numbers", zap.Error(err))
|
|
l.Warn("default copies numbers won't be updated", zap.Uint32s("current value", p.DefaultCopiesNumbers()))
|
|
}
|
|
|
|
func remove(list []string, element string) []string {
|
|
for i, item := range list {
|
|
if item == element {
|
|
return append(list[:i], list[i+1:]...)
|
|
}
|
|
}
|
|
return list
|
|
}
|
|
|
|
// Wait waits for an application to finish.
|
|
//
|
|
// Pre-logs a message about the launch of the application mentioning its
|
|
// version (version.Version) and its name (frostfs-s3-gw). At the end, it writes
|
|
// about the stop to the log.
|
|
func (a *App) Wait() {
|
|
a.log.Info("application started",
|
|
zap.String("name", "frostfs-s3-gw"),
|
|
zap.String("version", version.Version),
|
|
)
|
|
|
|
a.metrics.State().SetVersion(version.Version)
|
|
a.setHealthStatus()
|
|
|
|
<-a.webDone // wait for web-server to be stopped
|
|
|
|
a.log.Info("application finished")
|
|
}
|
|
|
|
func (a *App) setHealthStatus() {
|
|
a.metrics.State().SetHealth(metrics.HealthStatusReady)
|
|
}
|
|
|
|
// Serve runs HTTP server to handle S3 API requests.
|
|
func (a *App) Serve(ctx context.Context) {
|
|
// Attach S3 API:
|
|
domains := a.cfg.GetStringSlice(cfgListenDomains)
|
|
a.log.Info("fetch domains, prepare to use API", zap.Strings("domains", domains))
|
|
|
|
throttleOps := middleware.ThrottleOpts{
|
|
Limit: a.settings.maxClient.count,
|
|
BacklogTimeout: a.settings.maxClient.deadline,
|
|
}
|
|
|
|
chiRouter := chi.NewRouter()
|
|
api.AttachChi(chiRouter, domains, throttleOps, a.api, a.ctr, a.log, a.metrics)
|
|
|
|
// Use mux.Router as http.Handler
|
|
srv := new(http.Server)
|
|
srv.Handler = chiRouter
|
|
srv.ErrorLog = zap.NewStdLog(a.log)
|
|
|
|
a.startServices()
|
|
|
|
for i := range a.servers {
|
|
go func(i int) {
|
|
a.log.Info("starting server", zap.String("address", a.servers[i].Address()))
|
|
|
|
if err := srv.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed {
|
|
a.log.Fatal("listen and serve", zap.Error(err))
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
sigs := make(chan os.Signal, 1)
|
|
signal.Notify(sigs, syscall.SIGHUP)
|
|
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
break LOOP
|
|
case <-sigs:
|
|
a.configReload(ctx)
|
|
}
|
|
}
|
|
|
|
ctx, cancel := shutdownContext()
|
|
defer cancel()
|
|
|
|
a.log.Info("stopping server", zap.Error(srv.Shutdown(ctx)))
|
|
|
|
a.metrics.Shutdown()
|
|
a.stopServices()
|
|
a.shutdownTracing()
|
|
|
|
close(a.webDone)
|
|
}
|
|
|
|
func shutdownContext() (context.Context, context.CancelFunc) {
|
|
return context.WithTimeout(context.Background(), defaultShutdownTimeout)
|
|
}
|
|
|
|
func (a *App) configReload(ctx context.Context) {
|
|
a.log.Info("SIGHUP config reload started")
|
|
|
|
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
|
a.log.Warn("failed to reload config because it's missed")
|
|
return
|
|
}
|
|
if err := readInConfig(a.cfg); err != nil {
|
|
a.log.Warn("failed to reload config", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if err := a.bucketResolver.UpdateResolvers(a.getResolverConfig()); err != nil {
|
|
a.log.Warn("failed to reload resolvers", zap.Error(err))
|
|
}
|
|
|
|
if err := a.updateServers(); err != nil {
|
|
a.log.Warn("failed to reload server parameters", zap.Error(err))
|
|
}
|
|
|
|
a.stopServices()
|
|
a.startServices()
|
|
|
|
a.updateSettings()
|
|
|
|
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
|
a.initTracing(ctx)
|
|
a.setHealthStatus()
|
|
|
|
a.log.Info("SIGHUP config reload completed")
|
|
}
|
|
|
|
func (a *App) updateSettings() {
|
|
if lvl, err := getLogLevel(a.cfg); err != nil {
|
|
a.log.Warn("log level won't be updated", zap.Error(err))
|
|
} else {
|
|
a.settings.logLevel.SetLevel(lvl)
|
|
}
|
|
|
|
a.settings.policies.update(a.log, a.cfg)
|
|
|
|
a.settings.xmlDecoder.UseDefaultNamespaceForCompleteMultipart(a.cfg.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload))
|
|
a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
|
|
}
|
|
|
|
func (a *App) startServices() {
|
|
a.services = a.services[:0]
|
|
|
|
pprofService := NewPprofService(a.cfg, a.log)
|
|
a.services = append(a.services, pprofService)
|
|
go pprofService.Start()
|
|
|
|
prometheusService := NewPrometheusService(a.cfg, a.log, a.metrics.Handler())
|
|
a.services = append(a.services, prometheusService)
|
|
go prometheusService.Start()
|
|
}
|
|
|
|
func (a *App) initServers(ctx context.Context) {
|
|
serversInfo := fetchServers(a.cfg)
|
|
|
|
a.servers = make([]Server, 0, len(serversInfo))
|
|
for _, serverInfo := range serversInfo {
|
|
fields := []zap.Field{
|
|
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
|
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
|
}
|
|
srv, err := newServer(ctx, serverInfo)
|
|
if err != nil {
|
|
a.log.Warn("failed to add server", append(fields, zap.Error(err))...)
|
|
continue
|
|
}
|
|
|
|
a.servers = append(a.servers, srv)
|
|
a.log.Info("add server", fields...)
|
|
}
|
|
|
|
if len(a.servers) == 0 {
|
|
a.log.Fatal("no healthy servers")
|
|
}
|
|
}
|
|
|
|
func (a *App) updateServers() error {
|
|
serversInfo := fetchServers(a.cfg)
|
|
|
|
var found bool
|
|
for _, serverInfo := range serversInfo {
|
|
index := a.serverIndex(serverInfo.Address)
|
|
if index == -1 {
|
|
continue
|
|
}
|
|
|
|
if serverInfo.TLS.Enabled {
|
|
if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
|
return fmt.Errorf("failed to update tls certs: %w", err)
|
|
}
|
|
}
|
|
found = true
|
|
}
|
|
|
|
if !found {
|
|
return fmt.Errorf("invalid servers configuration: no known server found")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *App) serverIndex(address string) int {
|
|
for i := range a.servers {
|
|
if a.servers[i].Address() == address {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
func (a *App) stopServices() {
|
|
ctx, cancel := shutdownContext()
|
|
defer cancel()
|
|
|
|
for _, svc := range a.services {
|
|
svc.ShutDown(ctx)
|
|
}
|
|
}
|
|
|
|
func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options {
|
|
cfg := notifications.Options{}
|
|
cfg.URL = v.GetString(cfgNATSEndpoint)
|
|
cfg.Timeout = v.GetDuration(cfgNATSTimeout)
|
|
if cfg.Timeout <= 0 {
|
|
l.Error("invalid lifetime, using default value (in seconds)",
|
|
zap.String("parameter", cfgNATSTimeout),
|
|
zap.Duration("value in config", cfg.Timeout),
|
|
zap.Duration("default", notifications.DefaultTimeout))
|
|
cfg.Timeout = notifications.DefaultTimeout
|
|
}
|
|
cfg.TLSCertFilepath = v.GetString(cfgNATSTLSCertFile)
|
|
cfg.TLSAuthPrivateKeyFilePath = v.GetString(cfgNATSAuthPrivateKeyFile)
|
|
cfg.RootCAFiles = v.GetStringSlice(cfgNATSRootCAFiles)
|
|
|
|
return &cfg
|
|
}
|
|
|
|
func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
|
|
cacheCfg := layer.DefaultCachesConfigs(l)
|
|
|
|
cacheCfg.Objects.Lifetime = getLifetime(v, l, cfgObjectsCacheLifetime, cacheCfg.Objects.Lifetime)
|
|
cacheCfg.Objects.Size = getSize(v, l, cfgObjectsCacheSize, cacheCfg.Objects.Size)
|
|
|
|
cacheCfg.ObjectsList.Lifetime = getLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
|
|
cacheCfg.ObjectsList.Size = getSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
|
|
|
|
cacheCfg.Buckets.Lifetime = getLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
|
|
cacheCfg.Buckets.Size = getSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)
|
|
|
|
cacheCfg.Names.Lifetime = getLifetime(v, l, cfgNamesCacheLifetime, cacheCfg.Names.Lifetime)
|
|
cacheCfg.Names.Size = getSize(v, l, cfgNamesCacheSize, cacheCfg.Names.Size)
|
|
|
|
cacheCfg.System.Lifetime = getLifetime(v, l, cfgSystemCacheLifetime, cacheCfg.System.Lifetime)
|
|
cacheCfg.System.Size = getSize(v, l, cfgSystemCacheSize, cacheCfg.System.Size)
|
|
|
|
cacheCfg.AccessControl.Lifetime = getLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime)
|
|
cacheCfg.AccessControl.Size = getSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
|
|
|
|
return cacheCfg
|
|
}
|
|
|
|
func getLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
|
|
if v.IsSet(cfgEntry) {
|
|
lifetime := v.GetDuration(cfgEntry)
|
|
if lifetime <= 0 {
|
|
l.Error("invalid lifetime, using default value (in seconds)",
|
|
zap.String("parameter", cfgEntry),
|
|
zap.Duration("value in config", lifetime),
|
|
zap.Duration("default", defaultValue))
|
|
} else {
|
|
return lifetime
|
|
}
|
|
}
|
|
return defaultValue
|
|
}
|
|
|
|
func getSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
|
|
if v.IsSet(cfgEntry) {
|
|
size := v.GetInt(cfgEntry)
|
|
if size <= 0 {
|
|
l.Error("invalid cache size, using default value",
|
|
zap.String("parameter", cfgEntry),
|
|
zap.Int("value in config", size),
|
|
zap.Int("default", defaultValue))
|
|
} else {
|
|
return size
|
|
}
|
|
}
|
|
return defaultValue
|
|
}
|
|
|
|
func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
|
|
cacheCfg := cache.DefaultAccessBoxConfig(l)
|
|
|
|
cacheCfg.Lifetime = getLifetime(v, l, cfgAccessBoxCacheLifetime, cacheCfg.Lifetime)
|
|
cacheCfg.Size = getSize(v, l, cfgAccessBoxCacheSize, cacheCfg.Size)
|
|
|
|
return cacheCfg
|
|
}
|
|
|
|
func (a *App) initHandler() {
|
|
cfg := &handler.Config{
|
|
Policy: a.settings.policies,
|
|
DefaultMaxAge: handler.DefaultMaxAge,
|
|
NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS),
|
|
XMLDecoder: a.settings.xmlDecoder,
|
|
}
|
|
|
|
if a.cfg.IsSet(cfgDefaultMaxAge) {
|
|
defaultMaxAge := a.cfg.GetInt(cfgDefaultMaxAge)
|
|
|
|
if defaultMaxAge <= 0 && defaultMaxAge != -1 {
|
|
a.log.Fatal("invalid defaultMaxAge",
|
|
zap.String("parameter", cfgDefaultMaxAge),
|
|
zap.String("value in config", strconv.Itoa(defaultMaxAge)))
|
|
}
|
|
cfg.DefaultMaxAge = defaultMaxAge
|
|
}
|
|
|
|
cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketAllow)
|
|
cfg.IsResolveListAllow = len(cfg.ResolveZoneList) > 0
|
|
if !cfg.IsResolveListAllow {
|
|
cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketDeny)
|
|
}
|
|
|
|
cfg.CompleteMultipartKeepalive = a.cfg.GetDuration(cfgKludgeCompleteMultipartUploadKeepalive)
|
|
cfg.Kludge = a.settings
|
|
|
|
var err error
|
|
a.api, err = handler.New(a.log, a.obj, a.nc, cfg)
|
|
if err != nil {
|
|
a.log.Fatal("could not initialize API handler", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func readRegionMap(filePath string) (map[string]string, error) {
|
|
regionMap := make(map[string]string)
|
|
|
|
if filePath == "" {
|
|
return regionMap, nil
|
|
}
|
|
|
|
data, err := os.ReadFile(filePath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("coudln't read file '%s'", filePath)
|
|
}
|
|
|
|
if err = json.Unmarshal(data, ®ionMap); err != nil {
|
|
return nil, fmt.Errorf("unmarshal policies: %w", err)
|
|
}
|
|
|
|
if _, ok := regionMap[api.DefaultLocationConstraint]; ok {
|
|
return nil, fmt.Errorf("config overrides %s location constraint", api.DefaultLocationConstraint)
|
|
}
|
|
|
|
return regionMap, nil
|
|
}
|