forked from TrueCloudLab/frostfs-s3-gw
1078 lines
29 KiB
Go
1078 lines
29 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"encoding/xml"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"runtime/debug"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
|
s3middleware "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/frostfsid"
|
|
ffidcontract "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/frostfsid/contract"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/policy"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/policy/contract"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/wallet"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
|
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"github.com/go-chi/chi/v5/middleware"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"github.com/spf13/viper"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/exp/slices"
|
|
"golang.org/x/text/encoding/ianaindex"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
const awsDefaultNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
|
|
|
|
type (
|
|
// App is the main application structure.
|
|
App struct {
|
|
ctr s3middleware.Center
|
|
log *zap.Logger
|
|
cfg *viper.Viper
|
|
pool *pool.Pool
|
|
treePool *treepool.Pool
|
|
key *keys.PrivateKey
|
|
obj *layer.Layer
|
|
api api.Handler
|
|
|
|
frostfsid *frostfsid.FrostFSID
|
|
|
|
policyStorage *policy.Storage
|
|
|
|
servers []Server
|
|
unbindServers []ServerInfo
|
|
mu sync.RWMutex
|
|
|
|
metrics *metrics.AppMetrics
|
|
bucketResolver *resolver.BucketResolver
|
|
services []*Service
|
|
settings *appSettings
|
|
|
|
webDone chan struct{}
|
|
wrkDone chan struct{}
|
|
}
|
|
|
|
appSettings struct {
|
|
logLevel zap.AtomicLevel
|
|
maxClient maxClientsConfig
|
|
defaultMaxAge int
|
|
reconnectInterval time.Duration
|
|
resolveZoneList []string
|
|
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
|
|
frostfsidValidation bool
|
|
|
|
mu sync.RWMutex
|
|
namespaces Namespaces
|
|
defaultXMLNS bool
|
|
bypassContentEncodingInChunks bool
|
|
clientCut bool
|
|
maxBufferSizeForPut uint64
|
|
md5Enabled bool
|
|
namespaceHeader string
|
|
defaultNamespaces []string
|
|
policyDenyByDefault bool
|
|
sourceIPHeader string
|
|
retryMaxAttempts int
|
|
retryMaxBackoff time.Duration
|
|
retryStrategy handler.RetryStrategy
|
|
}
|
|
|
|
maxClientsConfig struct {
|
|
deadline time.Duration
|
|
count int
|
|
}
|
|
|
|
Logger struct {
|
|
logger *zap.Logger
|
|
lvl zap.AtomicLevel
|
|
}
|
|
)
|
|
|
|
func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
|
objPool, treePool, key := getPools(ctx, log.logger, v)
|
|
|
|
cfg := tokens.Config{
|
|
FrostFS: frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(objPool, key)),
|
|
Key: key,
|
|
CacheConfig: getAccessBoxCacheConfig(v, log.logger),
|
|
RemovingCheckAfterDurations: fetchRemovingCheckInterval(v, log.logger),
|
|
}
|
|
|
|
// prepare auth center
|
|
ctr := auth.New(tokens.New(cfg), v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes))
|
|
|
|
app := &App{
|
|
ctr: ctr,
|
|
log: log.logger,
|
|
cfg: v,
|
|
pool: objPool,
|
|
treePool: treePool,
|
|
key: key,
|
|
|
|
webDone: make(chan struct{}, 1),
|
|
wrkDone: make(chan struct{}, 1),
|
|
|
|
settings: newAppSettings(log, v),
|
|
}
|
|
|
|
app.init(ctx)
|
|
|
|
return app
|
|
}
|
|
|
|
func (a *App) init(ctx context.Context) {
|
|
a.setRuntimeParameters()
|
|
a.initFrostfsID(ctx)
|
|
a.initPolicyStorage(ctx)
|
|
a.initAPI(ctx)
|
|
a.initMetrics()
|
|
a.initServers(ctx)
|
|
a.initTracing(ctx)
|
|
}
|
|
|
|
func (a *App) initLayer(ctx context.Context) {
|
|
a.initResolver()
|
|
|
|
// prepare random key for anonymous requests
|
|
randomKey, err := keys.NewPrivateKey()
|
|
if err != nil {
|
|
a.log.Fatal(logs.CouldntGenerateRandomKey, zap.Error(err))
|
|
}
|
|
|
|
var gateOwner user.ID
|
|
user.IDFromKey(&gateOwner, a.key.PrivateKey.PublicKey)
|
|
|
|
var lifecycleCnrInfo *data.BucketInfo
|
|
if a.cfg.IsSet(cfgContainersLifecycle) {
|
|
lifecycleCnrInfo, err = a.fetchContainerInfo(ctx, cfgContainersLifecycle)
|
|
if err != nil {
|
|
a.log.Fatal(logs.CouldNotFetchLifecycleContainerInfo, zap.Error(err))
|
|
}
|
|
}
|
|
|
|
layerCfg := &layer.Config{
|
|
Cache: layer.NewCache(getCacheOptions(a.cfg, a.log)),
|
|
AnonKey: layer.AnonymousKey{
|
|
Key: randomKey,
|
|
},
|
|
GateOwner: gateOwner,
|
|
Resolver: a.bucketResolver,
|
|
TreeService: tree.NewTree(services.NewPoolWrapper(a.treePool), a.log),
|
|
Features: a.settings,
|
|
LifecycleCnrInfo: lifecycleCnrInfo,
|
|
GateKey: a.key,
|
|
}
|
|
|
|
// prepare object layer
|
|
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
|
|
}
|
|
|
|
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
|
settings := &appSettings{
|
|
logLevel: log.lvl,
|
|
maxClient: newMaxClients(v),
|
|
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
|
reconnectInterval: fetchReconnectInterval(v),
|
|
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
|
}
|
|
|
|
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
|
settings.isResolveListAllow = len(settings.resolveZoneList) > 0
|
|
if !settings.isResolveListAllow {
|
|
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketDeny)
|
|
}
|
|
|
|
settings.update(v, log.logger)
|
|
|
|
return settings
|
|
}
|
|
|
|
func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
|
s.updateNamespacesSettings(v, log)
|
|
s.useDefaultXMLNamespace(v.GetBool(cfgKludgeUseDefaultXMLNS))
|
|
s.setBypassContentEncodingInChunks(v.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
|
|
s.setClientCut(v.GetBool(cfgClientCut))
|
|
s.setBufferMaxSizeForPut(v.GetUint64(cfgBufferMaxSizeForPut))
|
|
s.setMD5Enabled(v.GetBool(cfgMD5Enabled))
|
|
s.setPolicyDenyByDefault(v.GetBool(cfgPolicyDenyByDefault))
|
|
s.setSourceIPHeader(v.GetString(cfgSourceIPHeader))
|
|
s.setRetryMaxAttempts(fetchRetryMaxAttempts(v))
|
|
s.setRetryMaxBackoff(fetchRetryMaxBackoff(v))
|
|
s.setRetryStrategy(fetchRetryStrategy(v))
|
|
}
|
|
|
|
func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger) {
|
|
nsHeader := v.GetString(cfgResolveNamespaceHeader)
|
|
nsConfig, defaultNamespaces := fetchNamespacesConfig(log, v)
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
s.namespaceHeader = nsHeader
|
|
s.defaultNamespaces = defaultNamespaces
|
|
s.namespaces = nsConfig.Namespaces
|
|
}
|
|
|
|
func (s *appSettings) BypassContentEncodingInChunks() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.bypassContentEncodingInChunks
|
|
}
|
|
|
|
func (s *appSettings) setBypassContentEncodingInChunks(bypass bool) {
|
|
s.mu.Lock()
|
|
s.bypassContentEncodingInChunks = bypass
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *appSettings) ClientCut() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.clientCut
|
|
}
|
|
|
|
func (s *appSettings) setClientCut(clientCut bool) {
|
|
s.mu.Lock()
|
|
s.clientCut = clientCut
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *appSettings) BufferMaxSizeForPut() uint64 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.maxBufferSizeForPut
|
|
}
|
|
|
|
func (s *appSettings) setBufferMaxSizeForPut(size uint64) {
|
|
s.mu.Lock()
|
|
s.maxBufferSizeForPut = size
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *appSettings) DefaultPlacementPolicy(namespace string) netmap.PlacementPolicy {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.namespaces[namespace].LocationConstraints[defaultConstraintName]
|
|
}
|
|
|
|
func (s *appSettings) PlacementPolicy(namespace, constraint string) (netmap.PlacementPolicy, bool) {
|
|
s.mu.RLock()
|
|
placementPolicy, ok := s.namespaces[namespace].LocationConstraints[constraint]
|
|
s.mu.RUnlock()
|
|
|
|
return placementPolicy, ok
|
|
}
|
|
|
|
func (s *appSettings) CopiesNumbers(namespace, constraint string) ([]uint32, bool) {
|
|
s.mu.RLock()
|
|
copiesNumbers, ok := s.namespaces[namespace].CopiesNumbers[constraint]
|
|
s.mu.RUnlock()
|
|
|
|
return copiesNumbers, ok
|
|
}
|
|
|
|
func (s *appSettings) DefaultCopiesNumbers(namespace string) []uint32 {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.namespaces[namespace].CopiesNumbers[defaultConstraintName]
|
|
}
|
|
|
|
func (s *appSettings) NewXMLDecoder(r io.Reader) *xml.Decoder {
|
|
dec := xml.NewDecoder(r)
|
|
dec.CharsetReader = func(charset string, reader io.Reader) (io.Reader, error) {
|
|
enc, err := ianaindex.IANA.Encoding(charset)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("charset %s: %w", charset, err)
|
|
}
|
|
return enc.NewDecoder().Reader(reader), nil
|
|
}
|
|
|
|
s.mu.RLock()
|
|
if s.defaultXMLNS {
|
|
dec.DefaultSpace = awsDefaultNamespace
|
|
}
|
|
s.mu.RUnlock()
|
|
|
|
return dec
|
|
}
|
|
|
|
func (s *appSettings) useDefaultXMLNamespace(useDefaultNamespace bool) {
|
|
s.mu.Lock()
|
|
s.defaultXMLNS = useDefaultNamespace
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *appSettings) DefaultMaxAge() int {
|
|
return s.defaultMaxAge
|
|
}
|
|
|
|
func (s *appSettings) ResolveZoneList() []string {
|
|
return s.resolveZoneList
|
|
}
|
|
|
|
func (s *appSettings) IsResolveListAllow() bool {
|
|
return s.isResolveListAllow
|
|
}
|
|
|
|
func (s *appSettings) MD5Enabled() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.md5Enabled
|
|
}
|
|
|
|
func (s *appSettings) setMD5Enabled(md5Enabled bool) {
|
|
s.mu.Lock()
|
|
s.md5Enabled = md5Enabled
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *appSettings) NamespaceHeader() string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.namespaceHeader
|
|
}
|
|
|
|
func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool) {
|
|
if len(ns) == 0 {
|
|
return v2container.SysAttributeZoneDefault, true
|
|
}
|
|
|
|
return ns + ".ns", false
|
|
}
|
|
|
|
func (s *appSettings) isDefaultNamespace(ns string) bool {
|
|
s.mu.RLock()
|
|
namespaces := s.defaultNamespaces
|
|
s.mu.RUnlock()
|
|
return slices.Contains(namespaces, ns)
|
|
}
|
|
|
|
func (s *appSettings) ResolveNamespaceAlias(namespace string) string {
|
|
if s.isDefaultNamespace(namespace) {
|
|
return defaultNamespace
|
|
}
|
|
|
|
return namespace
|
|
}
|
|
|
|
func (s *appSettings) PolicyDenyByDefault() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.policyDenyByDefault
|
|
}
|
|
|
|
func (s *appSettings) setPolicyDenyByDefault(policyDenyByDefault bool) {
|
|
s.mu.Lock()
|
|
s.policyDenyByDefault = policyDenyByDefault
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *appSettings) setSourceIPHeader(header string) {
|
|
s.mu.Lock()
|
|
s.sourceIPHeader = header
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *appSettings) SourceIPHeader() string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.sourceIPHeader
|
|
}
|
|
|
|
func (s *appSettings) setRetryMaxAttempts(maxAttempts int) {
|
|
s.mu.Lock()
|
|
s.retryMaxAttempts = maxAttempts
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *appSettings) RetryMaxAttempts() int {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.retryMaxAttempts
|
|
}
|
|
|
|
func (s *appSettings) setRetryMaxBackoff(maxBackoff time.Duration) {
|
|
s.mu.Lock()
|
|
s.retryMaxBackoff = maxBackoff
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *appSettings) RetryMaxBackoff() time.Duration {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.retryMaxBackoff
|
|
}
|
|
|
|
func (s *appSettings) setRetryStrategy(strategy handler.RetryStrategy) {
|
|
s.mu.Lock()
|
|
s.retryStrategy = strategy
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *appSettings) RetryStrategy() handler.RetryStrategy {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.retryStrategy
|
|
}
|
|
|
|
func (a *App) initAPI(ctx context.Context) {
|
|
a.initLayer(ctx)
|
|
a.initHandler()
|
|
}
|
|
|
|
func (a *App) initMetrics() {
|
|
cfg := metrics.AppMetricsConfig{
|
|
Logger: a.log,
|
|
PoolStatistics: frostfs.NewPoolStatistic(a.pool),
|
|
Enabled: a.cfg.GetBool(cfgPrometheusEnabled),
|
|
}
|
|
|
|
a.metrics = metrics.NewAppMetrics(cfg)
|
|
a.metrics.State().SetHealth(metrics.HealthStatusStarting)
|
|
}
|
|
|
|
func (a *App) initFrostfsID(ctx context.Context) {
|
|
cli, err := ffidcontract.New(ctx, ffidcontract.Config{
|
|
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
|
Contract: a.cfg.GetString(cfgFrostfsIDContract),
|
|
ProxyContract: a.cfg.GetString(cfgProxyContract),
|
|
Key: a.key,
|
|
})
|
|
if err != nil {
|
|
a.log.Fatal(logs.InitFrostfsIDContractFailed, zap.Error(err))
|
|
}
|
|
|
|
a.frostfsid, err = frostfsid.NewFrostFSID(frostfsid.Config{
|
|
Cache: cache.NewFrostfsIDCache(getFrostfsIDCacheConfig(a.cfg, a.log)),
|
|
FrostFSID: cli,
|
|
Logger: a.log,
|
|
})
|
|
if err != nil {
|
|
a.log.Fatal(logs.InitFrostfsIDContractFailed, zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (a *App) initPolicyStorage(ctx context.Context) {
|
|
policyContract, err := contract.New(ctx, contract.Config{
|
|
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
|
Contract: a.cfg.GetString(cfgPolicyContract),
|
|
ProxyContract: a.cfg.GetString(cfgProxyContract),
|
|
Key: a.key,
|
|
})
|
|
if err != nil {
|
|
a.log.Fatal(logs.InitPolicyContractFailed, zap.Error(err))
|
|
}
|
|
|
|
a.policyStorage = policy.NewStorage(policy.StorageConfig{
|
|
Contract: policyContract,
|
|
Cache: cache.NewMorphPolicyCache(getMorphPolicyCacheConfig(a.cfg, a.log)),
|
|
Log: a.log,
|
|
})
|
|
}
|
|
|
|
func (a *App) initResolver() {
|
|
var err error
|
|
a.bucketResolver, err = resolver.NewBucketResolver(a.getResolverOrder(), a.getResolverConfig())
|
|
if err != nil {
|
|
a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (a *App) getResolverConfig() *resolver.Config {
|
|
return &resolver.Config{
|
|
FrostFS: frostfs.NewResolverFrostFS(a.pool),
|
|
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
|
Settings: a.settings,
|
|
}
|
|
}
|
|
|
|
func (a *App) getResolverOrder() []string {
|
|
order := a.cfg.GetStringSlice(cfgResolveOrder)
|
|
if a.cfg.GetString(cfgRPCEndpoint) == "" {
|
|
order = remove(order, resolver.NNSResolver)
|
|
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
|
|
}
|
|
|
|
if len(order) == 0 {
|
|
a.log.Info(logs.ContainerResolverWillBeDisabled)
|
|
}
|
|
|
|
return order
|
|
}
|
|
|
|
func (a *App) initTracing(ctx context.Context) {
|
|
instanceID := ""
|
|
if len(a.servers) > 0 {
|
|
instanceID = a.servers[0].Address()
|
|
}
|
|
cfg := tracing.Config{
|
|
Enabled: a.cfg.GetBool(cfgTracingEnabled),
|
|
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
|
|
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
|
|
Service: "frostfs-s3-gw",
|
|
InstanceID: instanceID,
|
|
Version: version.Version,
|
|
}
|
|
updated, err := tracing.Setup(ctx, cfg)
|
|
if err != nil {
|
|
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
|
}
|
|
if updated {
|
|
a.log.Info(logs.TracingConfigUpdated)
|
|
}
|
|
}
|
|
|
|
func (a *App) shutdownTracing() {
|
|
const tracingShutdownTimeout = 5 * time.Second
|
|
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
|
|
defer cancel()
|
|
|
|
if err := tracing.Shutdown(shdnCtx); err != nil {
|
|
a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func newMaxClients(cfg *viper.Viper) maxClientsConfig {
|
|
config := maxClientsConfig{}
|
|
|
|
config.count = fetchMaxClientsCount(cfg)
|
|
|
|
config.deadline = fetchMaxClientsDeadline(cfg)
|
|
|
|
return config
|
|
}
|
|
|
|
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
|
|
var prm pool.InitParameters
|
|
var prmTree treepool.InitParameters
|
|
|
|
password := wallet.GetPassword(cfg, cfgWalletPassphrase)
|
|
key, err := wallet.GetKeyFromPath(cfg.GetString(cfgWalletPath), cfg.GetString(cfgWalletAddress), password)
|
|
if err != nil {
|
|
logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
|
|
}
|
|
|
|
prm.SetKey(&key.PrivateKey)
|
|
prmTree.SetKey(key)
|
|
logger.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
|
|
|
|
for _, peer := range fetchPeers(logger, cfg) {
|
|
prm.AddNode(peer)
|
|
prmTree.AddNode(peer)
|
|
}
|
|
|
|
connTimeout := fetchConnectTimeout(cfg)
|
|
prm.SetNodeDialTimeout(connTimeout)
|
|
prmTree.SetNodeDialTimeout(connTimeout)
|
|
|
|
streamTimeout := fetchStreamTimeout(cfg)
|
|
prm.SetNodeStreamTimeout(streamTimeout)
|
|
prmTree.SetNodeStreamTimeout(streamTimeout)
|
|
|
|
healthCheckTimeout := fetchHealthCheckTimeout(cfg)
|
|
prm.SetHealthcheckTimeout(healthCheckTimeout)
|
|
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
|
|
|
|
rebalanceInterval := fetchRebalanceInterval(cfg)
|
|
prm.SetClientRebalanceInterval(rebalanceInterval)
|
|
prmTree.SetClientRebalanceInterval(rebalanceInterval)
|
|
|
|
errorThreshold := fetchErrorThreshold(cfg)
|
|
prm.SetErrorThreshold(errorThreshold)
|
|
prm.SetLogger(logger)
|
|
prmTree.SetLogger(logger)
|
|
|
|
prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgTreePoolMaxAttempts))
|
|
|
|
var apiGRPCDialOpts []grpc.DialOption
|
|
var treeGRPCDialOpts []grpc.DialOption
|
|
if cfg.GetBool(cfgTracingEnabled) {
|
|
interceptors := []grpc.DialOption{
|
|
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
|
grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
|
|
}
|
|
treeGRPCDialOpts = append(treeGRPCDialOpts, interceptors...)
|
|
apiGRPCDialOpts = append(apiGRPCDialOpts, interceptors...)
|
|
}
|
|
prm.SetGRPCDialOptions(apiGRPCDialOpts...)
|
|
prmTree.SetGRPCDialOptions(treeGRPCDialOpts...)
|
|
|
|
p, err := pool.NewPool(prm)
|
|
if err != nil {
|
|
logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
|
|
}
|
|
|
|
if err = p.Dial(ctx); err != nil {
|
|
logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
|
|
}
|
|
|
|
treePool, err := treepool.NewPool(prmTree)
|
|
if err != nil {
|
|
logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
|
|
}
|
|
if err = treePool.Dial(ctx); err != nil {
|
|
logger.Fatal(logs.FailedToDialTreePool, zap.Error(err))
|
|
}
|
|
|
|
return p, treePool, key
|
|
}
|
|
|
|
func remove(list []string, element string) []string {
|
|
for i, item := range list {
|
|
if item == element {
|
|
return append(list[:i], list[i+1:]...)
|
|
}
|
|
}
|
|
return list
|
|
}
|
|
|
|
// Wait waits for an application to finish.
|
|
//
|
|
// Pre-logs a message about the launch of the application mentioning its
|
|
// version (version.Version) and its name (frostfs-s3-gw). At the end, it writes
|
|
// about the stop to the log.
|
|
func (a *App) Wait() {
|
|
a.log.Info(logs.ApplicationStarted,
|
|
zap.String("name", "frostfs-s3-gw"),
|
|
zap.String("version", version.Version),
|
|
)
|
|
|
|
a.metrics.State().SetVersion(version.Version)
|
|
a.setHealthStatus()
|
|
|
|
<-a.webDone // wait for web-server to be stopped
|
|
|
|
a.log.Info(logs.ApplicationFinished)
|
|
}
|
|
|
|
func (a *App) setHealthStatus() {
|
|
a.metrics.State().SetHealth(metrics.HealthStatusReady)
|
|
}
|
|
|
|
// Serve runs HTTP server to handle S3 API requests.
|
|
func (a *App) Serve(ctx context.Context) {
|
|
// Attach S3 API:
|
|
domains := a.cfg.GetStringSlice(cfgListenDomains)
|
|
a.log.Info(logs.FetchDomainsPrepareToUseAPI, zap.Strings("domains", domains))
|
|
|
|
cfg := api.Config{
|
|
Throttle: middleware.ThrottleOpts{
|
|
Limit: a.settings.maxClient.count,
|
|
BacklogTimeout: a.settings.maxClient.deadline,
|
|
},
|
|
Handler: a.api,
|
|
Center: a.ctr,
|
|
Log: a.log,
|
|
Metrics: a.metrics,
|
|
Domains: domains,
|
|
|
|
MiddlewareSettings: a.settings,
|
|
PolicyChecker: a.policyStorage,
|
|
|
|
FrostfsID: a.frostfsid,
|
|
FrostFSIDValidation: a.settings.frostfsidValidation,
|
|
|
|
XMLDecoder: a.settings,
|
|
Tagging: a.obj,
|
|
}
|
|
|
|
chiRouter := api.NewRouter(cfg)
|
|
|
|
// Use mux.Router as http.Handler
|
|
srv := new(http.Server)
|
|
srv.Handler = chiRouter
|
|
srv.ErrorLog = zap.NewStdLog(a.log)
|
|
srv.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout)
|
|
srv.ReadHeaderTimeout = a.cfg.GetDuration(cfgWebReadHeaderTimeout)
|
|
srv.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout)
|
|
srv.IdleTimeout = a.cfg.GetDuration(cfgWebIdleTimeout)
|
|
|
|
a.startServices()
|
|
|
|
servs := a.getServers()
|
|
|
|
for i := range servs {
|
|
go func(i int) {
|
|
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
|
|
|
|
if err := srv.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
|
|
a.metrics.MarkUnhealthy(servs[i].Address())
|
|
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
if len(a.unbindServers) != 0 {
|
|
a.scheduleReconnect(ctx, srv)
|
|
}
|
|
|
|
sigs := make(chan os.Signal, 1)
|
|
signal.Notify(sigs, syscall.SIGHUP)
|
|
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
break LOOP
|
|
case <-sigs:
|
|
a.configReload(ctx)
|
|
}
|
|
}
|
|
|
|
ctx, cancel := shutdownContext()
|
|
defer cancel()
|
|
|
|
a.log.Info(logs.StoppingServer, zap.Error(srv.Shutdown(ctx)))
|
|
|
|
a.metrics.Shutdown()
|
|
a.stopServices()
|
|
a.shutdownTracing()
|
|
|
|
close(a.webDone)
|
|
}
|
|
|
|
func shutdownContext() (context.Context, context.CancelFunc) {
|
|
return context.WithTimeout(context.Background(), defaultShutdownTimeout)
|
|
}
|
|
|
|
func (a *App) configReload(ctx context.Context) {
|
|
a.log.Info(logs.SIGHUPConfigReloadStarted)
|
|
|
|
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
|
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
|
|
return
|
|
}
|
|
if err := readInConfig(a.cfg); err != nil {
|
|
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if err := a.bucketResolver.UpdateResolvers(a.getResolverOrder()); err != nil {
|
|
a.log.Warn(logs.FailedToReloadResolvers, zap.Error(err))
|
|
}
|
|
|
|
if err := a.updateServers(); err != nil {
|
|
a.log.Warn(logs.FailedToReloadServerParameters, zap.Error(err))
|
|
}
|
|
|
|
a.setRuntimeParameters()
|
|
|
|
a.stopServices()
|
|
a.startServices()
|
|
|
|
a.updateSettings()
|
|
|
|
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
|
a.initTracing(ctx)
|
|
a.setHealthStatus()
|
|
|
|
a.log.Info(logs.SIGHUPConfigReloadCompleted)
|
|
}
|
|
|
|
func (a *App) updateSettings() {
|
|
if lvl, err := getLogLevel(a.cfg); err != nil {
|
|
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
|
|
} else {
|
|
a.settings.logLevel.SetLevel(lvl)
|
|
}
|
|
|
|
a.settings.update(a.cfg, a.log)
|
|
}
|
|
|
|
func (a *App) startServices() {
|
|
a.services = a.services[:0]
|
|
|
|
pprofService := NewPprofService(a.cfg, a.log)
|
|
a.services = append(a.services, pprofService)
|
|
go pprofService.Start()
|
|
|
|
prometheusService := NewPrometheusService(a.cfg, a.log, a.metrics.Handler())
|
|
a.services = append(a.services, prometheusService)
|
|
go prometheusService.Start()
|
|
}
|
|
|
|
func (a *App) initServers(ctx context.Context) {
|
|
serversInfo := fetchServers(a.cfg, a.log)
|
|
|
|
a.servers = make([]Server, 0, len(serversInfo))
|
|
for _, serverInfo := range serversInfo {
|
|
fields := []zap.Field{
|
|
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
|
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
|
}
|
|
srv, err := newServer(ctx, serverInfo)
|
|
if err != nil {
|
|
a.unbindServers = append(a.unbindServers, serverInfo)
|
|
a.metrics.MarkUnhealthy(serverInfo.Address)
|
|
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
|
|
continue
|
|
}
|
|
a.metrics.MarkHealthy(serverInfo.Address)
|
|
|
|
a.servers = append(a.servers, srv)
|
|
a.log.Info(logs.AddServer, fields...)
|
|
}
|
|
|
|
if len(a.servers) == 0 {
|
|
a.log.Fatal(logs.NoHealthyServers)
|
|
}
|
|
}
|
|
|
|
func (a *App) updateServers() error {
|
|
serversInfo := fetchServers(a.cfg, a.log)
|
|
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
|
|
var found bool
|
|
for _, serverInfo := range serversInfo {
|
|
ser := a.getServer(serverInfo.Address)
|
|
if ser != nil {
|
|
if serverInfo.TLS.Enabled {
|
|
if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
|
return fmt.Errorf("failed to update tls certs: %w", err)
|
|
}
|
|
found = true
|
|
}
|
|
} else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
|
|
found = true
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
return fmt.Errorf("invalid servers configuration: no known server found")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *App) stopServices() {
|
|
ctx, cancel := shutdownContext()
|
|
defer cancel()
|
|
|
|
for _, svc := range a.services {
|
|
svc.ShutDown(ctx)
|
|
}
|
|
}
|
|
|
|
func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
|
|
cacheCfg := layer.DefaultCachesConfigs(l)
|
|
|
|
cacheCfg.Objects.Lifetime = fetchCacheLifetime(v, l, cfgObjectsCacheLifetime, cacheCfg.Objects.Lifetime)
|
|
cacheCfg.Objects.Size = fetchCacheSize(v, l, cfgObjectsCacheSize, cacheCfg.Objects.Size)
|
|
|
|
cacheCfg.ObjectsList.Lifetime = fetchCacheLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
|
|
cacheCfg.ObjectsList.Size = fetchCacheSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
|
|
|
|
cacheCfg.SessionList.Lifetime = fetchCacheLifetime(v, l, cfgSessionListCacheLifetime, cacheCfg.SessionList.Lifetime)
|
|
cacheCfg.SessionList.Size = fetchCacheSize(v, l, cfgSessionListCacheSize, cacheCfg.SessionList.Size)
|
|
|
|
cacheCfg.Buckets.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
|
|
cacheCfg.Buckets.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)
|
|
|
|
cacheCfg.Names.Lifetime = fetchCacheLifetime(v, l, cfgNamesCacheLifetime, cacheCfg.Names.Lifetime)
|
|
cacheCfg.Names.Size = fetchCacheSize(v, l, cfgNamesCacheSize, cacheCfg.Names.Size)
|
|
|
|
cacheCfg.System.Lifetime = fetchCacheLifetime(v, l, cfgSystemCacheLifetime, cacheCfg.System.Lifetime)
|
|
cacheCfg.System.Size = fetchCacheSize(v, l, cfgSystemCacheSize, cacheCfg.System.Size)
|
|
|
|
cacheCfg.AccessControl.Lifetime = fetchCacheLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime)
|
|
cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
|
|
|
|
return cacheCfg
|
|
}
|
|
|
|
func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
|
|
cacheCfg := cache.DefaultAccessBoxConfig(l)
|
|
|
|
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgAccessBoxCacheLifetime, cacheCfg.Lifetime)
|
|
cacheCfg.Size = fetchCacheSize(v, l, cfgAccessBoxCacheSize, cacheCfg.Size)
|
|
|
|
return cacheCfg
|
|
}
|
|
|
|
func getMorphPolicyCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
|
|
cacheCfg := cache.DefaultMorphPolicyConfig(l)
|
|
|
|
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgMorphPolicyCacheLifetime, cacheCfg.Lifetime)
|
|
cacheCfg.Size = fetchCacheSize(v, l, cfgMorphPolicyCacheSize, cacheCfg.Size)
|
|
|
|
return cacheCfg
|
|
}
|
|
|
|
func getFrostfsIDCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
|
|
cacheCfg := cache.DefaultFrostfsIDConfig(l)
|
|
|
|
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgFrostfsIDCacheLifetime, cacheCfg.Lifetime)
|
|
cacheCfg.Size = fetchCacheSize(v, l, cfgFrostfsIDCacheSize, cacheCfg.Size)
|
|
|
|
return cacheCfg
|
|
}
|
|
|
|
func (a *App) initHandler() {
|
|
var err error
|
|
|
|
a.api, err = handler.New(a.log, a.obj, a.settings, a.policyStorage, a.frostfsid)
|
|
if err != nil {
|
|
a.log.Fatal(logs.CouldNotInitializeAPIHandler, zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (a *App) getServer(address string) Server {
|
|
for i := range a.servers {
|
|
if a.servers[i].Address() == address {
|
|
return a.servers[i]
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *App) updateUnbindServerInfo(info ServerInfo) bool {
|
|
for i := range a.unbindServers {
|
|
if a.unbindServers[i].Address == info.Address {
|
|
a.unbindServers[i] = info
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (a *App) getServers() []Server {
|
|
a.mu.RLock()
|
|
defer a.mu.RUnlock()
|
|
return a.servers
|
|
}
|
|
|
|
func (a *App) setRuntimeParameters() {
|
|
if len(os.Getenv("GOMEMLIMIT")) != 0 {
|
|
// default limit < yaml limit < app env limit < GOMEMLIMIT
|
|
a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT)
|
|
return
|
|
}
|
|
|
|
softMemoryLimit := fetchSoftMemoryLimit(a.cfg)
|
|
previous := debug.SetMemoryLimit(softMemoryLimit)
|
|
if softMemoryLimit != previous {
|
|
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
|
|
zap.Int64("new_value", softMemoryLimit),
|
|
zap.Int64("old_value", previous))
|
|
}
|
|
}
|
|
|
|
func (a *App) scheduleReconnect(ctx context.Context, srv *http.Server) {
|
|
go func() {
|
|
t := time.NewTicker(a.settings.reconnectInterval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
if a.tryReconnect(ctx, srv) {
|
|
return
|
|
}
|
|
t.Reset(a.settings.reconnectInterval)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (a *App) tryReconnect(ctx context.Context, sr *http.Server) bool {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
|
|
a.log.Info(logs.ServerReconnecting)
|
|
var failedServers []ServerInfo
|
|
|
|
for _, serverInfo := range a.unbindServers {
|
|
fields := []zap.Field{
|
|
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
|
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
|
}
|
|
|
|
srv, err := newServer(ctx, serverInfo)
|
|
if err != nil {
|
|
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
|
|
failedServers = append(failedServers, serverInfo)
|
|
a.metrics.MarkUnhealthy(serverInfo.Address)
|
|
continue
|
|
}
|
|
|
|
go func() {
|
|
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
|
|
a.metrics.MarkHealthy(serverInfo.Address)
|
|
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
a.log.Warn(logs.ListenAndServe, zap.Error(err))
|
|
a.metrics.MarkUnhealthy(serverInfo.Address)
|
|
}
|
|
}()
|
|
|
|
a.servers = append(a.servers, srv)
|
|
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
|
|
}
|
|
|
|
a.unbindServers = failedServers
|
|
|
|
return len(a.unbindServers) == 0
|
|
}
|
|
|
|
func (a *App) fetchContainerInfo(ctx context.Context, cfgKey string) (info *data.BucketInfo, err error) {
|
|
containerString := a.cfg.GetString(cfgKey)
|
|
|
|
var id cid.ID
|
|
if err = id.DecodeString(containerString); err != nil {
|
|
if id, err = a.bucketResolver.Resolve(ctx, containerString); err != nil {
|
|
return nil, fmt.Errorf("resolve container name %s: %w", containerString, err)
|
|
}
|
|
}
|
|
|
|
return getContainerInfo(ctx, id, a.pool)
|
|
}
|
|
|
|
func getContainerInfo(ctx context.Context, id cid.ID, frostFSPool *pool.Pool) (*data.BucketInfo, error) {
|
|
prm := pool.PrmContainerGet{
|
|
ContainerID: id,
|
|
}
|
|
|
|
res, err := frostFSPool.GetContainer(ctx, prm)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &data.BucketInfo{
|
|
CID: id,
|
|
HomomorphicHashDisabled: container.IsHomomorphicHashingDisabled(res),
|
|
}, nil
|
|
}
|