frostfs-s3-gw/cmd/s3-gw/app.go
Pavel Pogodaev d986e74897 [#147] Add Kludge profiles
Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
2024-12-13 11:25:07 +00:00

1274 lines
35 KiB
Go

package main
import (
"context"
"crypto/x509"
"encoding/hex"
"encoding/xml"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"runtime/debug"
"strings"
"sync"
"syscall"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-contract/commonclient"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
s3middleware "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/frostfsid"
ffidcontract "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/frostfsid/contract"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/policy"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/policy/contract"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
internalnet "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/wallet"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
v2container "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/go-chi/chi/v5/middleware"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"github.com/spf13/viper"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/text/encoding/ianaindex"
"google.golang.org/grpc"
)
// awsDefaultNamespace is the XML namespace that NewXMLDecoder assigns to
// xml.Decoder.DefaultSpace when the default-XMLNS kludge applies.
const awsDefaultNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
type (
// App is the main application structure.
App struct {
ctr s3middleware.Center
log *zap.Logger
cfg *viper.Viper
pool *pool.Pool
treePool *treepool.Pool
key *keys.PrivateKey
obj *layer.Layer
api api.Handler
frostfsid *frostfsid.FrostFSID
policyStorage *policy.Storage
// servers holds the listeners that were bound successfully.
servers []Server
// unbindServers holds the servers that failed to bind; tryReconnect
// retries them.
unbindServers []ServerInfo
// mu is taken by updateServers and tryReconnect (write) and by
// getServers (read).
mu sync.RWMutex
metrics *metrics.AppMetrics
bucketResolver *resolver.BucketResolver
// services is rebuilt by startServices and shut down by stopServices.
services []*Service
settings *appSettings
loggerSettings *loggerSettings
// webDone is closed at the end of Serve; Wait blocks on it.
webDone chan struct{}
// wrkDone is created in newApp; it is not used elsewhere in the visible
// part of this file.
wrkDone chan struct{}
}
// loggerSettings lets the logger report dropped log entries to the
// application metrics once they are attached via setMetrics.
loggerSettings struct {
mu sync.RWMutex
appMetrics *metrics.AppMetrics
}
// appSettings keeps runtime settings of the gateway. Fields assigned in
// update are rewritten on config reload under mu.
appSettings struct {
logLevel zap.AtomicLevel
httpLogging s3middleware.LogHTTPConfig
maxClient maxClientsConfig
defaultMaxAge int
reconnectInterval time.Duration
resolveZoneList []string
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
frostfsidValidation bool
// accessbox is set by initAuthCenter only when the access box container
// is configured; nil otherwise.
accessbox *cid.ID
dialerSource *internalnet.DialerSource
workerPoolSize int
// mu guards the fields that update assigns.
mu sync.RWMutex
namespaces Namespaces
defaultXMLNS bool
bypassContentEncodingInChunks bool
// kludgeProfiles is matched against the client agent string by substring
// (see BypassContentEncodingInChunks and NewXMLDecoder).
kludgeProfiles map[string]*KludgeParams
clientCut bool
maxBufferSizeForPut uint64
md5Enabled bool
namespaceHeader string
defaultNamespaces []string
policyDenyByDefault bool
sourceIPHeader string
retryMaxAttempts int
domains []string
vhsEnabled bool
vhsHeader string
servernameHeader string
vhsNamespacesEnabled map[string]bool
retryMaxBackoff time.Duration
retryStrategy handler.RetryStrategy
tombstoneMembersSize int
tombstoneLifetime uint64
tlsTerminationHeader string
}
// maxClientsConfig feeds the request throttle configured in Serve:
// count becomes the limit and deadline the backlog timeout.
maxClientsConfig struct {
deadline time.Duration
count int
}
// Logger couples a zap logger with the atomic level that controls it.
Logger struct {
logger *zap.Logger
lvl zap.AtomicLevel
}
)
// DroppedLogsInc increments the dropped-logs statistic. It does nothing
// until metrics have been attached with setMetrics.
func (s *loggerSettings) DroppedLogsInc() {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.appMetrics == nil {
		return
	}
	s.appMetrics.Statistic().DroppedLogsInc()
}
// setMetrics attaches the application metrics used by DroppedLogsInc.
func (s *loggerSettings) setMetrics(appMetrics *metrics.AppMetrics) {
	s.mu.Lock()
	s.appMetrics = appMetrics
	s.mu.Unlock()
}
// newApp builds the application: it picks the logger, reads the settings,
// creates the object and tree pools and then runs App.init.
func newApp(ctx context.Context, v *viper.Viper) *App {
	logCfg := new(loggerSettings)
	logger := pickLogger(v, logCfg)
	appCfg := newAppSettings(logger, v)

	objPool, treePool, key := getPools(ctx, logger.logger, v, appCfg.dialerSource)

	a := &App{
		log:      logger.logger,
		cfg:      v,
		pool:     objPool,
		treePool: treePool,
		key:      key,

		webDone: make(chan struct{}, 1),
		wrkDone: make(chan struct{}, 1),

		settings:       appCfg,
		loggerSettings: logCfg,
	}
	a.init(ctx)

	return a
}
// init runs the one-time initialization steps of the application.
//
// The order of the calls matters: initAuthCenter may resolve the access box
// container through a.bucketResolver (set by initResolver), initAPI builds the
// handler from a.policyStorage and a.frostfsid, initServers reports server
// health to a.metrics (created by initMetrics) and initTracing reads
// a.servers (filled by initServers).
func (a *App) init(ctx context.Context) {
a.initResolver()
a.initAuthCenter(ctx)
a.setRuntimeParameters()
a.initFrostfsID(ctx)
a.initPolicyStorage(ctx)
a.initAPI(ctx)
a.initMetrics()
a.initServers(ctx)
a.initTracing(ctx)
}
// initAuthCenter resolves the optional access box container and creates the
// auth center stored in a.ctr. A failure to resolve the configured container
// is fatal.
func (a *App) initAuthCenter(ctx context.Context) {
	if a.cfg.IsSet(cfgContainersAccessBox) {
		id, err := a.resolveContainerID(ctx, cfgContainersAccessBox)
		if err != nil {
			a.log.Fatal(logs.CouldNotFetchAccessBoxContainerInfo, zap.Error(err))
		}
		a.settings.accessbox = &id
	}

	credsCfg := tokens.Config{
		FrostFS:                     frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(a.pool, a.key), a.log),
		Key:                         a.key,
		CacheConfig:                 getAccessBoxCacheConfig(a.cfg, a.log),
		RemovingCheckAfterDurations: fetchRemovingCheckInterval(a.cfg, a.log),
	}
	creds := tokens.New(credsCfg)
	allowedPrefixes := a.cfg.GetStringSlice(cfgAllowedAccessKeyIDPrefixes)

	a.ctr = auth.New(creds, allowedPrefixes, a.settings)
}
// initLayer creates the object layer stored in a.obj. It generates a random
// key for anonymous requests, derives the gate owner from the gateway key and
// loads the optional CORS and lifecycle containers. Any failure is fatal.
func (a *App) initLayer(ctx context.Context) {
	// prepare random key for anonymous requests
	anonKey, err := keys.NewPrivateKey()
	if err != nil {
		a.log.Fatal(logs.CouldntGenerateRandomKey, zap.Error(err))
	}

	var owner user.ID
	user.IDFromKey(&owner, a.key.PrivateKey.PublicKey)

	// optionalContainer loads info of a container only when its config key is set.
	optionalContainer := func(cfgKey, failureMsg string) *data.BucketInfo {
		if !a.cfg.IsSet(cfgKey) {
			return nil
		}
		cnrInfo, fetchErr := a.fetchContainerInfo(ctx, cfgKey)
		if fetchErr != nil {
			a.log.Fatal(failureMsg, zap.Error(fetchErr))
		}
		return cnrInfo
	}

	corsInfo := optionalContainer(cfgContainersCORS, logs.CouldNotFetchCORSContainerInfo)
	lifecycleInfo := optionalContainer(cfgContainersLifecycle, logs.CouldNotFetchLifecycleContainerInfo)

	cfg := &layer.Config{
		Cache:            layer.NewCache(getCacheOptions(a.cfg, a.log)),
		AnonKey:          layer.AnonymousKey{Key: anonKey},
		GateOwner:        owner,
		Resolver:         a.bucketResolver,
		TreeService:      tree.NewTree(services.NewPoolWrapper(a.treePool), a.log),
		Features:         a.settings,
		GateKey:          a.key,
		CORSCnrInfo:      corsInfo,
		LifecycleCnrInfo: lifecycleInfo,
		WorkerPool:       a.initWorkerPool(),
	}

	// prepare object layer
	a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), cfg)
}
// initWorkerPool creates an ants worker pool of the configured size.
// A creation failure is fatal.
func (a *App) initWorkerPool() *ants.Pool {
	size := a.settings.workerPoolSize

	p, err := ants.NewPool(size)
	if err != nil {
		a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
	}

	return p
}
// newAppSettings reads the settings that are fixed for the process lifetime
// and then calls update to load the reloadable ones.
//
// The resolve zone list is the allow list when it is non-empty; otherwise the
// deny list is used and isResolveListAllow stays false.
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
	s := &appSettings{
		logLevel:            log.lvl,
		httpLogging:         s3middleware.LogHTTPConfig{},
		maxClient:           newMaxClients(v),
		defaultMaxAge:       fetchDefaultMaxAge(v, log.logger),
		reconnectInterval:   fetchReconnectInterval(v),
		frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
		dialerSource:        getDialerSource(log.logger, v),
		workerPoolSize:      fetchTombstoneWorkerPoolSize(v),
	}

	if allowed := v.GetStringSlice(cfgResolveBucketAllow); len(allowed) > 0 {
		s.resolveZoneList = allowed
		s.isResolveListAllow = true
	} else {
		s.resolveZoneList = v.GetStringSlice(cfgResolveBucketDeny)
	}

	s.update(v, log.logger)

	return s
}
// update re-reads the reloadable settings from v and stores them in s.
//
// Every value is fetched before s.mu is taken, so the write lock is held only
// for the assignments and for re-initializing the HTTP logger.
func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
namespaceHeader := v.GetString(cfgResolveNamespaceHeader)
nsConfig, defaultNamespaces := fetchNamespacesConfig(log, v)
// VHS namespace keys that are aliases of the default namespace are
// collapsed, see prepareVHSNamespaces.
vhsNamespacesEnabled := s.prepareVHSNamespaces(v, log, defaultNamespaces)
defaultXMLNS := v.GetBool(cfgKludgeUseDefaultXMLNS)
bypassContentEncodingInChunks := v.GetBool(cfgKludgeBypassContentEncodingCheckInChunks)
kludgeProfiles := fetchKludgeProfiles(v)
clientCut := v.GetBool(cfgClientCut)
maxBufferSizeForPut := v.GetUint64(cfgBufferMaxSizeForPut)
md5Enabled := v.GetBool(cfgMD5Enabled)
policyDenyByDefault := v.GetBool(cfgPolicyDenyByDefault)
sourceIPHeader := v.GetString(cfgSourceIPHeader)
retryMaxAttempts := fetchRetryMaxAttempts(v)
retryMaxBackoff := fetchRetryMaxBackoff(v)
retryStrategy := fetchRetryStrategy(v)
domains := fetchDomains(v, log)
vhsEnabled := v.GetBool(cfgVHSEnabled)
vhsHeader := v.GetString(cfgVHSHeader)
servernameHeader := v.GetString(cfgServernameHeader)
httpLoggingEnabled := v.GetBool(cfgHTTPLoggingEnabled)
httpLoggingMaxBody := v.GetInt64(cfgHTTPLoggingMaxBody)
httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize)
httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination)
httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip)
tombstoneMembersSize := fetchTombstoneMembersSize(v)
tombstoneLifetime := fetchTombstoneLifetime(v)
tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader)
// Publish the new values atomically with respect to the getters.
s.mu.Lock()
defer s.mu.Unlock()
s.httpLogging.Enabled = httpLoggingEnabled
s.httpLogging.MaxBody = httpLoggingMaxBody
s.httpLogging.MaxLogSize = httpLoggingMaxLogSize
s.httpLogging.OutputPath = httpLoggingOutputPath
s.httpLogging.UseGzip = httpLoggingUseGzip
// The HTTP logger is initialized after its configuration fields are set.
s.httpLogging.InitHTTPLogger(log)
s.namespaceHeader = namespaceHeader
s.defaultNamespaces = defaultNamespaces
s.namespaces = nsConfig.Namespaces
s.defaultXMLNS = defaultXMLNS
s.bypassContentEncodingInChunks = bypassContentEncodingInChunks
s.kludgeProfiles = kludgeProfiles
s.clientCut = clientCut
s.maxBufferSizeForPut = maxBufferSizeForPut
s.md5Enabled = md5Enabled
s.policyDenyByDefault = policyDenyByDefault
s.sourceIPHeader = sourceIPHeader
s.retryMaxAttempts = retryMaxAttempts
s.retryMaxBackoff = retryMaxBackoff
s.retryStrategy = retryStrategy
s.domains = domains
s.vhsEnabled = vhsEnabled
s.vhsHeader = vhsHeader
s.servernameHeader = servernameHeader
s.vhsNamespacesEnabled = vhsNamespacesEnabled
s.tombstoneMembersSize = tombstoneMembersSize
s.tombstoneLifetime = tombstoneLifetime
s.tlsTerminationHeader = tlsTerminationHeader
}
// prepareVHSNamespaces returns the per-namespace VHS flags read from the
// config. Every namespace listed in defaultNamespaces is stored under the
// defaultNamespace key instead of its own name.
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
	configured := fetchVHSNamespaces(v, log)
	result := make(map[string]bool, len(configured))

	for name, enabled := range configured {
		key := name
		if slices.Contains(defaultNamespaces, name) {
			key = defaultNamespace
		}
		result[key] = enabled
	}

	return result
}
// Domains returns the currently configured domains.
func (s *appSettings) Domains() []string {
	s.mu.RLock()
	domains := s.domains
	s.mu.RUnlock()

	return domains
}
// GlobalVHS reports the current value of the global VHS flag.
func (s *appSettings) GlobalVHS() bool {
	s.mu.RLock()
	enabled := s.vhsEnabled
	s.mu.RUnlock()

	return enabled
}
// VHSHeader returns the currently configured VHS header name.
func (s *appSettings) VHSHeader() string {
	s.mu.RLock()
	header := s.vhsHeader
	s.mu.RUnlock()

	return header
}
// ServernameHeader returns the currently configured servername header name.
func (s *appSettings) ServernameHeader() string {
	s.mu.RLock()
	header := s.servernameHeader
	s.mu.RUnlock()

	return header
}
// VHSNamespacesEnabled returns the current per-namespace VHS flags.
// The returned map is the stored one, not a copy.
func (s *appSettings) VHSNamespacesEnabled() map[string]bool {
	s.mu.RLock()
	flags := s.vhsNamespacesEnabled
	s.mu.RUnlock()

	return flags
}
// BypassContentEncodingInChunks reports whether the content-encoding check in
// chunks is bypassed for the given client agent.
//
// The first kludge profile whose name is a substring of agent decides the
// result; without a match the global setting is returned. Map iteration order
// is unspecified, so when several profiles match the choice between them is
// arbitrary.
func (s *appSettings) BypassContentEncodingInChunks(agent string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()

	for name, params := range s.kludgeProfiles {
		if !strings.Contains(agent, name) {
			continue
		}
		return params.BypassContentEncodingCheckInChunks
	}

	return s.bypassContentEncodingInChunks
}
// ClientCut reports the current value of the client-cut flag.
func (s *appSettings) ClientCut() bool {
	s.mu.RLock()
	enabled := s.clientCut
	s.mu.RUnlock()

	return enabled
}
// BufferMaxSizeForPut returns the current maximum buffer size for put.
func (s *appSettings) BufferMaxSizeForPut() uint64 {
	s.mu.RLock()
	size := s.maxBufferSizeForPut
	s.mu.RUnlock()

	return size
}
// DefaultPlacementPolicy returns the placement policy stored for the
// namespace under the default constraint name.
func (s *appSettings) DefaultPlacementPolicy(namespace string) netmap.PlacementPolicy {
	s.mu.RLock()
	placement := s.namespaces[namespace].LocationConstraints[defaultConstraintName]
	s.mu.RUnlock()

	return placement
}
// PlacementPolicy looks up the placement policy of the namespace for the
// given constraint. The boolean is false when no such constraint is stored.
func (s *appSettings) PlacementPolicy(namespace, constraint string) (netmap.PlacementPolicy, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	placement, found := s.namespaces[namespace].LocationConstraints[constraint]
	return placement, found
}
// CopiesNumbers looks up the copies numbers of the namespace for the given
// constraint. The boolean is false when no such constraint is stored.
func (s *appSettings) CopiesNumbers(namespace, constraint string) ([]uint32, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	numbers, found := s.namespaces[namespace].CopiesNumbers[constraint]
	return numbers, found
}
// DefaultCopiesNumbers returns the copies numbers stored for the namespace
// under the default constraint name.
func (s *appSettings) DefaultCopiesNumbers(namespace string) []uint32 {
	s.mu.RLock()
	numbers := s.namespaces[namespace].CopiesNumbers[defaultConstraintName]
	s.mu.RUnlock()

	return numbers
}
// LogHTTPConfig returns a copy of the current HTTP logging configuration.
func (s *appSettings) LogHTTPConfig() s3middleware.LogHTTPConfig {
	s.mu.RLock()
	cfg := s.httpLogging
	s.mu.RUnlock()

	return cfg
}
// NewXMLDecoder creates an XML decoder for r that understands IANA charsets.
//
// The decoder's default namespace is set to awsDefaultNamespace when the first
// kludge profile whose name is a substring of agent asks for it, or, if no
// profile matches, when the global default-XMLNS setting is on. Map iteration
// order is unspecified, so with several matching profiles the choice between
// them is arbitrary.
func (s *appSettings) NewXMLDecoder(r io.Reader, agent string) *xml.Decoder {
	charsetReader := func(charset string, input io.Reader) (io.Reader, error) {
		enc, err := ianaindex.IANA.Encoding(charset)
		if err != nil {
			return nil, fmt.Errorf("charset %s: %w", charset, err)
		}
		return enc.NewDecoder().Reader(input), nil
	}

	dec := xml.NewDecoder(r)
	dec.CharsetReader = charsetReader

	s.mu.RLock()
	defer s.mu.RUnlock()

	// A matching profile overrides the global setting.
	useDefaultNS := s.defaultXMLNS
	for name, params := range s.kludgeProfiles {
		if strings.Contains(agent, name) {
			useDefaultNS = params.UseDefaultXMLNS
			break
		}
	}

	if useDefaultNS {
		dec.DefaultSpace = awsDefaultNamespace
	}

	return dec
}
// DefaultMaxAge returns the default max age read when the settings were
// created. The field is read without locking.
func (s *appSettings) DefaultMaxAge() int {
	maxAge := s.defaultMaxAge
	return maxAge
}
// ResolveZoneList returns the zone list chosen when the settings were created
// (allow list if non-empty, deny list otherwise). The field is read without
// locking.
func (s *appSettings) ResolveZoneList() []string {
	zones := s.resolveZoneList
	return zones
}
// IsResolveListAllow reports whether ResolveZoneList holds allowed zones.
// The field is read without locking.
func (s *appSettings) IsResolveListAllow() bool {
	isAllow := s.isResolveListAllow
	return isAllow
}
// MD5Enabled reports the current value of the MD5 flag.
func (s *appSettings) MD5Enabled() bool {
	s.mu.RLock()
	enabled := s.md5Enabled
	s.mu.RUnlock()

	return enabled
}
// NamespaceHeader returns the currently configured namespace header name.
func (s *appSettings) NamespaceHeader() string {
	s.mu.RLock()
	header := s.namespaceHeader
	s.mu.RUnlock()

	return header
}
// FormContainerZone builds the container zone for a namespace: "<ns>.ns" for
// a non-empty namespace and the default system zone for the empty one.
func (s *appSettings) FormContainerZone(ns string) string {
	if ns != "" {
		return ns + ".ns"
	}

	return v2container.SysAttributeZoneDefault
}
// isDefaultNamespace reports whether ns is listed among the configured
// default namespaces.
func (s *appSettings) isDefaultNamespace(ns string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()

	return slices.Contains(s.defaultNamespaces, ns)
}
// ResolveNamespaceAlias maps every configured default-namespace alias to
// defaultNamespace and returns any other namespace unchanged.
func (s *appSettings) ResolveNamespaceAlias(namespace string) string {
	if !s.isDefaultNamespace(namespace) {
		return namespace
	}

	return defaultNamespace
}
// PolicyDenyByDefault reports the current value of the deny-by-default flag.
func (s *appSettings) PolicyDenyByDefault() bool {
	s.mu.RLock()
	deny := s.policyDenyByDefault
	s.mu.RUnlock()

	return deny
}
// SourceIPHeader returns the currently configured source IP header name.
func (s *appSettings) SourceIPHeader() string {
	s.mu.RLock()
	header := s.sourceIPHeader
	s.mu.RUnlock()

	return header
}
// RetryMaxAttempts returns the currently configured maximum retry attempts.
func (s *appSettings) RetryMaxAttempts() int {
	s.mu.RLock()
	attempts := s.retryMaxAttempts
	s.mu.RUnlock()

	return attempts
}
// RetryMaxBackoff returns the currently configured maximum retry backoff.
func (s *appSettings) RetryMaxBackoff() time.Duration {
	s.mu.RLock()
	backoff := s.retryMaxBackoff
	s.mu.RUnlock()

	return backoff
}
// RetryStrategy returns the currently configured retry strategy.
func (s *appSettings) RetryStrategy() handler.RetryStrategy {
	s.mu.RLock()
	strategy := s.retryStrategy
	s.mu.RUnlock()

	return strategy
}
// TLSTerminationHeader returns the currently configured TLS termination
// header name.
func (s *appSettings) TLSTerminationHeader() string {
	s.mu.RLock()
	header := s.tlsTerminationHeader
	s.mu.RUnlock()

	return header
}
// AccessBoxContainer returns the access box container ID and true when one
// was configured, or a zero ID and false otherwise. The field is read without
// locking.
func (s *appSettings) AccessBoxContainer() (cid.ID, bool) {
	if s.accessbox == nil {
		return cid.ID{}, false
	}

	return *s.accessbox, true
}
// TombstoneMembersSize returns the currently configured tombstone members size.
func (s *appSettings) TombstoneMembersSize() int {
	s.mu.RLock()
	size := s.tombstoneMembersSize
	s.mu.RUnlock()

	return size
}
// TombstoneLifetime returns the currently configured tombstone lifetime.
func (s *appSettings) TombstoneLifetime() uint64 {
	s.mu.RLock()
	lifetime := s.tombstoneLifetime
	s.mu.RUnlock()

	return lifetime
}
// initAPI creates the object layer and then the API handler. The order is
// required: initHandler passes a.obj, which initLayer sets, to handler.New.
func (a *App) initAPI(ctx context.Context) {
a.initLayer(ctx)
a.initHandler()
}
// initMetrics creates the application metrics, marks the health status as
// starting and attaches the metrics to the logger settings.
func (a *App) initMetrics() {
	a.metrics = metrics.NewAppMetrics(metrics.AppMetricsConfig{
		Logger:         a.log,
		PoolStatistics: frostfs.NewPoolStatistic(a.pool),
		TreeStatistic:  a.treePool,
		Enabled:        a.cfg.GetBool(cfgPrometheusEnabled),
	})

	a.metrics.State().SetHealth(metrics.HealthStatusStarting)
	a.loggerSettings.setMetrics(a.metrics)
}
// initFrostfsID creates the FrostFSID contract client and wraps it, together
// with its cache, into a.frostfsid. Any failure is fatal.
func (a *App) initFrostfsID(ctx context.Context) {
	contractCfg := ffidcontract.Config{
		RPCAddress:    a.cfg.GetString(cfgRPCEndpoint),
		Contract:      a.cfg.GetString(cfgFrostfsIDContract),
		ProxyContract: a.cfg.GetString(cfgProxyContract),
		Key:           a.key,
		Waiter: commonclient.WaiterOptions{
			IgnoreAlreadyExistsError: false,
			VerifyExecResults:        true,
		},
	}

	client, err := ffidcontract.New(ctx, contractCfg)
	if err != nil {
		a.log.Fatal(logs.InitFrostfsIDContractFailed, zap.Error(err))
	}

	idCfg := frostfsid.Config{
		Cache:     cache.NewFrostfsIDCache(getFrostfsIDCacheConfig(a.cfg, a.log)),
		FrostFSID: client,
		Logger:    a.log,
	}

	a.frostfsid, err = frostfsid.NewFrostFSID(idCfg)
	if err != nil {
		a.log.Fatal(logs.InitFrostfsIDContractFailed, zap.Error(err))
	}
}
// initPolicyStorage creates the policy contract client and the policy storage
// kept in a.policyStorage. A contract client failure is fatal.
func (a *App) initPolicyStorage(ctx context.Context) {
	contractCfg := contract.Config{
		RPCAddress:    a.cfg.GetString(cfgRPCEndpoint),
		Contract:      a.cfg.GetString(cfgPolicyContract),
		ProxyContract: a.cfg.GetString(cfgProxyContract),
		Key:           a.key,
		Waiter: commonclient.WaiterOptions{
			IgnoreAlreadyExistsError: false,
			VerifyExecResults:        true,
		},
	}

	client, err := contract.New(ctx, contractCfg)
	if err != nil {
		a.log.Fatal(logs.InitPolicyContractFailed, zap.Error(err))
	}

	storageCfg := policy.StorageConfig{
		Contract: client,
		Cache:    cache.NewMorphPolicyCache(getMorphPolicyCacheConfig(a.cfg, a.log)),
		Log:      a.log,
	}
	a.policyStorage = policy.NewStorage(storageCfg)
}
// initResolver creates the bucket resolver from the configured resolver order.
// A creation failure is fatal.
func (a *App) initResolver() {
	order, cfg := a.getResolverOrder(), a.getResolverConfig()

	res, err := resolver.NewBucketResolver(order, cfg)
	a.bucketResolver = res
	if err != nil {
		a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err))
	}
}
// getResolverConfig builds the resolver configuration from the object pool
// and the configured RPC endpoint.
func (a *App) getResolverConfig() *resolver.Config {
	cfg := new(resolver.Config)
	cfg.FrostFS = frostfs.NewResolverFrostFS(a.pool)
	cfg.RPCAddress = a.cfg.GetString(cfgRPCEndpoint)

	return cfg
}
// getResolverOrder returns the configured resolver order. The NNS resolver is
// removed (with a warning) when no RPC endpoint is configured, and an empty
// resulting order is reported in the log.
func (a *App) getResolverOrder() []string {
	resolvers := a.cfg.GetStringSlice(cfgResolveOrder)

	if endpoint := a.cfg.GetString(cfgRPCEndpoint); len(endpoint) == 0 {
		resolvers = remove(resolvers, resolver.NNSResolver)
		a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
	}

	if len(resolvers) == 0 {
		a.log.Info(logs.ContainerResolverWillBeDisabled)
	}

	return resolvers
}
// initTracing (re)configures tracing from the current config. It is called
// once from init and again on every config reload.
//
// The address of the first bound server is used as the instance ID. The server
// list is read through getServers because on reload the reconnect goroutine
// (see tryReconnect) may be appending to a.servers concurrently.
//
// Any failure is logged as a warning; failures while preparing the
// configuration leave tracing as it was.
func (a *App) initTracing(ctx context.Context) {
	instanceID := ""
	if servers := a.getServers(); len(servers) > 0 {
		instanceID = servers[0].Address()
	}

	cfg := tracing.Config{
		Enabled:    a.cfg.GetBool(cfgTracingEnabled),
		Exporter:   tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
		Endpoint:   a.cfg.GetString(cfgTracingEndpoint),
		Service:    "frostfs-s3-gw",
		InstanceID: instanceID,
		Version:    version.Version,
	}

	if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" {
		caBytes, err := os.ReadFile(trustedCa)
		if err != nil {
			a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
			return
		}

		certPool := x509.NewCertPool()
		if ok := certPool.AppendCertsFromPEM(caBytes); !ok {
			a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert"))
			return
		}
		cfg.ServerCaCertPool = certPool
	}

	attributes, err := fetchTracingAttributes(a.cfg)
	if err != nil {
		a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
		return
	}
	cfg.Attributes = attributes

	updated, err := tracing.Setup(ctx, cfg)
	if err != nil {
		a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
	}
	if updated {
		a.log.Info(logs.TracingConfigUpdated)
	}
}
// shutdownTracing stops tracing, waiting at most five seconds. A failure is
// logged as a warning.
func (a *App) shutdownTracing() {
	const tracingShutdownTimeout = 5 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
	defer cancel()

	err := tracing.Shutdown(ctx)
	if err == nil {
		return
	}
	a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err))
}
// newMaxClients reads the max-clients count and deadline from the config.
func newMaxClients(cfg *viper.Viper) maxClientsConfig {
	return maxClientsConfig{
		count:    fetchMaxClientsCount(cfg),
		deadline: fetchMaxClientsDeadline(cfg),
	}
}
// getDialerSource creates the dialer source from the multinet config.
// A creation failure is fatal.
func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource {
	multinetCfg := fetchMultinetConfig(cfg, logger)

	src, err := internalnet.NewDialerSource(multinetCfg)
	if err != nil {
		logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err))
	}

	return src
}
// getPools loads the gateway key from the wallet, then creates and dials the
// object pool and the tree pool with the same peers, timeouts, logger and gRPC
// dial options. It returns both pools and the key. Any failure is fatal.
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSource *internalnet.DialerSource) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
	var (
		objPrm  pool.InitParameters
		treePrm treepool.InitParameters
	)

	walletPassword := wallet.GetPassword(cfg, cfgWalletPassphrase)

	key, err := wallet.GetKeyFromPath(cfg.GetString(cfgWalletPath), cfg.GetString(cfgWalletAddress), walletPassword)
	if err != nil {
		logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
	}

	objPrm.SetKey(&key.PrivateKey)
	treePrm.SetKey(key)
	logger.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))

	// Both pools talk to the same set of peers.
	for _, node := range fetchPeers(logger, cfg) {
		objPrm.AddNode(node)
		treePrm.AddNode(node)
	}

	dialTimeout := fetchConnectTimeout(cfg)
	objPrm.SetNodeDialTimeout(dialTimeout)
	treePrm.SetNodeDialTimeout(dialTimeout)

	streamTimeout := fetchStreamTimeout(cfg)
	objPrm.SetNodeStreamTimeout(streamTimeout)
	treePrm.SetNodeStreamTimeout(streamTimeout)

	healthTimeout := fetchHealthCheckTimeout(cfg)
	objPrm.SetHealthcheckTimeout(healthTimeout)
	treePrm.SetHealthcheckTimeout(healthTimeout)

	rebalance := fetchRebalanceInterval(cfg)
	objPrm.SetClientRebalanceInterval(rebalance)
	treePrm.SetClientRebalanceInterval(rebalance)

	// The error threshold and graceful-close timeout apply to the object pool only.
	objPrm.SetErrorThreshold(fetchErrorThreshold(cfg))
	objPrm.SetGracefulCloseOnSwitchTimeout(fetchSetGracefulCloseOnSwitchTimeout(cfg))

	objPrm.SetLogger(logger)
	treePrm.SetLogger(logger)

	treePrm.SetMaxRequestAttempts(cfg.GetInt(cfgTreePoolMaxAttempts))

	dialOpts := []grpc.DialOption{
		grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
		grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
		grpc.WithContextDialer(dialSource.GrpcContextDialer()),
	}
	objPrm.SetGRPCDialOptions(dialOpts...)
	treePrm.SetGRPCDialOptions(dialOpts...)

	objPool, err := pool.NewPool(objPrm)
	if err != nil {
		logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
	}
	if err = objPool.Dial(ctx); err != nil {
		logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
	}

	trPool, err := treepool.NewPool(treePrm)
	if err != nil {
		logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
	}
	if err = trPool.Dial(ctx); err != nil {
		logger.Fatal(logs.FailedToDialTreePool, zap.Error(err))
	}

	return objPool, trPool, key
}
// remove returns list without any item equal to element.
//
// The input slice is never modified: when element is present a new slice is
// allocated for the result, otherwise list itself is returned. This matters
// because the caller may pass a slice that is still owned by the configuration
// store.
func remove(list []string, element string) []string {
	first := -1
	for i, item := range list {
		if item == element {
			first = i
			break
		}
	}
	if first < 0 {
		return list
	}

	// Copy the untouched prefix, then filter the rest.
	result := make([]string, 0, len(list)-1)
	result = append(result, list[:first]...)
	for _, item := range list[first+1:] {
		if item != element {
			result = append(result, item)
		}
	}

	return result
}
// Wait blocks until the application has finished.
//
// It first logs the application start with its name (frostfs-s3-gw) and
// version (version.Version), publishes the version to the metrics and marks
// the application ready. After the web server is done it logs that the
// application has finished.
func (a *App) Wait() {
	startFields := []zap.Field{
		zap.String("name", "frostfs-s3-gw"),
		zap.String("version", version.Version),
	}
	a.log.Info(logs.ApplicationStarted, startFields...)

	a.metrics.State().SetVersion(version.Version)
	a.setHealthStatus()

	// wait for web-server to be stopped
	<-a.webDone

	a.log.Info(logs.ApplicationFinished)
}
// setHealthStatus marks the application as ready in the metrics state.
func (a *App) setHealthStatus() {
	state := a.metrics.State()
	state.SetHealth(metrics.HealthStatusReady)
}
// Serve runs HTTP server to handle S3 API requests.
//
// It starts the auxiliary services, serves every bound listener in its own
// goroutine and schedules reconnects for servers that failed to bind. Then it
// blocks until ctx is done, reloading the configuration on every SIGHUP.
// On exit it shuts down the HTTP server, metrics, services and tracing and
// closes a.webDone so that Wait returns.
func (a *App) Serve(ctx context.Context) {
	cfg := api.Config{
		Throttle: middleware.ThrottleOpts{
			Limit:          a.settings.maxClient.count,
			BacklogTimeout: a.settings.maxClient.deadline,
		},
		Handler:             a.api,
		Center:              a.ctr,
		Log:                 a.log,
		Metrics:             a.metrics,
		MiddlewareSettings:  a.settings,
		PolicyChecker:       a.policyStorage,
		FrostfsID:           a.frostfsid,
		FrostFSIDValidation: a.settings.frostfsidValidation,
		XMLDecoder:          a.settings,
		Tagging:             a.obj,
	}

	chiRouter := api.NewRouter(cfg)

	// Use the router built by api.NewRouter as http.Handler
	srv := new(http.Server)
	srv.Handler = chiRouter
	srv.ErrorLog = zap.NewStdLog(a.log)
	srv.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout)
	srv.ReadHeaderTimeout = a.cfg.GetDuration(cfgWebReadHeaderTimeout)
	srv.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout)
	srv.IdleTimeout = a.cfg.GetDuration(cfgWebIdleTimeout)

	a.startServices()

	servs := a.getServers()

	for i := range servs {
		go func(i int) {
			a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))

			// errors.Is keeps the check correct even if the sentinel gets
			// wrapped, and matches the check in tryReconnect.
			if err := srv.Serve(servs[i].Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
				a.metrics.MarkUnhealthy(servs[i].Address())
				a.log.Fatal(logs.ListenAndServe, zap.Error(err))
			}
		}(i)
	}

	if len(a.unbindServers) != 0 {
		a.scheduleReconnect(ctx, srv)
	}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGHUP)

