Compare commits

...

4 commits

Author                                     SHA1        Message                                                    Date
Roman Loginov <r.loginov@yadro.com>        6a9d3261a7  [#117] Refactor fetch/parse config parameters functions    2023-08-22 08:05:26 +00:00
Marina Biryukova <m.biryukova@yadro.com>   012ece40bb  [#180] Fix linter issues                                   2023-08-21 17:23:24 +03:00
Artem Tataurov <a.tataurov@yadro.com>      c750c87a61  [#51] metrics: Add a metric of currently used nodes        2023-08-17 14:26:25 +03:00
Artem Tataurov <a.tataurov@yadro.com>      94a42fa273  [#51] Update frostfs-sdk-go                                2023-08-17 14:26:25 +03:00
15 changed files with 234 additions and 160 deletions

View file

@@ -15,6 +15,7 @@ This document outlines major changes between releases.
 - Fix parsing signed headers in presigned urls (#182)
 
 ### Added
+- Add a metric with addresses of nodes of the same and highest priority that are currently healthy (#51)
 - Support dump metrics descriptions (#80)
 - Add `copies_numbers` section to `placement_policy` in config file and support vectors of copies numbers (#70, #101)
 - Support impersonate bearer token (#81, #105)

View file

@@ -36,7 +36,7 @@ func (m credentialsMock) addBox(addr oid.Address, box *accessbox.Box) {
 func (m credentialsMock) GetBox(_ context.Context, addr oid.Address) (*accessbox.Box, error) {
 	box, ok := m.boxes[addr.String()]
 	if !ok {
-		return nil, apistatus.ObjectNotFound{}
+		return nil, &apistatus.ObjectNotFound{}
 	}
 	return box, nil

View file

@@ -150,8 +150,7 @@ func isErrObjectLocked(err error) bool {
 	switch err.(type) {
 	default:
 		return strings.Contains(err.Error(), "object is locked")
-	case apistatus.ObjectLocked,
-		*apistatus.ObjectLocked:
+	case *apistatus.ObjectLocked:
 		return true
 	}
 }
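Note: commit 94a42fa273 updates frostfs-sdk-go, and the hunks in this compare consistently switch apistatus error values to pointers (&apistatus.ObjectNotFound{}, case *apistatus.ObjectLocked:), which suggests the updated SDK now returns these statuses as pointer types. A minimal sketch of matching such an error after the update; the apistatus import path and the wrapping below are assumptions for illustration, not part of this change:

package main

import (
	"errors"
	"fmt"

	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)

// isObjectLocked reports whether err (possibly wrapped) carries an
// "object locked" status; only the pointer form matches after the SDK update.
func isObjectLocked(err error) bool {
	var locked *apistatus.ObjectLocked
	return errors.As(err, &locked)
}

func main() {
	err := fmt.Errorf("put object: %w", &apistatus.ObjectLocked{})
	fmt.Println(isObjectLocked(err)) // true
}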

View file

@@ -28,7 +28,7 @@ func TestDeleteBucketOnAlreadyRemovedError(t *testing.T) {
 	putObject(t, hc, bktName, objName)
 	addr := getAddressOfLastVersion(hc, bktInfo, objName)
-	hc.tp.SetObjectError(addr, apistatus.ObjectAlreadyRemoved{})
+	hc.tp.SetObjectError(addr, &apistatus.ObjectAlreadyRemoved{})
 	deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
@@ -73,7 +73,7 @@ func TestDeleteBucketOnNotFoundError(t *testing.T) {
 	var addr oid.Address
 	addr.SetContainer(bktInfo.CID)
 	addr.SetObject(nodeVersion.OID)
-	hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
+	hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{})
 	deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})

View file

@@ -192,8 +192,8 @@ func TestGetObject(t *testing.T) {
 	checkFound(hc.t, hc, bktName, objName, emptyVersion)
 	addr := getAddressOfLastVersion(hc, bktInfo, objName)
-	hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
-	hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{})
+	hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{})
+	hc.tp.SetObjectError(objInfo.Address(), &apistatus.ObjectNotFound{})
 	getObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
 	getObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)

View file

@@ -120,8 +120,8 @@ func TestHeadObject(t *testing.T) {
 	checkFound(hc.t, hc, bktName, objName, emptyVersion)
 	addr := getAddressOfLastVersion(hc, bktInfo, objName)
-	hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
-	hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{})
+	hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{})
+	hc.tp.SetObjectError(objInfo.Address(), &apistatus.ObjectNotFound{})
 	headObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
 	headObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)

View file

@@ -198,7 +198,7 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
 		}, nil
 	}
-	return nil, fmt.Errorf("%w: %s", apistatus.ObjectNotFound{}, addr)
+	return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
 }
 
 func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {

View file

@@ -8,7 +8,6 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
-	"strconv"
 	"sync"
 	"sync/atomic"
 	"syscall"
@@ -189,15 +188,6 @@ func (s *appSettings) setBypassContentEncodingInChunks(bypass bool) {
 	s.bypassContentEncodingInChunks.Store(bypass)
 }
 
-func getDefaultPolicyValue(v *viper.Viper) string {
-	defaultPolicyStr := handler.DefaultPolicy
-	if v.IsSet(cfgPolicyDefault) {
-		defaultPolicyStr = v.GetString(cfgPolicyDefault)
-	}
-	return defaultPolicyStr
-}
-
 func (a *App) initAPI(ctx context.Context) {
 	a.initLayer(ctx)
 	a.initHandler()
@@ -270,15 +260,9 @@ func (a *App) shutdownTracing() {
 func newMaxClients(cfg *viper.Viper) maxClientsConfig {
 	config := maxClientsConfig{}
-	config.count = cfg.GetInt(cfgMaxClientsCount)
-	if config.count <= 0 {
-		config.count = defaultMaxClientsCount
-	}
+	config.count = fetchMaxClientsCount(cfg)
-	config.deadline = cfg.GetDuration(cfgMaxClientsDeadline)
-	if config.deadline <= 0 {
-		config.deadline = defaultMaxClientsDeadline
-	}
+	config.deadline = fetchMaxClientsDeadline(cfg)
 	return config
 }
@@ -302,38 +286,23 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.
 		prmTree.AddNode(peer)
 	}
-	connTimeout := cfg.GetDuration(cfgConnectTimeout)
-	if connTimeout <= 0 {
-		connTimeout = defaultConnectTimeout
-	}
+	connTimeout := fetchConnectTimeout(cfg)
 	prm.SetNodeDialTimeout(connTimeout)
 	prmTree.SetNodeDialTimeout(connTimeout)
-	streamTimeout := cfg.GetDuration(cfgStreamTimeout)
-	if streamTimeout <= 0 {
-		streamTimeout = defaultStreamTimeout
-	}
+	streamTimeout := fetchStreamTimeout(cfg)
 	prm.SetNodeStreamTimeout(streamTimeout)
 	prmTree.SetNodeStreamTimeout(streamTimeout)
-	healthCheckTimeout := cfg.GetDuration(cfgHealthcheckTimeout)
-	if healthCheckTimeout <= 0 {
-		healthCheckTimeout = defaultHealthcheckTimeout
-	}
+	healthCheckTimeout := fetchHealthCheckTimeout(cfg)
 	prm.SetHealthcheckTimeout(healthCheckTimeout)
 	prmTree.SetHealthcheckTimeout(healthCheckTimeout)
-	rebalanceInterval := cfg.GetDuration(cfgRebalanceInterval)
-	if rebalanceInterval <= 0 {
-		rebalanceInterval = defaultRebalanceInterval
-	}
+	rebalanceInterval := fetchRebalanceInterval(cfg)
 	prm.SetClientRebalanceInterval(rebalanceInterval)
 	prmTree.SetClientRebalanceInterval(rebalanceInterval)
-	errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
-	if errorThreshold <= 0 {
-		errorThreshold = defaultPoolErrorThreshold
-	}
+	errorThreshold := fetchErrorThreshold(cfg)
 	prm.SetErrorThreshold(errorThreshold)
 	prm.SetLogger(logger)
 	prmTree.SetLogger(logger)
@@ -380,7 +349,7 @@ func newPlacementPolicy(l *zap.Logger, v *viper.Viper) (*placementPolicy, error)
 	policies.updateCopiesNumbers(l, v)
 	policies.updateDefaultCopiesNumbers(l, v)
-	return policies, policies.updatePolicy(getDefaultPolicyValue(v), v.GetString(cfgPolicyRegionMapFile))
+	return policies, policies.updatePolicy(v)
 }
 
 func (p *placementPolicy) DefaultPlacementPolicy() netmap.PlacementPolicy {
@@ -412,7 +381,7 @@ func (p *placementPolicy) DefaultCopiesNumbers() []uint32 {
 }
 
 func (p *placementPolicy) update(l *zap.Logger, v *viper.Viper) {
-	if err := p.updatePolicy(getDefaultPolicyValue(v), v.GetString(cfgPolicyRegionMapFile)); err != nil {
+	if err := p.updatePolicy(v); err != nil {
 		l.Warn("policies won't be updated", zap.Error(err))
 	}
@@ -420,31 +389,15 @@ func (p *placementPolicy) update(l *zap.Logger, v *viper.Viper) {
 	p.updateDefaultCopiesNumbers(l, v)
 }
 
-func (p *placementPolicy) updatePolicy(defaultPolicy string, regionPolicyFilepath string) error {
-	var defaultPlacementPolicy netmap.PlacementPolicy
-	if err := defaultPlacementPolicy.DecodeString(defaultPolicy); err != nil {
-		return fmt.Errorf("parse default policy '%s': %w", defaultPolicy, err)
-	}
-	regionPolicyMap, err := readRegionMap(regionPolicyFilepath)
+func (p *placementPolicy) updatePolicy(v *viper.Viper) error {
+	defaultPlacementPolicy, err := fetchDefaultPolicy(v)
 	if err != nil {
-		return fmt.Errorf("read region map file: %w", err)
+		return err
 	}
-	regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap))
-	for region, policy := range regionPolicyMap {
-		var pp netmap.PlacementPolicy
-		if err = pp.DecodeString(policy); err == nil {
-			regionMap[region] = pp
-			continue
-		}
-		if err = pp.UnmarshalJSON([]byte(policy)); err == nil {
-			regionMap[region] = pp
-			continue
-		}
-		return fmt.Errorf("parse region '%s' to policy mapping: %w", region, err)
+	regionMap, err := fetchRegionMappingPolicies(v)
+	if err != nil {
+		return err
 	}
 
 	p.mu.Lock()
@@ -699,14 +652,7 @@ func (a *App) stopServices() {
 func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options {
 	cfg := notifications.Options{}
 	cfg.URL = v.GetString(cfgNATSEndpoint)
-	cfg.Timeout = v.GetDuration(cfgNATSTimeout)
-	if cfg.Timeout <= 0 {
-		l.Error("invalid lifetime, using default value (in seconds)",
-			zap.String("parameter", cfgNATSTimeout),
-			zap.Duration("value in config", cfg.Timeout),
-			zap.Duration("default", notifications.DefaultTimeout))
-		cfg.Timeout = notifications.DefaultTimeout
-	}
+	cfg.Timeout = fetchNATSTimeout(v, l)
 	cfg.TLSCertFilepath = v.GetString(cfgNATSTLSCertFile)
 	cfg.TLSAuthPrivateKeyFilePath = v.GetString(cfgNATSAuthPrivateKeyFile)
 	cfg.RootCAFiles = v.GetStringSlice(cfgNATSRootCAFiles)
@@ -717,62 +663,32 @@ func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Optio
 func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
 	cacheCfg := layer.DefaultCachesConfigs(l)
-	cacheCfg.Objects.Lifetime = getLifetime(v, l, cfgObjectsCacheLifetime, cacheCfg.Objects.Lifetime)
-	cacheCfg.Objects.Size = getSize(v, l, cfgObjectsCacheSize, cacheCfg.Objects.Size)
-	cacheCfg.ObjectsList.Lifetime = getLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
-	cacheCfg.ObjectsList.Size = getSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
-	cacheCfg.Buckets.Lifetime = getLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
-	cacheCfg.Buckets.Size = getSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)
-	cacheCfg.Names.Lifetime = getLifetime(v, l, cfgNamesCacheLifetime, cacheCfg.Names.Lifetime)
-	cacheCfg.Names.Size = getSize(v, l, cfgNamesCacheSize, cacheCfg.Names.Size)
-	cacheCfg.System.Lifetime = getLifetime(v, l, cfgSystemCacheLifetime, cacheCfg.System.Lifetime)
-	cacheCfg.System.Size = getSize(v, l, cfgSystemCacheSize, cacheCfg.System.Size)
-	cacheCfg.AccessControl.Lifetime = getLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime)
-	cacheCfg.AccessControl.Size = getSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
+	cacheCfg.Objects.Lifetime = fetchCacheLifetime(v, l, cfgObjectsCacheLifetime, cacheCfg.Objects.Lifetime)
+	cacheCfg.Objects.Size = fetchCacheSize(v, l, cfgObjectsCacheSize, cacheCfg.Objects.Size)
+	cacheCfg.ObjectsList.Lifetime = fetchCacheLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
+	cacheCfg.ObjectsList.Size = fetchCacheSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
+	cacheCfg.Buckets.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
+	cacheCfg.Buckets.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)
+	cacheCfg.Names.Lifetime = fetchCacheLifetime(v, l, cfgNamesCacheLifetime, cacheCfg.Names.Lifetime)
+	cacheCfg.Names.Size = fetchCacheSize(v, l, cfgNamesCacheSize, cacheCfg.Names.Size)
+	cacheCfg.System.Lifetime = fetchCacheLifetime(v, l, cfgSystemCacheLifetime, cacheCfg.System.Lifetime)
+	cacheCfg.System.Size = fetchCacheSize(v, l, cfgSystemCacheSize, cacheCfg.System.Size)
+	cacheCfg.AccessControl.Lifetime = fetchCacheLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime)
+	cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
 	return cacheCfg
 }
 
-func getLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
-	if v.IsSet(cfgEntry) {
-		lifetime := v.GetDuration(cfgEntry)
-		if lifetime <= 0 {
-			l.Error("invalid lifetime, using default value (in seconds)",
-				zap.String("parameter", cfgEntry),
-				zap.Duration("value in config", lifetime),
-				zap.Duration("default", defaultValue))
-		} else {
-			return lifetime
-		}
-	}
-	return defaultValue
-}
-
-func getSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
-	if v.IsSet(cfgEntry) {
-		size := v.GetInt(cfgEntry)
-		if size <= 0 {
-			l.Error("invalid cache size, using default value",
-				zap.String("parameter", cfgEntry),
-				zap.Int("value in config", size),
-				zap.Int("default", defaultValue))
-		} else {
-			return size
-		}
-	}
-	return defaultValue
-}
-
 func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
 	cacheCfg := cache.DefaultAccessBoxConfig(l)
-	cacheCfg.Lifetime = getLifetime(v, l, cfgAccessBoxCacheLifetime, cacheCfg.Lifetime)
-	cacheCfg.Size = getSize(v, l, cfgAccessBoxCacheSize, cacheCfg.Size)
+	cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgAccessBoxCacheLifetime, cacheCfg.Lifetime)
+	cacheCfg.Size = fetchCacheSize(v, l, cfgAccessBoxCacheSize, cacheCfg.Size)
 	return cacheCfg
 }
@@ -780,22 +696,11 @@ func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
 func (a *App) initHandler() {
 	cfg := &handler.Config{
 		Policy:             a.settings.policies,
-		DefaultMaxAge:      handler.DefaultMaxAge,
+		DefaultMaxAge:      fetchDefaultMaxAge(a.cfg, a.log),
 		NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS),
 		XMLDecoder:         a.settings.xmlDecoder,
 	}
-	if a.cfg.IsSet(cfgDefaultMaxAge) {
-		defaultMaxAge := a.cfg.GetInt(cfgDefaultMaxAge)
-		if defaultMaxAge <= 0 && defaultMaxAge != -1 {
-			a.log.Fatal("invalid defaultMaxAge",
-				zap.String("parameter", cfgDefaultMaxAge),
-				zap.String("value in config", strconv.Itoa(defaultMaxAge)))
-		}
-		cfg.DefaultMaxAge = defaultMaxAge
-	}
 	cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketAllow)
 	cfg.IsResolveListAllow = len(cfg.ResolveZoneList) > 0
 	if !cfg.IsResolveListAllow {

View file

@@ -10,8 +10,11 @@ import (
 	"strings"
 	"time"
 
+	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
+	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
 	"github.com/spf13/pflag"
 	"github.com/spf13/viper"
@@ -156,6 +159,169 @@ var ignore = map[string]struct{}{
 	cmdVersion: {},
 }
 
+func fetchConnectTimeout(cfg *viper.Viper) time.Duration {
+	connTimeout := cfg.GetDuration(cfgConnectTimeout)
+	if connTimeout <= 0 {
+		connTimeout = defaultConnectTimeout
+	}
+	return connTimeout
+}
+
+func fetchStreamTimeout(cfg *viper.Viper) time.Duration {
+	streamTimeout := cfg.GetDuration(cfgStreamTimeout)
+	if streamTimeout <= 0 {
+		streamTimeout = defaultStreamTimeout
+	}
+	return streamTimeout
+}
+
+func fetchHealthCheckTimeout(cfg *viper.Viper) time.Duration {
+	healthCheckTimeout := cfg.GetDuration(cfgHealthcheckTimeout)
+	if healthCheckTimeout <= 0 {
+		healthCheckTimeout = defaultHealthcheckTimeout
+	}
+	return healthCheckTimeout
+}
+
+func fetchRebalanceInterval(cfg *viper.Viper) time.Duration {
+	rebalanceInterval := cfg.GetDuration(cfgRebalanceInterval)
+	if rebalanceInterval <= 0 {
+		rebalanceInterval = defaultRebalanceInterval
+	}
+	return rebalanceInterval
+}
+
+func fetchErrorThreshold(cfg *viper.Viper) uint32 {
+	errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
+	if errorThreshold <= 0 {
+		errorThreshold = defaultPoolErrorThreshold
+	}
+	return errorThreshold
+}
+
+func fetchMaxClientsCount(cfg *viper.Viper) int {
+	maxClientsCount := cfg.GetInt(cfgMaxClientsCount)
+	if maxClientsCount <= 0 {
+		maxClientsCount = defaultMaxClientsCount
+	}
+	return maxClientsCount
+}
+
+func fetchMaxClientsDeadline(cfg *viper.Viper) time.Duration {
+	maxClientsDeadline := cfg.GetDuration(cfgMaxClientsDeadline)
+	if maxClientsDeadline <= 0 {
+		maxClientsDeadline = defaultMaxClientsDeadline
+	}
+	return maxClientsDeadline
+}
+
+func fetchDefaultPolicy(cfg *viper.Viper) (netmap.PlacementPolicy, error) {
+	defaultPolicyStr := handler.DefaultPolicy
+	if cfg.IsSet(cfgPolicyDefault) {
+		defaultPolicyStr = cfg.GetString(cfgPolicyDefault)
+	}
+	var defaultPlacementPolicy netmap.PlacementPolicy
+	if err := defaultPlacementPolicy.DecodeString(defaultPolicyStr); err != nil {
+		return netmap.PlacementPolicy{}, fmt.Errorf("parse default policy '%s': %w", defaultPolicyStr, err)
+	}
+	return defaultPlacementPolicy, nil
+}
+
+func fetchNATSTimeout(cfg *viper.Viper, l *zap.Logger) time.Duration {
+	timeout := cfg.GetDuration(cfgNATSTimeout)
+	if timeout <= 0 {
+		l.Error("invalid lifetime, using default value (in seconds)",
+			zap.String("parameter", cfgNATSTimeout),
+			zap.Duration("value in config", timeout),
+			zap.Duration("default", notifications.DefaultTimeout))
+		timeout = notifications.DefaultTimeout
+	}
+	return timeout
+}
+
+func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
+	if v.IsSet(cfgEntry) {
+		lifetime := v.GetDuration(cfgEntry)
+		if lifetime <= 0 {
+			l.Error("invalid lifetime, using default value (in seconds)",
+				zap.String("parameter", cfgEntry),
+				zap.Duration("value in config", lifetime),
+				zap.Duration("default", defaultValue))
+		} else {
+			return lifetime
+		}
+	}
+	return defaultValue
+}
+
+func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
+	if v.IsSet(cfgEntry) {
+		size := v.GetInt(cfgEntry)
+		if size <= 0 {
+			l.Error("invalid cache size, using default value",
+				zap.String("parameter", cfgEntry),
+				zap.Int("value in config", size),
+				zap.Int("default", defaultValue))
+		} else {
+			return size
+		}
+	}
+	return defaultValue
+}
+
+func fetchDefaultMaxAge(cfg *viper.Viper, l *zap.Logger) int {
+	defaultMaxAge := handler.DefaultMaxAge
+	if cfg.IsSet(cfgDefaultMaxAge) {
+		defaultMaxAge = cfg.GetInt(cfgDefaultMaxAge)
+		if defaultMaxAge <= 0 && defaultMaxAge != -1 {
+			l.Fatal("invalid defaultMaxAge",
+				zap.String("parameter", cfgDefaultMaxAge),
+				zap.String("value in config", strconv.Itoa(defaultMaxAge)))
+		}
+	}
+	return defaultMaxAge
+}
+
+func fetchRegionMappingPolicies(cfg *viper.Viper) (map[string]netmap.PlacementPolicy, error) {
+	regionPolicyMap, err := readRegionMap(cfg.GetString(cfgPolicyRegionMapFile))
+	if err != nil {
+		return nil, fmt.Errorf("read region map file: %w", err)
+	}
+	regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap))
+	for region, policy := range regionPolicyMap {
+		var pp netmap.PlacementPolicy
+		if err = pp.DecodeString(policy); err == nil {
+			regionMap[region] = pp
+			continue
+		}
+		if err = pp.UnmarshalJSON([]byte(policy)); err == nil {
+			regionMap[region] = pp
+			continue
+		}
+		return nil, fmt.Errorf("parse region '%s' to policy mapping: %w", region, err)
+	}
+	return regionMap, nil
+}
+
 func fetchDefaultCopiesNumbers(v *viper.Viper) ([]uint32, error) {
 	unparsed := v.GetStringSlice(cfgSetCopiesNumber)
 	var result []uint32
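Note: the fetch* helpers above centralize parsing and defaulting of config parameters (commit 6a9d3261a7). As a rough illustration of the two decode paths fetchRegionMappingPolicies tries for each region entry — the string form of a placement policy first, then JSON — here is a small self-contained sketch; decodeRegionPolicy and the "REP 3" policy string are illustrative assumptions, not part of the change:

package main

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)

// decodeRegionPolicy mirrors the fallback order used by fetchRegionMappingPolicies:
// try the textual placement-policy form first, then the JSON form.
func decodeRegionPolicy(raw string) (netmap.PlacementPolicy, error) {
	var pp netmap.PlacementPolicy
	if err := pp.DecodeString(raw); err == nil {
		return pp, nil
	}
	if err := pp.UnmarshalJSON([]byte(raw)); err != nil {
		return netmap.PlacementPolicy{}, fmt.Errorf("parse policy %q: %w", raw, err)
	}
	return pp, nil
}

func main() {
	if _, err := decodeRegionPolicy("REP 3"); err != nil { // assumed-valid policy string
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println("policy parsed")
}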

go.mod (2 changed lines)
View file

@@ -5,7 +5,7 @@ go 1.19
 require (
 	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44
 	git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
-	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6
+	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821090303-202412230a05
 	github.com/aws/aws-sdk-go v1.44.6
 	github.com/bluele/gcache v0.0.2
 	github.com/go-chi/chi/v5 v5.0.8

go.sum (6 changed lines)
View file

@@ -44,8 +44,10 @@ git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSV
 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6 h1:u6lzNotV6MEMNEG/XeS7g+FjPrrf+j4gnOHtvun2KJc=
-git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6/go.mod h1:LI2GOj0pEx0jYTjB3QHja2PNhQFYL2pCm71RAFwDv0M=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821073319-342524159ac3 h1:GBRTOTRrtIvxi2TgxG7z/J7uRXiyb1SxR4247FaYCgU=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821073319-342524159ac3/go.mod h1:t1akKcUH7iBrFHX8rSXScYMP17k2kYQXMbZooiL5Juw=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821090303-202412230a05 h1:OuViMF54N87FXmaBEpYw3jhzaLrJ/EWOlPL1wUkimE0=
+git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821090303-202412230a05/go.mod h1:t1akKcUH7iBrFHX8rSXScYMP17k2kYQXMbZooiL5Juw=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
 git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=

View file

@@ -25,8 +25,6 @@ func IsErrObjectAccessDenied(err error) (string, bool) {
 	switch err := err.(type) {
 	default:
 		return "", false
-	case apistatus.ObjectAccessDenied:
-		return err.Reason(), true
 	case *apistatus.ObjectAccessDenied:
 		return err.Reason(), true
 	}

View file

@@ -41,9 +41,7 @@ const (
 // NewFrostFS creates new FrostFS using provided pool.Pool.
 func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
-	var await pool.WaitParams
-	await.SetPollInterval(defaultPollInterval)
-	await.SetTimeout(defaultPollTimeout)
+	await := pool.WaitParams{PollInterval: defaultPollInterval, Timeout: defaultPollTimeout}
 
 	var owner user.ID
 	user.IDFromKey(&owner, key.PrivateKey.PublicKey)
@@ -93,8 +91,7 @@ func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (u
 // Container implements frostfs.FrostFS interface method.
 func (x *FrostFS) Container(ctx context.Context, idCnr cid.ID) (*container.Container, error) {
-	var prm pool.PrmContainerGet
-	prm.SetContainerID(idCnr)
+	prm := pool.PrmContainerGet{ContainerID: idCnr}
 
 	res, err := x.pool.GetContainer(ctx, prm)
 	if err != nil {
@@ -167,13 +164,7 @@ func (x *FrostFS) UserContainers(ctx context.Context, id user.ID) ([]cid.ID, err
 // SetContainerEACL implements frostfs.FrostFS interface method.
 func (x *FrostFS) SetContainerEACL(ctx context.Context, table eacl.Table, sessionToken *session.Container) error {
-	var prm pool.PrmContainerSetEACL
-	prm.SetTable(table)
-	prm.SetWaitParams(x.await)
-	if sessionToken != nil {
-		prm.WithinSession(*sessionToken)
-	}
+	prm := pool.PrmContainerSetEACL{Table: table, Session: sessionToken, WaitParams: &x.await}
 
 	err := x.pool.SetEACL(ctx, prm)
 	return handleObjectError("save eACL via connection pool", err)
@@ -194,13 +185,7 @@ func (x *FrostFS) ContainerEACL(ctx context.Context, id cid.ID) (*eacl.Table, er
 // DeleteContainer implements frostfs.FrostFS interface method.
 func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
-	var prm pool.PrmContainerDelete
-	prm.SetContainerID(id)
-	prm.SetWaitParams(x.await)
-	if token != nil {
-		prm.SetSessionToken(*token)
-	}
+	prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await}
 
 	err := x.pool.DeleteContainer(ctx, prm)
 	return handleObjectError("delete container via connection pool", err)
View file

@@ -48,6 +48,14 @@ var appMetricsDesc = map[string]map[string]Description{
 			Help:           "Average request duration (in milliseconds) for specific method on node in pool",
 			VariableLabels: []string{"node", "method"},
 		},
+		currentNodesMetric: Description{
+			Type:           dto.MetricType_GAUGE,
+			Namespace:      namespace,
+			Subsystem:      poolSubsystem,
+			Name:           currentNodesMetric,
+			Help:           "Addresses of nodes of the same and highest priority that are currently healthy",
+			VariableLabels: []string{"address"},
+		},
 	},
 	billingSubsystem: {
 		userRequestsMetric: Description{

View file

@@ -15,6 +15,7 @@ const (
 	overallNodeRequestsMetric = "overall_node_requests"
 	currentErrorMetric        = "current_errors"
 	avgRequestDurationMetric  = "avg_request_duration"
+	currentNodesMetric        = "current_nodes"
 )
 
 const (
@@ -42,6 +43,7 @@ type poolMetricsCollector struct {
 	overallNodeRequests *prometheus.GaugeVec
 	currentErrors       *prometheus.GaugeVec
 	requestDuration     *prometheus.GaugeVec
+	currentNodes        *prometheus.GaugeVec
 }
 
 func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
@@ -52,6 +54,7 @@ func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
 		overallNodeRequests: mustNewGaugeVec(appMetricsDesc[poolSubsystem][overallNodeRequestsMetric]),
 		currentErrors:       mustNewGaugeVec(appMetricsDesc[poolSubsystem][currentErrorMetric]),
 		requestDuration:     mustNewGaugeVec(appMetricsDesc[poolSubsystem][avgRequestDurationMetric]),
+		currentNodes:        mustNewGaugeVec(appMetricsDesc[poolSubsystem][currentNodesMetric]),
 	}
 }
@@ -62,6 +65,7 @@ func (m *poolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
 	m.overallNodeRequests.Collect(ch)
 	m.currentErrors.Collect(ch)
 	m.requestDuration.Collect(ch)
+	m.currentNodes.Collect(ch)
 }
 
 func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
@@ -70,6 +74,7 @@ func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
 	m.overallNodeRequests.Describe(descs)
 	m.currentErrors.Describe(descs)
 	m.requestDuration.Describe(descs)
+	m.currentNodes.Describe(descs)
 }
 
 func (m *poolMetricsCollector) updateStatistic() {
@@ -79,6 +84,7 @@ func (m *poolMetricsCollector) updateStatistic() {
 	m.overallNodeRequests.Reset()
 	m.currentErrors.Reset()
 	m.requestDuration.Reset()
+	m.currentNodes.Reset()
 
 	for _, node := range stat.Nodes() {
 		m.overallNodeErrors.WithLabelValues(node.Address()).Set(float64(node.OverallErrors()))
@@ -88,6 +94,10 @@ func (m *poolMetricsCollector) updateStatistic() {
 		m.updateRequestsDuration(node)
 	}
 
+	for _, addr := range stat.CurrentNodes() {
+		m.currentNodes.WithLabelValues(addr).Set(1)
+	}
+
 	m.overallErrors.Set(float64(stat.OverallErrors()))
 }
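Note: taken together, the metrics hunks (commit c750c87a61) export one gauge series per currently healthy node of the same and highest priority, labeled by address, reset and re-populated on every statistics update. A minimal standalone sketch of that semantics; the namespace/subsystem strings and node addresses below are placeholders, since the real constants live outside this diff:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// currentNodes mimics the gauge registered above: one series per healthy
	// node address, value fixed at 1 as a presence indicator.
	currentNodes := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "frostfs_s3_gw", // placeholder namespace
		Subsystem: "pool",          // placeholder subsystem
		Name:      "current_nodes",
		Help:      "Addresses of nodes of the same and highest priority that are currently healthy",
	}, []string{"address"})

	healthy := []string{"s01.example:8080", "s02.example:8080"} // example addresses

	// Reset before each update so nodes that dropped out disappear from the output.
	currentNodes.Reset()
	for _, addr := range healthy {
		currentNodes.WithLabelValues(addr).Set(1)
	}

	fmt.Println(len(healthy), "healthy nodes exported")
}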