frostfs-s3-gw/cmd/s3-gw/app.go
Alex Vanin 8151753eeb [#60] Use periodic white space XML writer in Complete Multipart Upload
This mechanism is used by Amazon S3 to keep the client's
connection alive while the object is being assembled from
the uploaded parts.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-03-20 13:34:22 +03:00
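The idea, as a rough illustration: while the gateway assembles the object from the uploaded parts, it periodically writes a whitespace byte into the already-open HTTP response and flushes it, so clients and intermediaries do not drop the idle connection; the real XML payload is written once assembly finishes. Below is a minimal sketch of such a keepalive writer. It is not the gateway's actual code: the package name, the helper writeWithKeepalive, the interval parameter and the assemble callback are all invented for the example.

package keepalive // hypothetical package, for the sketch only

import (
	"encoding/xml"
	"net/http"
	"time"
)

// writeWithKeepalive writes response headers immediately, then emits a single
// whitespace byte (and flushes it) every interval while assemble() runs in the
// background; once assemble() returns, its result is encoded as XML on the same
// connection. The trickled whitespace is what keeps the connection busy.
func writeWithKeepalive(w http.ResponseWriter, interval time.Duration, assemble func() (interface{}, error)) error {
	w.Header().Set("Content-Type", "application/xml")
	w.WriteHeader(http.StatusOK)

	done := make(chan struct{})
	var (
		resp interface{}
		err  error
	)
	go func() {
		// Assemble the object in the background; w is only written from the caller's goroutine.
		resp, err = assemble()
		close(done)
	}()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Periodic keepalive: one whitespace byte, flushed to the client.
			if _, werr := w.Write([]byte(" ")); werr != nil {
				return werr
			}
			if f, ok := w.(http.Flusher); ok {
				f.Flush()
			}
		case <-done:
			if err != nil {
				return err
			}
			// Assembly finished: write the actual XML response body.
			return xml.NewEncoder(w).Encode(resp)
		}
	}
}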

package main

import (
	"context"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/wallet"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/xml"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
	"github.com/gorilla/mux"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/spf13/viper"
	"go.uber.org/zap"
)
type (
	// App is the main application structure.
	App struct {
		ctr auth.Center
		log *zap.Logger
		cfg *viper.Viper
		pool *pool.Pool
		key *keys.PrivateKey
		nc *notifications.Controller
		obj layer.Client
		api api.Handler
		servers []Server
		metrics *metrics.AppMetrics
		bucketResolver *resolver.BucketResolver
		services []*Service
		settings *appSettings
		maxClients api.MaxClients
		webDone chan struct{}
		wrkDone chan struct{}
	}
	appSettings struct {
		logLevel zap.AtomicLevel
		policies *placementPolicy
		xmlDecoder *xml.DecoderProvider
	}
	Logger struct {
		logger *zap.Logger
		lvl zap.AtomicLevel
	}
	placementPolicy struct {
		mu sync.RWMutex
		defaultPolicy netmap.PlacementPolicy
		regionMap map[string]netmap.PlacementPolicy
	}
)
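// newApp loads the wallet key, dials the FrostFS connection pool, prepares the
// auth center and assembles the application: API handler, metrics and servers.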
func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
	conns, key := getPool(ctx, log.logger, v)
	// prepare auth center
	ctr := auth.New(frostfs.NewAuthmateFrostFS(conns), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, log.logger))
	app := &App{
		ctr: ctr,
		log: log.logger,
		cfg: v,
		pool: conns,
		key: key,
		webDone: make(chan struct{}, 1),
		wrkDone: make(chan struct{}, 1),
		maxClients: newMaxClients(v),
		settings: newAppSettings(log, v),
	}
	app.init(ctx)
	return app
}
func (a *App) init(ctx context.Context) {
	a.initAPI(ctx)
	a.initMetrics()
	a.initServers(ctx)
}
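// initLayer creates the bucket resolver, the tree service client and the object
// layer; when NATS is enabled, it also starts the notification controller and
// initializes the layer with it.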
func (a *App) initLayer(ctx context.Context) {
	a.initResolver()
	treeServiceEndpoint := a.cfg.GetString(cfgTreeServiceEndpoint)
	treeService, err := frostfs.NewTreeClient(ctx, treeServiceEndpoint, a.key)
	if err != nil {
		a.log.Fatal("failed to create tree service", zap.Error(err))
	}
	a.log.Info("init tree service", zap.String("endpoint", treeServiceEndpoint))
	// prepare random key for anonymous requests
	randomKey, err := keys.NewPrivateKey()
	if err != nil {
		a.log.Fatal("couldn't generate random key", zap.Error(err))
	}
	layerCfg := &layer.Config{
		Caches: getCacheOptions(a.cfg, a.log),
		AnonKey: layer.AnonymousKey{
			Key: randomKey,
		},
		Resolver: a.bucketResolver,
		TreeService: treeService,
	}
	// prepare object layer
	a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool), layerCfg)
	if a.cfg.GetBool(cfgEnableNATS) {
		nopts := getNotificationsOptions(a.cfg, a.log)
		a.nc, err = notifications.NewController(nopts, a.log)
		if err != nil {
			a.log.Fatal("failed to enable notifications", zap.Error(err))
		}
		if err = a.obj.Initialize(ctx, a.nc); err != nil {
			a.log.Fatal("couldn't initialize layer", zap.Error(err))
		}
	}
}
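// newAppSettings collects the settings that can be reloaded at runtime:
// log level, placement policies and the XML decoder provider.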
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
	policies, err := newPlacementPolicy(getDefaultPolicyValue(v), v.GetString(cfgPolicyRegionMapFile))
	if err != nil {
		log.logger.Fatal("failed to create new policy mapping", zap.Error(err))
	}
	return &appSettings{
		logLevel: log.lvl,
		policies: policies,
		xmlDecoder: xml.NewDecoderProvider(v.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload)),
	}
}
func getDefaultPolicyValue(v *viper.Viper) string {
	defaultPolicyStr := handler.DefaultPolicy
	if v.IsSet(cfgPolicyDefault) {
		defaultPolicyStr = v.GetString(cfgPolicyDefault)
	}
	return defaultPolicyStr
}
func (a *App) initAPI(ctx context.Context) {
	a.initLayer(ctx)
	a.initHandler()
}
func (a *App) initMetrics() {
	a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled))
}
func (a *App) initResolver() {
	var err error
	a.bucketResolver, err = resolver.NewBucketResolver(a.getResolverConfig())
	if err != nil {
		a.log.Fatal("failed to create resolver", zap.Error(err))
	}
}
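// getResolverConfig returns the bucket resolver order and configuration; the NNS
// resolver is dropped from the order when no RPC endpoint is configured.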
func (a *App) getResolverConfig() ([]string, *resolver.Config) {
	resolveCfg := &resolver.Config{
		FrostFS: frostfs.NewResolverFrostFS(a.pool),
		RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
	}
	order := a.cfg.GetStringSlice(cfgResolveOrder)
	if resolveCfg.RPCAddress == "" {
		order = remove(order, resolver.NNSResolver)
		a.log.Warn(fmt.Sprintf("resolver '%s' won't be used since '%s' isn't provided", resolver.NNSResolver, cfgRPCEndpoint))
	}
	if len(order) == 0 {
		a.log.Info("container resolver will be disabled because 'resolver_order' is empty")
	}
	return order, resolveCfg
}
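// newMaxClients builds the middleware that limits the number of concurrently
// served clients, falling back to defaults for non-positive values.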
func newMaxClients(cfg *viper.Viper) api.MaxClients {
	maxClientsCount := cfg.GetInt(cfgMaxClientsCount)
	if maxClientsCount <= 0 {
		maxClientsCount = defaultMaxClientsCount
	}
	maxClientsDeadline := cfg.GetDuration(cfgMaxClientsDeadline)
	if maxClientsDeadline <= 0 {
		maxClientsDeadline = defaultMaxClientsDeadline
	}
	return api.NewMaxClientsMiddleware(maxClientsCount, maxClientsDeadline)
}
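// getPool reads the private key from the wallet and creates a dialed FrostFS
// connection pool configured with the timeouts, rebalance interval and error
// threshold from the settings.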
func getPool(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *keys.PrivateKey) {
	var prm pool.InitParameters
	password := wallet.GetPassword(cfg, cfgWalletPassphrase)
	key, err := wallet.GetKeyFromPath(cfg.GetString(cfgWalletPath), cfg.GetString(cfgWalletAddress), password)
	if err != nil {
		logger.Fatal("could not load FrostFS private key", zap.Error(err))
	}
	prm.SetKey(&key.PrivateKey)
	logger.Info("using credentials", zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
	for _, peer := range fetchPeers(logger, cfg) {
		prm.AddNode(peer)
	}
	connTimeout := cfg.GetDuration(cfgConnectTimeout)
	if connTimeout <= 0 {
		connTimeout = defaultConnectTimeout
	}
	prm.SetNodeDialTimeout(connTimeout)
	streamTimeout := cfg.GetDuration(cfgStreamTimeout)
	if streamTimeout <= 0 {
		streamTimeout = defaultStreamTimeout
	}
	prm.SetNodeStreamTimeout(streamTimeout)
	healthCheckTimeout := cfg.GetDuration(cfgHealthcheckTimeout)
	if healthCheckTimeout <= 0 {
		healthCheckTimeout = defaultHealthcheckTimeout
	}
	prm.SetHealthcheckTimeout(healthCheckTimeout)
	rebalanceInterval := cfg.GetDuration(cfgRebalanceInterval)
	if rebalanceInterval <= 0 {
		rebalanceInterval = defaultRebalanceInterval
	}
	prm.SetClientRebalanceInterval(rebalanceInterval)
	errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
	if errorThreshold <= 0 {
		errorThreshold = defaultPoolErrorThreshold
	}
	prm.SetErrorThreshold(errorThreshold)
	prm.SetLogger(logger)
	p, err := pool.NewPool(prm)
	if err != nil {
		logger.Fatal("failed to create connection pool", zap.Error(err))
	}
	if err = p.Dial(ctx); err != nil {
		logger.Fatal("failed to dial connection pool", zap.Error(err))
	}
	return p, key
}
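// newPlacementPolicy parses the default placement policy and the optional
// region-to-policy mapping file.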
func newPlacementPolicy(defaultPolicy string, regionPolicyFilepath string) (*placementPolicy, error) {
	policies := &placementPolicy{
		regionMap: make(map[string]netmap.PlacementPolicy),
	}
	return policies, policies.update(defaultPolicy, regionPolicyFilepath)
}
func (p *placementPolicy) Default() netmap.PlacementPolicy {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.defaultPolicy
}
func (p *placementPolicy) Get(name string) (netmap.PlacementPolicy, bool) {
	p.mu.RLock()
	policy, ok := p.regionMap[name]
	p.mu.RUnlock()
	return policy, ok
}
func (p *placementPolicy) update(defaultPolicy string, regionPolicyFilepath string) error {
	var defaultPlacementPolicy netmap.PlacementPolicy
	if err := defaultPlacementPolicy.DecodeString(defaultPolicy); err != nil {
		return fmt.Errorf("parse default policy '%s': %w", defaultPolicy, err)
	}
	regionPolicyMap, err := readRegionMap(regionPolicyFilepath)
	if err != nil {
		return fmt.Errorf("read region map file: %w", err)
	}
	regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap))
	for region, policy := range regionPolicyMap {
		var pp netmap.PlacementPolicy
		if err = pp.DecodeString(policy); err == nil {
			regionMap[region] = pp
			continue
		}
		if err = pp.UnmarshalJSON([]byte(policy)); err == nil {
			regionMap[region] = pp
			continue
		}
		return fmt.Errorf("parse region '%s' to policy mapping: %w", region, err)
	}
	p.mu.Lock()
	p.defaultPolicy = defaultPlacementPolicy
	p.regionMap = regionMap
	p.mu.Unlock()
	return nil
}
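// remove returns list without the first occurrence of element.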
func remove(list []string, element string) []string {
	for i, item := range list {
		if item == element {
			return append(list[:i], list[i+1:]...)
		}
	}
	return list
}
// Wait waits for the application to finish.
//
// It first logs a startup message with the application name (frostfs-s3-gw)
// and version (version.Version), and logs a final message once the
// application has stopped.
func (a *App) Wait() {
	a.log.Info("application started",
		zap.String("name", "frostfs-s3-gw"),
		zap.String("version", version.Version),
	)
	a.setHealthStatus()
	<-a.webDone // wait for web-server to be stopped
	a.log.Info("application finished")
}
func (a *App) setHealthStatus() {
	a.metrics.SetHealth(1)
}
// Serve runs the HTTP server to handle S3 API requests.
func (a *App) Serve(ctx context.Context) {
	// Attach S3 API:
	domains := a.cfg.GetStringSlice(cfgListenDomains)
	a.log.Info("fetch domains, prepare to use API", zap.Strings("domains", domains))
	router := mux.NewRouter().SkipClean(true).UseEncodedPath()
	api.Attach(router, domains, a.maxClients, a.api, a.ctr, a.log, a.metrics)
	// Use mux.Router as http.Handler
	srv := new(http.Server)
	srv.Handler = router
	srv.ErrorLog = zap.NewStdLog(a.log)
	a.startServices()
	for i := range a.servers {
		go func(i int) {
			a.log.Info("starting server", zap.String("address", a.servers[i].Address()))
			if err := srv.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed {
				a.log.Fatal("listen and serve", zap.Error(err))
			}
		}(i)
	}
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGHUP)
LOOP:
	for {
		select {
		case <-ctx.Done():
			break LOOP
		case <-sigs:
			a.configReload()
		}
	}
	ctx, cancel := shutdownContext()
	defer cancel()
	a.log.Info("stopping server", zap.Error(srv.Shutdown(ctx)))
	a.metrics.Shutdown()
	a.stopServices()
	close(a.webDone)
}
func shutdownContext() (context.Context, context.CancelFunc) {
	return context.WithTimeout(context.Background(), defaultShutdownTimeout)
}
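// configReload re-reads the configuration on SIGHUP and applies it to the
// resolvers, servers, services, settings and metrics.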
func (a *App) configReload() {
	a.log.Info("SIGHUP config reload started")
	if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
		a.log.Warn("failed to reload config because the config file path is not set")
		return
	}
	if err := readInConfig(a.cfg); err != nil {
		a.log.Warn("failed to reload config", zap.Error(err))
		return
	}
	if err := a.bucketResolver.UpdateResolvers(a.getResolverConfig()); err != nil {
		a.log.Warn("failed to reload resolvers", zap.Error(err))
	}
	if err := a.updateServers(); err != nil {
		a.log.Warn("failed to reload server parameters", zap.Error(err))
	}
	a.stopServices()
	a.startServices()
	a.updateSettings()
	a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
	a.setHealthStatus()
	a.log.Info("SIGHUP config reload completed")
}
func (a *App) updateSettings() {
	if lvl, err := getLogLevel(a.cfg); err != nil {
		a.log.Warn("log level won't be updated", zap.Error(err))
	} else {
		a.settings.logLevel.SetLevel(lvl)
	}
	if err := a.settings.policies.update(getDefaultPolicyValue(a.cfg), a.cfg.GetString(cfgPolicyRegionMapFile)); err != nil {
		a.log.Warn("policies won't be updated", zap.Error(err))
	}
	a.settings.xmlDecoder.UseDefaultNamespaceForCompleteMultipart(a.cfg.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload))
}
func (a *App) startServices() {
	a.services = a.services[:0]
	pprofService := NewPprofService(a.cfg, a.log)
	a.services = append(a.services, pprofService)
	go pprofService.Start()
	prometheusService := NewPrometheusService(a.cfg, a.log, a.metrics.Handler())
	a.services = append(a.services, prometheusService)
	go prometheusService.Start()
}
func (a *App) initServers(ctx context.Context) {
	serversInfo := fetchServers(a.cfg)
	a.servers = make([]Server, 0, len(serversInfo))
	for _, serverInfo := range serversInfo {
		fields := []zap.Field{
			zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
			zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
		}
		srv, err := newServer(ctx, serverInfo)
		if err != nil {
			a.log.Warn("failed to add server", append(fields, zap.Error(err))...)
			continue
		}
		a.servers = append(a.servers, srv)
		a.log.Info("add server", fields...)
	}
	if len(a.servers) == 0 {
		a.log.Fatal("no healthy servers")
	}
}
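// updateServers refreshes TLS certificates of the servers that are still present
// in the configuration and fails if none of the configured addresses matches a
// running server.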
func (a *App) updateServers() error {
	serversInfo := fetchServers(a.cfg)
	var found bool
	for _, serverInfo := range serversInfo {
		index := a.serverIndex(serverInfo.Address)
		if index == -1 {
			continue
		}
		if serverInfo.TLS.Enabled {
			if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
				return fmt.Errorf("failed to update tls certs: %w", err)
			}
		}
		found = true
	}
	if !found {
		return fmt.Errorf("invalid servers configuration: no known server found")
	}
	return nil
}
func (a *App) serverIndex(address string) int {
	for i := range a.servers {
		if a.servers[i].Address() == address {
			return i
		}
	}
	return -1
}
func (a *App) stopServices() {
	ctx, cancel := shutdownContext()
	defer cancel()
	for _, svc := range a.services {
		svc.ShutDown(ctx)
	}
}
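// getNotificationsOptions builds NATS notification options from the
// configuration, falling back to the default timeout for non-positive values.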
func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options {
	cfg := notifications.Options{}
	cfg.URL = v.GetString(cfgNATSEndpoint)
	cfg.Timeout = v.GetDuration(cfgNATSTimeout)
	if cfg.Timeout <= 0 {
		l.Error("invalid timeout, using default value",
			zap.String("parameter", cfgNATSTimeout),
			zap.Duration("value in config", cfg.Timeout),
			zap.Duration("default", notifications.DefaultTimeout))
		cfg.Timeout = notifications.DefaultTimeout
	}
	cfg.TLSCertFilepath = v.GetString(cfgNATSTLSCertFile)
	cfg.TLSAuthPrivateKeyFilePath = v.GetString(cfgNATSAuthPrivateKeyFile)
	cfg.RootCAFiles = v.GetStringSlice(cfgNATSRootCAFiles)
	return &cfg
}
func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
	cacheCfg := layer.DefaultCachesConfigs(l)
	cacheCfg.Objects.Lifetime = getLifetime(v, l, cfgObjectsCacheLifetime, cacheCfg.Objects.Lifetime)
	cacheCfg.Objects.Size = getSize(v, l, cfgObjectsCacheSize, cacheCfg.Objects.Size)
	cacheCfg.ObjectsList.Lifetime = getLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
	cacheCfg.ObjectsList.Size = getSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
	cacheCfg.Buckets.Lifetime = getLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
	cacheCfg.Buckets.Size = getSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)
	cacheCfg.Names.Lifetime = getLifetime(v, l, cfgNamesCacheLifetime, cacheCfg.Names.Lifetime)
	cacheCfg.Names.Size = getSize(v, l, cfgNamesCacheSize, cacheCfg.Names.Size)
	cacheCfg.System.Lifetime = getLifetime(v, l, cfgSystemCacheLifetime, cacheCfg.System.Lifetime)
	cacheCfg.System.Size = getSize(v, l, cfgSystemCacheSize, cacheCfg.System.Size)
	cacheCfg.AccessControl.Lifetime = getLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime)
	cacheCfg.AccessControl.Size = getSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
	return cacheCfg
}
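// getLifetime reads a cache lifetime from the configuration, returning
// defaultValue when the entry is missing or non-positive.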
func getLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
	if v.IsSet(cfgEntry) {
		lifetime := v.GetDuration(cfgEntry)
		if lifetime <= 0 {
			l.Error("invalid lifetime, using default value (in seconds)",
				zap.String("parameter", cfgEntry),
				zap.Duration("value in config", lifetime),
				zap.Duration("default", defaultValue))
		} else {
			return lifetime
		}
	}
	return defaultValue
}
func getSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
	if v.IsSet(cfgEntry) {
		size := v.GetInt(cfgEntry)
		if size <= 0 {
			l.Error("invalid cache size, using default value",
				zap.String("parameter", cfgEntry),
				zap.Int("value in config", size),
				zap.Int("default", defaultValue))
		} else {
			return size
		}
	}
	return defaultValue
}
func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
	cacheCfg := cache.DefaultAccessBoxConfig(l)
	cacheCfg.Lifetime = getLifetime(v, l, cfgAccessBoxCacheLifetime, cacheCfg.Lifetime)
	cacheCfg.Size = getSize(v, l, cfgAccessBoxCacheSize, cacheCfg.Size)
	return cacheCfg
}
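// initHandler builds the S3 handler configuration (placement policies, default
// max age, copies number, bucket resolve rules, XML decoder and the Complete
// Multipart Upload keepalive interval) and creates the API handler.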
func (a *App) initHandler() {
	cfg := &handler.Config{
		Policy: a.settings.policies,
		DefaultMaxAge: handler.DefaultMaxAge,
		NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS),
		CopiesNumber: handler.DefaultCopiesNumber,
		XMLDecoder: a.settings.xmlDecoder,
	}
	if a.cfg.IsSet(cfgDefaultMaxAge) {
		defaultMaxAge := a.cfg.GetInt(cfgDefaultMaxAge)
		if defaultMaxAge <= 0 && defaultMaxAge != -1 {
			a.log.Fatal("invalid defaultMaxAge",
				zap.String("parameter", cfgDefaultMaxAge),
				zap.String("value in config", strconv.Itoa(defaultMaxAge)))
		}
		cfg.DefaultMaxAge = defaultMaxAge
	}
	if val := a.cfg.GetUint32(cfgSetCopiesNumber); val > 0 {
		cfg.CopiesNumber = val
	}
	cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketAllow)
	cfg.IsResolveListAllow = len(cfg.ResolveZoneList) > 0
	if !cfg.IsResolveListAllow {
		cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketDeny)
	}
	cfg.CompleteMultipartKeepalive = a.cfg.GetDuration(cfgKludgeCompleteMultipartUploadKeepalive)
	var err error
	a.api, err = handler.New(a.log, a.obj, a.nc, cfg)
	if err != nil {
		a.log.Fatal("could not initialize API handler", zap.Error(err))
	}
}
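// readRegionMap reads a JSON file that maps region names to placement policy
// strings; overriding the default location constraint is not allowed.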
func readRegionMap(filePath string) (map[string]string, error) {
	regionMap := make(map[string]string)
	if filePath == "" {
		return regionMap, nil
	}
	data, err := os.ReadFile(filePath)
	if err != nil {
		return nil, fmt.Errorf("couldn't read file '%s': %w", filePath, err)
	}
	if err = json.Unmarshal(data, &regionMap); err != nil {
		return nil, fmt.Errorf("unmarshal policies: %w", err)
	}
	if _, ok := regionMap[api.DefaultLocationConstraint]; ok {
		return nil, fmt.Errorf("config overrides %s location constraint", api.DefaultLocationConstraint)
	}
	return regionMap, nil
}