forked from TrueCloudLab/frostfs-s3-gw
[#168] Skip only invalid policies and copies instead of ignoring all of them
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
41a128b1aa
commit
b5fce5c8d2
5 changed files with 186 additions and 225 deletions
|
@ -41,6 +41,7 @@ This document outlines major changes between releases.
|
|||
- Use chi router instead of archived gorlilla/mux (#149)
|
||||
- Complete multipart upload doesn't unnecessary copy now. Thus, the total time of multipart upload was reduced by 2 times (#63)
|
||||
- Use gate key to form object owner (#175)
|
||||
- Apply placement policies and copies if there is at least one valid value (#168)
|
||||
|
||||
### Removed
|
||||
- Drop `tree.service` param (now endpoints from `peers` section are used) (#133)
|
||||
|
|
|
@ -57,13 +57,6 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultPolicy is a default policy of placing containers in FrostFS if it's not set at the request.
|
||||
DefaultPolicy = "REP 3"
|
||||
// DefaultCopiesNumber is a default number of object copies that is enough to consider put successful if it's not set in config.
|
||||
DefaultCopiesNumber uint32 = 0
|
||||
)
|
||||
|
||||
var _ api.Handler = (*handler)(nil)
|
||||
|
||||
// New creates new api.Handler using given logger and client.
|
||||
|
|
101
cmd/s3-gw/app.go
101
cmd/s3-gw/app.go
|
@ -3,7 +3,6 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -166,14 +165,9 @@ func (a *App) initLayer(ctx context.Context) {
|
|||
}
|
||||
|
||||
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||
policies, err := newPlacementPolicy(log.logger, v)
|
||||
if err != nil {
|
||||
log.logger.Fatal(logs.FailedToCreateNewPolicyMapping, zap.Error(err))
|
||||
}
|
||||
|
||||
settings := &appSettings{
|
||||
logLevel: log.lvl,
|
||||
policies: policies,
|
||||
policies: newPlacementPolicy(log.logger, v),
|
||||
xmlDecoder: xml.NewDecoderProvider(v.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload)),
|
||||
maxClient: newMaxClients(v),
|
||||
}
|
||||
|
@ -352,16 +346,10 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.
|
|||
return p, treePool, key
|
||||
}
|
||||
|
||||
func newPlacementPolicy(l *zap.Logger, v *viper.Viper) (*placementPolicy, error) {
|
||||
policies := &placementPolicy{
|
||||
regionMap: make(map[string]netmap.PlacementPolicy),
|
||||
defaultCopiesNumbers: []uint32{handler.DefaultCopiesNumber},
|
||||
}
|
||||
|
||||
policies.updateCopiesNumbers(l, v)
|
||||
policies.updateDefaultCopiesNumbers(l, v)
|
||||
|
||||
return policies, policies.updatePolicy(v)
|
||||
func newPlacementPolicy(l *zap.Logger, v *viper.Viper) *placementPolicy {
|
||||
var policies placementPolicy
|
||||
policies.update(l, v)
|
||||
return &policies
|
||||
}
|
||||
|
||||
func (p *placementPolicy) DefaultPlacementPolicy() netmap.PlacementPolicy {
|
||||
|
@ -393,56 +381,18 @@ func (p *placementPolicy) DefaultCopiesNumbers() []uint32 {
|
|||
}
|
||||
|
||||
func (p *placementPolicy) update(l *zap.Logger, v *viper.Viper) {
|
||||
if err := p.updatePolicy(v); err != nil {
|
||||
l.Warn(logs.PoliciesWontBeUpdated, zap.Error(err))
|
||||
}
|
||||
|
||||
p.updateCopiesNumbers(l, v)
|
||||
p.updateDefaultCopiesNumbers(l, v)
|
||||
}
|
||||
|
||||
func (p *placementPolicy) updatePolicy(v *viper.Viper) error {
|
||||
defaultPlacementPolicy, err := fetchDefaultPolicy(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
regionMap, err := fetchRegionMappingPolicies(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defaultPolicy := fetchDefaultPolicy(l, v)
|
||||
regionMap := fetchRegionMappingPolicies(l, v)
|
||||
defaultCopies := fetchDefaultCopiesNumbers(l, v)
|
||||
copiesNumbers := fetchCopiesNumbers(l, v)
|
||||
|
||||
p.mu.Lock()
|
||||
p.defaultPolicy = defaultPlacementPolicy
|
||||
defer p.mu.Unlock()
|
||||
|
||||
p.defaultPolicy = defaultPolicy
|
||||
p.regionMap = regionMap
|
||||
p.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *placementPolicy) updateCopiesNumbers(l *zap.Logger, v *viper.Viper) {
|
||||
if newCopiesNumbers, err := fetchCopiesNumbers(l, v); err != nil {
|
||||
l.Warn(logs.CopiesNumbersWontBeUpdated, zap.Error(err))
|
||||
} else {
|
||||
p.mu.Lock()
|
||||
p.copiesNumbers = newCopiesNumbers
|
||||
p.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *placementPolicy) updateDefaultCopiesNumbers(l *zap.Logger, v *viper.Viper) {
|
||||
configuredValues, err := fetchDefaultCopiesNumbers(v)
|
||||
|
||||
if err == nil {
|
||||
p.mu.Lock()
|
||||
p.defaultCopiesNumbers = configuredValues
|
||||
p.mu.Unlock()
|
||||
l.Info(logs.DefaultCopiesNumbers, zap.Uint32s("vector", p.defaultCopiesNumbers))
|
||||
return
|
||||
}
|
||||
|
||||
l.Error(logs.CannotParseDefaultCopiesNumbers, zap.Error(err))
|
||||
l.Warn(logs.DefaultCopiesNumbersWontBeUpdated, zap.Uint32s("current value", p.DefaultCopiesNumbers()))
|
||||
p.defaultCopiesNumbers = defaultCopies
|
||||
p.copiesNumbers = copiesNumbers
|
||||
}
|
||||
|
||||
func remove(list []string, element string) []string {
|
||||
|
@ -729,26 +679,3 @@ func (a *App) initHandler() {
|
|||
a.log.Fatal(logs.CouldNotInitializeAPIHandler, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func readRegionMap(filePath string) (map[string]string, error) {
|
||||
regionMap := make(map[string]string)
|
||||
|
||||
if filePath == "" {
|
||||
return regionMap, nil
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(filePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("coudln't read file '%s'", filePath)
|
||||
}
|
||||
|
||||
if err = json.Unmarshal(data, ®ionMap); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal policies: %w", err)
|
||||
}
|
||||
|
||||
if _, ok := regionMap[api.DefaultLocationConstraint]; ok {
|
||||
return nil, fmt.Errorf("config overrides %s location constraint", api.DefaultLocationConstraint)
|
||||
}
|
||||
|
||||
return regionMap, nil
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -10,6 +11,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
||||
|
@ -31,11 +33,14 @@ const (
|
|||
defaultShutdownTimeout = 15 * time.Second
|
||||
|
||||
defaultPoolErrorThreshold uint32 = 100
|
||||
defaultPlacementPolicy = "REP 3"
|
||||
|
||||
defaultMaxClientsCount = 100
|
||||
defaultMaxClientsDeadline = time.Second * 30
|
||||
)
|
||||
|
||||
var defaultCopiesNumbers = []uint32{0}
|
||||
|
||||
const ( // Settings.
|
||||
// Logger.
|
||||
cfgLoggerLevel = "logger.level"
|
||||
|
@ -225,18 +230,24 @@ func fetchMaxClientsDeadline(cfg *viper.Viper) time.Duration {
|
|||
return maxClientsDeadline
|
||||
}
|
||||
|
||||
func fetchDefaultPolicy(cfg *viper.Viper) (netmap.PlacementPolicy, error) {
|
||||
defaultPolicyStr := handler.DefaultPolicy
|
||||
func fetchDefaultPolicy(l *zap.Logger, cfg *viper.Viper) netmap.PlacementPolicy {
|
||||
var policy netmap.PlacementPolicy
|
||||
|
||||
if cfg.IsSet(cfgPolicyDefault) {
|
||||
defaultPolicyStr = cfg.GetString(cfgPolicyDefault)
|
||||
policyStr := cfg.GetString(cfgPolicyDefault)
|
||||
if err := policy.DecodeString(policyStr); err != nil {
|
||||
l.Warn(logs.FailedToParseDefaultLocationConstraint,
|
||||
zap.String("policy", policyStr), zap.String("default", defaultPlacementPolicy), zap.Error(err))
|
||||
} else {
|
||||
return policy
|
||||
}
|
||||
}
|
||||
|
||||
var defaultPlacementPolicy netmap.PlacementPolicy
|
||||
if err := defaultPlacementPolicy.DecodeString(defaultPolicyStr); err != nil {
|
||||
return netmap.PlacementPolicy{}, fmt.Errorf("parse default policy '%s': %w", defaultPolicyStr, err)
|
||||
if err := policy.DecodeString(defaultPlacementPolicy); err != nil {
|
||||
l.Fatal("failed to parse default 'default' location constraint", zap.String("policy", defaultPlacementPolicy))
|
||||
}
|
||||
|
||||
return defaultPlacementPolicy, nil
|
||||
return policy
|
||||
}
|
||||
|
||||
func fetchNATSTimeout(cfg *viper.Viper, l *zap.Logger) time.Duration {
|
||||
|
@ -300,14 +311,21 @@ func fetchDefaultMaxAge(cfg *viper.Viper, l *zap.Logger) int {
|
|||
return defaultMaxAge
|
||||
}
|
||||
|
||||
func fetchRegionMappingPolicies(cfg *viper.Viper) (map[string]netmap.PlacementPolicy, error) {
|
||||
regionPolicyMap, err := readRegionMap(cfg.GetString(cfgPolicyRegionMapFile))
|
||||
func fetchRegionMappingPolicies(l *zap.Logger, cfg *viper.Viper) map[string]netmap.PlacementPolicy {
|
||||
filepath := cfg.GetString(cfgPolicyRegionMapFile)
|
||||
regionPolicyMap, err := readRegionMap(filepath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read region map file: %w", err)
|
||||
l.Warn(logs.FailedToReadRegionMapFilePolicies, zap.String("file", filepath), zap.Error(err))
|
||||
return make(map[string]netmap.PlacementPolicy)
|
||||
}
|
||||
|
||||
regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap))
|
||||
for region, policy := range regionPolicyMap {
|
||||
if region == api.DefaultLocationConstraint {
|
||||
l.Warn(logs.DefaultLocationConstraintCantBeOverriden, zap.String("policy", policy))
|
||||
continue
|
||||
}
|
||||
|
||||
var pp netmap.PlacementPolicy
|
||||
if err = pp.DecodeString(policy); err == nil {
|
||||
regionMap[region] = pp
|
||||
|
@ -319,29 +337,49 @@ func fetchRegionMappingPolicies(cfg *viper.Viper) (map[string]netmap.PlacementPo
|
|||
continue
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("parse region '%s' to policy mapping: %w", region, err)
|
||||
l.Warn(logs.FailedToParseLocationConstraint, zap.String("region", region), zap.String("policy", policy))
|
||||
}
|
||||
|
||||
return regionMap
|
||||
}
|
||||
|
||||
func readRegionMap(filePath string) (map[string]string, error) {
|
||||
regionMap := make(map[string]string)
|
||||
|
||||
if filePath == "" {
|
||||
return regionMap, nil
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(filePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("coudln't read file '%s'", filePath)
|
||||
}
|
||||
|
||||
if err = json.Unmarshal(data, ®ionMap); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal policies: %w", err)
|
||||
}
|
||||
|
||||
return regionMap, nil
|
||||
}
|
||||
|
||||
func fetchDefaultCopiesNumbers(v *viper.Viper) ([]uint32, error) {
|
||||
func fetchDefaultCopiesNumbers(l *zap.Logger, v *viper.Viper) []uint32 {
|
||||
unparsed := v.GetStringSlice(cfgSetCopiesNumber)
|
||||
var result []uint32
|
||||
|
||||
result := make([]uint32, len(unparsed))
|
||||
for i := range unparsed {
|
||||
parsedValue, err := strconv.ParseUint(unparsed[i], 10, 32)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
l.Warn(logs.FailedToParseDefaultCopiesNumbers,
|
||||
zap.Strings("copies numbers", unparsed), zap.Uint32s("default", defaultCopiesNumbers), zap.Error(err))
|
||||
return defaultCopiesNumbers
|
||||
}
|
||||
result = append(result, uint32(parsedValue))
|
||||
result[i] = uint32(parsedValue)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
return result
|
||||
}
|
||||
|
||||
func fetchCopiesNumbers(l *zap.Logger, v *viper.Viper) (map[string][]uint32, error) {
|
||||
var copiesNums = make(map[string][]uint32)
|
||||
func fetchCopiesNumbers(l *zap.Logger, v *viper.Viper) map[string][]uint32 {
|
||||
copiesNums := make(map[string][]uint32)
|
||||
for i := 0; ; i++ {
|
||||
key := cfgCopiesNumbers + "." + strconv.Itoa(i) + "."
|
||||
constraint := v.GetString(key + "location_constraint")
|
||||
|
@ -355,7 +393,9 @@ func fetchCopiesNumbers(l *zap.Logger, v *viper.Viper) (map[string][]uint32, err
|
|||
for j := range vector {
|
||||
parsedValue, err := strconv.ParseUint(vector[j], 10, 32)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
l.Warn(logs.FailedToParseCopiesNumbers, zap.String("location", constraint),
|
||||
zap.Strings("copies numbers", vector), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
vector32[j] = uint32(parsedValue)
|
||||
}
|
||||
|
@ -363,7 +403,7 @@ func fetchCopiesNumbers(l *zap.Logger, v *viper.Viper) (map[string][]uint32, err
|
|||
copiesNums[constraint] = vector32
|
||||
l.Info(logs.ConstraintAdded, zap.String("location", constraint), zap.Strings("copies numbers", vector))
|
||||
}
|
||||
return copiesNums, nil
|
||||
return copiesNums
|
||||
}
|
||||
|
||||
func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
|
||||
|
|
|
@ -17,11 +17,6 @@ const (
|
|||
TracingConfigUpdated = "tracing config updated" // Info in ../../cmd/s3-gw/app.go
|
||||
FailedToShutdownTracing = "failed to shutdown tracing" // Warn in ../../cmd/s3-gw/app.go
|
||||
UsingCredentials = "using credentials" // Info in ../../cmd/s3-gw/app.go
|
||||
PoliciesWontBeUpdated = "policies won't be updated" // Warn in ../../cmd/s3-gw/app.go
|
||||
CopiesNumbersWontBeUpdated = "copies numbers won't be updated" // Warn in ../../cmd/s3-gw/app.go
|
||||
DefaultCopiesNumbers = "default copies numbers" // Info in ../../cmd/s3-gw/app.go
|
||||
CannotParseDefaultCopiesNumbers = "cannot parse default copies numbers" // Error in ../../cmd/s3-gw/app.go
|
||||
DefaultCopiesNumbersWontBeUpdated = "default copies numbers won't be updated" // Warn in ../../cmd/s3-gw/app.go
|
||||
ApplicationStarted = "application started" // Info in ../../cmd/s3-gw/app.go
|
||||
ApplicationFinished = "application finished" // Info in ../../cmd/s3-gw/app.go
|
||||
FetchDomainsPrepareToUseAPI = "fetch domains, prepare to use API" // Info in ../../cmd/s3-gw/app.go
|
||||
|
@ -39,6 +34,12 @@ const (
|
|||
ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided = "resolver 'nns' won't be used since 'rpc_endpoint' isn't provided" // Warn in ../../cmd/s3-gw/app.go
|
||||
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/s3-gw/app_settings.go
|
||||
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/s3-gw/app_settings.go
|
||||
FailedToParseDefaultLocationConstraint = "failed to parse 'default' location constraint, default one will be used" // Warn in cmd/s3-gw/app_settings.go
|
||||
FailedToReadRegionMapFilePolicies = "failed to read region map file, policies will be empty" // Warn in cmd/s3-gw/app_settings.go
|
||||
DefaultLocationConstraintCantBeOverriden = "'default' location constraint can't be overriden by custom policy, use 'placement_policy.default'" // Warn in cmd/s3-gw/app_settings.go
|
||||
FailedToParseLocationConstraint = "failed to parse location constraint, it cannot be used" // Warn in cmd/s3-gw/app_settings.go
|
||||
FailedToParseDefaultCopiesNumbers = "failed to parse 'default' copies numbers, default one will be used" // Warn in cmd/s3-gw/app_settings.go
|
||||
FailedToParseCopiesNumbers = "failed to parse copies numbers, skip" // Warn in cmd/s3-gw/app_settings.go
|
||||
ConstraintAdded = "constraint added" // Info in ../../cmd/s3-gw/app_settings.go
|
||||
SkipEmptyAddress = "skip, empty address" // Warn in ../../cmd/s3-gw/app_settings.go
|
||||
AddedStoragePeer = "added storage peer" // Info in ../../cmd/s3-gw/app_settings.go
|
||||
|
@ -101,7 +102,6 @@ const (
|
|||
CouldntGenerateRandomKey = "couldn't generate random key" // Fatal in ../../cmd/s3-gw/app.go
|
||||
FailedToEnableNotifications = "failed to enable notifications" // Fatal in ../../cmd/s3-gw/app.go
|
||||
CouldntInitializeLayer = "couldn't initialize layer" // Fatal in ../../cmd/s3-gw/app.go
|
||||
FailedToCreateNewPolicyMapping = "failed to create new policy mapping" // Fatal in ../../cmd/s3-gw/app.go
|
||||
FailedToCreateResolver = "failed to create resolver" // Fatal in ../../cmd/s3-gw/app.go
|
||||
CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key" // Fatal in ../../cmd/s3-gw/app.go
|
||||
FailedToCreateConnectionPool = "failed to create connection pool" // Fatal in ../../cmd/s3-gw/app.go
|
||||
|
|
Loading…
Reference in a new issue