LOOP:
	for {
		select {
		case <-ctx.Done():
			break LOOP
		case <-sigs:
			a.configReload(ctx)
		}
	}

	// The serving context is already done here, so shutdown gets its own
	// time-limited context.
	ctx, cancel := shutdownContext()
	defer cancel()

	a.log.Info(logs.StoppingServer, zap.Error(srv.Shutdown(ctx)))

	a.metrics.Shutdown()
	a.stopServices()
	a.shutdownTracing()

	close(a.webDone)
}
// shutdownContext returns a fresh background context limited by
// defaultShutdownTimeout, together with its cancel function.
func shutdownContext() (context.Context, context.CancelFunc) {
	parent := context.Background()
	return context.WithTimeout(parent, defaultShutdownTimeout)
}
// configReload re-reads the configuration and applies it to the running
// application. It is called from Serve on SIGHUP.
//
// The reload is aborted (with a warning) when no config file or directory was
// given or when the config cannot be read. Failures to update resolvers or
// servers are only logged; the remaining steps still run.
func (a *App) configReload(ctx context.Context) {
a.log.Info(logs.SIGHUPConfigReloadStarted)
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
return
}
if err := readInConfig(a.cfg); err != nil {
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
return
}
if err := a.bucketResolver.UpdateResolvers(a.getResolverOrder()); err != nil {
a.log.Warn(logs.FailedToReloadResolvers, zap.Error(err))
}
if err := a.updateServers(); err != nil {
a.log.Warn(logs.FailedToReloadServerParameters, zap.Error(err))
}
a.setRuntimeParameters()
// Services are recreated from the fresh config: stop the old ones first.
a.stopServices()
a.startServices()
a.updateSettings()
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
a.initTracing(ctx)
a.setHealthStatus()
a.log.Info(logs.SIGHUPConfigReloadCompleted)
}
// updateSettings applies the reloadable settings: the log level, the multinet
// dialer config and everything handled by appSettings.update. Failures of the
// first two are logged as warnings and do not stop the update.
func (a *App) updateSettings() {
	lvl, err := getLogLevel(a.cfg)
	if err == nil {
		a.settings.logLevel.SetLevel(lvl)
	} else {
		a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
	}

	multinetCfg := fetchMultinetConfig(a.cfg, a.log)
	if err = a.settings.dialerSource.Update(multinetCfg); err != nil {
		a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
	}

	a.settings.update(a.cfg, a.log)
}
// startServices rebuilds the service list with the pprof and prometheus
// services and starts each of them in its own goroutine.
func (a *App) startServices() {
	a.services = a.services[:0]

	// launch registers a service and starts it in the background.
	launch := func(svc *Service) {
		a.services = append(a.services, svc)
		go svc.Start()
	}

	launch(NewPprofService(a.cfg, a.log))
	launch(NewPrometheusService(a.cfg, a.log, a.metrics.Handler()))
}
// initServers binds every configured server. Servers that fail to bind are
// remembered in a.unbindServers and marked unhealthy; the others are added to
// a.servers and marked healthy. Having no bound server at all is fatal.
func (a *App) initServers(ctx context.Context) {
	infos := fetchServers(a.cfg, a.log)
	a.servers = make([]Server, 0, len(infos))

	for _, info := range infos {
		logFields := []zap.Field{
			zap.String("address", info.Address),
			zap.Bool("tls enabled", info.TLS.Enabled),
			zap.String("tls cert", info.TLS.CertFile),
			zap.String("tls key", info.TLS.KeyFile),
		}

		srv, err := newServer(ctx, info)
		if err == nil {
			a.metrics.MarkHealthy(info.Address)
			a.servers = append(a.servers, srv)
			a.log.Info(logs.AddServer, logFields...)
			continue
		}

		a.unbindServers = append(a.unbindServers, info)
		a.metrics.MarkUnhealthy(info.Address)
		a.log.Warn(logs.FailedToAddServer, append(logFields, zap.Error(err))...)
	}

	if len(a.servers) == 0 {
		a.log.Fatal(logs.NoHealthyServers)
	}
}
// updateServers applies the reloaded server configuration to the known
// servers.
//
// For a bound server with TLS enabled the certificate is reloaded; for a
// server still waiting to be bound the stored info is replaced so that the
// next reconnect uses it. An error is returned when a certificate cannot be
// updated or when none of the configured addresses belongs to a known server.
//
// A bound server counts as known whether or not TLS is enabled for it, so a
// configuration with only plain HTTP servers is not reported as invalid.
func (a *App) updateServers() error {
	serversInfo := fetchServers(a.cfg, a.log)

	a.mu.Lock()
	defer a.mu.Unlock()

	var found bool
	for _, serverInfo := range serversInfo {
		if ser := a.getServer(serverInfo.Address); ser != nil {
			if serverInfo.TLS.Enabled {
				if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
					return fmt.Errorf("failed to update tls certs: %w", err)
				}
			}
			found = true
			continue
		}

		if a.updateUnbindServerInfo(serverInfo) {
			found = true
		}
	}

	if !found {
		return errors.New("invalid servers configuration: no known server found")
	}

	return nil
}
// stopServices shuts down every registered service, sharing one context
// limited by the shutdown timeout.
func (a *App) stopServices() {
	shutdownCtx, cancel := shutdownContext()
	defer cancel()

	for i := range a.services {
		a.services[i].ShutDown(shutdownCtx)
	}
}
// getCacheOptions returns the layer cache configuration: the defaults with
// lifetime and size of every cache overridden from the config. Each default is
// passed to the fetch helper as its last argument. The network info cache has
// only a lifetime setting.
func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
	opts := layer.DefaultCachesConfigs(l)

	opts.Objects.Lifetime = fetchCacheLifetime(v, l, cfgObjectsCacheLifetime, opts.Objects.Lifetime)
	opts.Objects.Size = fetchCacheSize(v, l, cfgObjectsCacheSize, opts.Objects.Size)

	opts.ObjectsList.Lifetime = fetchCacheLifetime(v, l, cfgListObjectsCacheLifetime, opts.ObjectsList.Lifetime)
	opts.ObjectsList.Size = fetchCacheSize(v, l, cfgListObjectsCacheSize, opts.ObjectsList.Size)

	opts.SessionList.Lifetime = fetchCacheLifetime(v, l, cfgSessionListCacheLifetime, opts.SessionList.Lifetime)
	opts.SessionList.Size = fetchCacheSize(v, l, cfgSessionListCacheSize, opts.SessionList.Size)

	opts.Buckets.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, opts.Buckets.Lifetime)
	opts.Buckets.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, opts.Buckets.Size)

	opts.Names.Lifetime = fetchCacheLifetime(v, l, cfgNamesCacheLifetime, opts.Names.Lifetime)
	opts.Names.Size = fetchCacheSize(v, l, cfgNamesCacheSize, opts.Names.Size)

	opts.System.Lifetime = fetchCacheLifetime(v, l, cfgSystemCacheLifetime, opts.System.Lifetime)
	opts.System.Size = fetchCacheSize(v, l, cfgSystemCacheSize, opts.System.Size)

	opts.AccessControl.Lifetime = fetchCacheLifetime(v, l, cfgAccessControlCacheLifetime, opts.AccessControl.Lifetime)
	opts.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, opts.AccessControl.Size)

	opts.NetworkInfo.Lifetime = fetchCacheLifetime(v, l, cfgNetworkInfoCacheLifetime, opts.NetworkInfo.Lifetime)

	return opts
}
// getAccessBoxCacheConfig returns the default access box cache configuration
// with lifetime and size overridden from the config.
func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
	accessBoxCfg := cache.DefaultAccessBoxConfig(l)

	accessBoxCfg.Lifetime = fetchCacheLifetime(v, l, cfgAccessBoxCacheLifetime, accessBoxCfg.Lifetime)
	accessBoxCfg.Size = fetchCacheSize(v, l, cfgAccessBoxCacheSize, accessBoxCfg.Size)

	return accessBoxCfg
}
// getMorphPolicyCacheConfig returns the default morph policy cache
// configuration with lifetime and size overridden from the config.
func getMorphPolicyCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
	policyCfg := cache.DefaultMorphPolicyConfig(l)

	policyCfg.Lifetime = fetchCacheLifetime(v, l, cfgMorphPolicyCacheLifetime, policyCfg.Lifetime)
	policyCfg.Size = fetchCacheSize(v, l, cfgMorphPolicyCacheSize, policyCfg.Size)

	return policyCfg
}
// getFrostfsIDCacheConfig returns the default FrostFSID cache configuration
// with lifetime and size overridden from the config.
func getFrostfsIDCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
	idCfg := cache.DefaultFrostfsIDConfig(l)

	idCfg.Lifetime = fetchCacheLifetime(v, l, cfgFrostfsIDCacheLifetime, idCfg.Lifetime)
	idCfg.Size = fetchCacheSize(v, l, cfgFrostfsIDCacheSize, idCfg.Size)

	return idCfg
}
// initHandler creates the API handler from the object layer, settings, policy
// storage and FrostFSID. A creation failure is fatal.
func (a *App) initHandler() {
	h, err := handler.New(a.log, a.obj, a.settings, a.policyStorage, a.frostfsid)
	a.api = h
	if err != nil {
		a.log.Fatal(logs.CouldNotInitializeAPIHandler, zap.Error(err))
	}
}
// getServer returns the bound server with the given address, or nil when
// there is none. It does not lock a.mu itself.
func (a *App) getServer(address string) Server {
	for _, srv := range a.servers {
		if srv.Address() == address {
			return srv
		}
	}

	return nil
}
// updateUnbindServerInfo replaces the stored info of the not-yet-bound server
// that has the same address as info. It reports whether such a server exists.
// It does not lock a.mu itself.
func (a *App) updateUnbindServerInfo(info ServerInfo) bool {
	for i, known := range a.unbindServers {
		if known.Address != info.Address {
			continue
		}
		a.unbindServers[i] = info
		return true
	}

	return false
}
// getServers returns the bound servers, reading the slice under a.mu.
// The returned slice is the stored one, not a copy.
func (a *App) getServers() []Server {
	a.mu.RLock()
	servers := a.servers
	a.mu.RUnlock()

	return servers
}
// setRuntimeParameters applies the configured soft memory limit to the Go
// runtime and logs a change of its value. When the GOMEMLIMIT environment
// variable is set, the limit is left alone and a warning is logged.
func (a *App) setRuntimeParameters() {
	if os.Getenv("GOMEMLIMIT") != "" {
		// default limit < yaml limit < app env limit < GOMEMLIMIT
		a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT)
		return
	}

	limit := fetchSoftMemoryLimit(a.cfg)

	if prev := debug.SetMemoryLimit(limit); prev != limit {
		a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
			zap.Int64("new_value", limit),
			zap.Int64("old_value", prev))
	}
}
// scheduleReconnect starts a goroutine that calls tryReconnect on every
// reconnect interval. The goroutine ends when tryReconnect reports that no
// unbound server is left or when ctx is done.
func (a *App) scheduleReconnect(ctx context.Context, srv *http.Server) {
	reconnectLoop := func() {
		ticker := time.NewTicker(a.settings.reconnectInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if done := a.tryReconnect(ctx, srv); done {
					return
				}
				ticker.Reset(a.settings.reconnectInterval)
			}
		}
	}

	go reconnectLoop()
}
// tryReconnect tries to bind every server from a.unbindServers.
//
// Each server that binds is served by sr in its own goroutine, marked healthy
// and moved to a.servers; each one that still fails is marked unhealthy and
// kept for the next attempt. It returns true when no unbound server is left.
// a.mu is held for the whole call.
func (a *App) tryReconnect(ctx context.Context, sr *http.Server) bool {
	a.mu.Lock()
	defer a.mu.Unlock()

	a.log.Info(logs.ServerReconnecting)
	var failedServers []ServerInfo

	for _, serverInfo := range a.unbindServers {
		fields := []zap.Field{
			zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
			zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
		}

		srv, err := newServer(ctx, serverInfo)
		if err != nil {
			a.log.Warn(logs.ServerReconnectFailed, append(fields, zap.Error(err))...)
			failedServers = append(failedServers, serverInfo)
			a.metrics.MarkUnhealthy(serverInfo.Address)
			continue
		}

		// The server and its configured address are passed as arguments so
		// that the goroutine never observes the range variable of a later
		// iteration (it is shared across iterations before Go 1.22) and does
		// not write to the err variable of this function.
		go func(srv Server, address string) {
			a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
			a.metrics.MarkHealthy(address)
			if err := sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
				a.log.Warn(logs.ListenAndServe, zap.Error(err))
				a.metrics.MarkUnhealthy(address)
			}
		}(srv, serverInfo.Address)

		a.servers = append(a.servers, srv)
		a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
	}

	a.unbindServers = failedServers

	return len(a.unbindServers) == 0
}
// fetchContainerInfo resolves the container configured under cfgKey and loads
// its info through the object pool.
func (a *App) fetchContainerInfo(ctx context.Context, cfgKey string) (info *data.BucketInfo, err error) {
	id, resolveErr := a.resolveContainerID(ctx, cfgKey)
	if resolveErr != nil {
		return nil, resolveErr
	}

	return getContainerInfo(ctx, id, a.pool)
}
// resolveContainerID returns the ID of the container configured under cfgKey.
//
// The configured string is first decoded as a container ID. If that fails it
// is split at the first dot: the part before the dot and the part after it are
// passed to the bucket resolver as its last and second arguments respectively.
// A string that is neither an ID nor contains a dot is an error.
func (a *App) resolveContainerID(ctx context.Context, cfgKey string) (cid.ID, error) {
	address := a.cfg.GetString(cfgKey)

	var id cid.ID
	if err := id.DecodeString(address); err == nil {
		return id, nil
	}

	beforeDot, afterDot, hasDot := strings.Cut(address, ".")
	if !hasDot {
		return cid.ID{}, fmt.Errorf("invalid container address: %s", address)
	}

	id, err := a.bucketResolver.Resolve(ctx, afterDot, beforeDot)
	if err != nil {
		return cid.ID{}, fmt.Errorf("resolve container address %s: %w", address, err)
	}

	return id, nil
}
// getContainerInfo fetches the container from the pool and returns bucket info
// holding its ID and whether homomorphic hashing is disabled for it.
func getContainerInfo(ctx context.Context, id cid.ID, frostFSPool *pool.Pool) (*data.BucketInfo, error) {
	cnr, err := frostFSPool.GetContainer(ctx, pool.PrmContainerGet{ContainerID: id})
	if err != nil {
		return nil, err
	}

	info := &data.BucketInfo{
		CID:                     id,
		HomomorphicHashDisabled: container.IsHomomorphicHashingDisabled(cnr),
	}

	return info, nil
}