forked from TrueCloudLab/frostfs-s3-gw
Compare commits
No commits in common. "6a9d3261a760310bc882d475880aa08d39cfc14b" and "40d7f844e3104a0d7acd5f9df8e802993799bbbb" have entirely different histories.
6a9d3261a7
...
40d7f844e3
15 changed files with 160 additions and 234 deletions
|
@ -15,7 +15,6 @@ This document outlines major changes between releases.
|
||||||
- Fix parsing signed headers in presigned urls (#182)
|
- Fix parsing signed headers in presigned urls (#182)
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- Add a metric with addresses of nodes of the same and highest priority that are currently healthy (#51)
|
|
||||||
- Support dump metrics descriptions (#80)
|
- Support dump metrics descriptions (#80)
|
||||||
- Add `copies_numbers` section to `placement_policy` in config file and support vectors of copies numbers (#70, #101)
|
- Add `copies_numbers` section to `placement_policy` in config file and support vectors of copies numbers (#70, #101)
|
||||||
- Support impersonate bearer token (#81, #105)
|
- Support impersonate bearer token (#81, #105)
|
||||||
|
|
|
@ -36,7 +36,7 @@ func (m credentialsMock) addBox(addr oid.Address, box *accessbox.Box) {
|
||||||
func (m credentialsMock) GetBox(_ context.Context, addr oid.Address) (*accessbox.Box, error) {
|
func (m credentialsMock) GetBox(_ context.Context, addr oid.Address) (*accessbox.Box, error) {
|
||||||
box, ok := m.boxes[addr.String()]
|
box, ok := m.boxes[addr.String()]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, &apistatus.ObjectNotFound{}
|
return nil, apistatus.ObjectNotFound{}
|
||||||
}
|
}
|
||||||
|
|
||||||
return box, nil
|
return box, nil
|
||||||
|
|
|
@ -150,7 +150,8 @@ func isErrObjectLocked(err error) bool {
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
default:
|
default:
|
||||||
return strings.Contains(err.Error(), "object is locked")
|
return strings.Contains(err.Error(), "object is locked")
|
||||||
case *apistatus.ObjectLocked:
|
case apistatus.ObjectLocked,
|
||||||
|
*apistatus.ObjectLocked:
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ func TestDeleteBucketOnAlreadyRemovedError(t *testing.T) {
|
||||||
putObject(t, hc, bktName, objName)
|
putObject(t, hc, bktName, objName)
|
||||||
|
|
||||||
addr := getAddressOfLastVersion(hc, bktInfo, objName)
|
addr := getAddressOfLastVersion(hc, bktInfo, objName)
|
||||||
hc.tp.SetObjectError(addr, &apistatus.ObjectAlreadyRemoved{})
|
hc.tp.SetObjectError(addr, apistatus.ObjectAlreadyRemoved{})
|
||||||
|
|
||||||
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
|
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
|
||||||
|
|
||||||
|
@ -73,7 +73,7 @@ func TestDeleteBucketOnNotFoundError(t *testing.T) {
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(bktInfo.CID)
|
addr.SetContainer(bktInfo.CID)
|
||||||
addr.SetObject(nodeVersion.OID)
|
addr.SetObject(nodeVersion.OID)
|
||||||
hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{})
|
hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
|
||||||
|
|
||||||
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
|
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
|
||||||
|
|
||||||
|
|
|
@ -192,8 +192,8 @@ func TestGetObject(t *testing.T) {
|
||||||
checkFound(hc.t, hc, bktName, objName, emptyVersion)
|
checkFound(hc.t, hc, bktName, objName, emptyVersion)
|
||||||
|
|
||||||
addr := getAddressOfLastVersion(hc, bktInfo, objName)
|
addr := getAddressOfLastVersion(hc, bktInfo, objName)
|
||||||
hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{})
|
hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
|
||||||
hc.tp.SetObjectError(objInfo.Address(), &apistatus.ObjectNotFound{})
|
hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{})
|
||||||
|
|
||||||
getObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
|
getObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
|
||||||
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)
|
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)
|
||||||
|
|
|
@ -120,8 +120,8 @@ func TestHeadObject(t *testing.T) {
|
||||||
checkFound(hc.t, hc, bktName, objName, emptyVersion)
|
checkFound(hc.t, hc, bktName, objName, emptyVersion)
|
||||||
|
|
||||||
addr := getAddressOfLastVersion(hc, bktInfo, objName)
|
addr := getAddressOfLastVersion(hc, bktInfo, objName)
|
||||||
hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{})
|
hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
|
||||||
hc.tp.SetObjectError(objInfo.Address(), &apistatus.ObjectNotFound{})
|
hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{})
|
||||||
|
|
||||||
headObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
|
headObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
|
||||||
headObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)
|
headObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)
|
||||||
|
|
|
@ -198,7 +198,7 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
|
return nil, fmt.Errorf("%w: %s", apistatus.ObjectNotFound{}, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
|
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
|
||||||
|
|
157
cmd/s3-gw/app.go
157
cmd/s3-gw/app.go
|
@ -8,6 +8,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
@ -188,6 +189,15 @@ func (s *appSettings) setBypassContentEncodingInChunks(bypass bool) {
|
||||||
s.bypassContentEncodingInChunks.Store(bypass)
|
s.bypassContentEncodingInChunks.Store(bypass)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getDefaultPolicyValue(v *viper.Viper) string {
|
||||||
|
defaultPolicyStr := handler.DefaultPolicy
|
||||||
|
if v.IsSet(cfgPolicyDefault) {
|
||||||
|
defaultPolicyStr = v.GetString(cfgPolicyDefault)
|
||||||
|
}
|
||||||
|
|
||||||
|
return defaultPolicyStr
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) initAPI(ctx context.Context) {
|
func (a *App) initAPI(ctx context.Context) {
|
||||||
a.initLayer(ctx)
|
a.initLayer(ctx)
|
||||||
a.initHandler()
|
a.initHandler()
|
||||||
|
@ -260,9 +270,15 @@ func (a *App) shutdownTracing() {
|
||||||
func newMaxClients(cfg *viper.Viper) maxClientsConfig {
|
func newMaxClients(cfg *viper.Viper) maxClientsConfig {
|
||||||
config := maxClientsConfig{}
|
config := maxClientsConfig{}
|
||||||
|
|
||||||
config.count = fetchMaxClientsCount(cfg)
|
config.count = cfg.GetInt(cfgMaxClientsCount)
|
||||||
|
if config.count <= 0 {
|
||||||
|
config.count = defaultMaxClientsCount
|
||||||
|
}
|
||||||
|
|
||||||
config.deadline = fetchMaxClientsDeadline(cfg)
|
config.deadline = cfg.GetDuration(cfgMaxClientsDeadline)
|
||||||
|
if config.deadline <= 0 {
|
||||||
|
config.deadline = defaultMaxClientsDeadline
|
||||||
|
}
|
||||||
|
|
||||||
return config
|
return config
|
||||||
}
|
}
|
||||||
|
@ -286,23 +302,38 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.
|
||||||
prmTree.AddNode(peer)
|
prmTree.AddNode(peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
connTimeout := fetchConnectTimeout(cfg)
|
connTimeout := cfg.GetDuration(cfgConnectTimeout)
|
||||||
|
if connTimeout <= 0 {
|
||||||
|
connTimeout = defaultConnectTimeout
|
||||||
|
}
|
||||||
prm.SetNodeDialTimeout(connTimeout)
|
prm.SetNodeDialTimeout(connTimeout)
|
||||||
prmTree.SetNodeDialTimeout(connTimeout)
|
prmTree.SetNodeDialTimeout(connTimeout)
|
||||||
|
|
||||||
streamTimeout := fetchStreamTimeout(cfg)
|
streamTimeout := cfg.GetDuration(cfgStreamTimeout)
|
||||||
|
if streamTimeout <= 0 {
|
||||||
|
streamTimeout = defaultStreamTimeout
|
||||||
|
}
|
||||||
prm.SetNodeStreamTimeout(streamTimeout)
|
prm.SetNodeStreamTimeout(streamTimeout)
|
||||||
prmTree.SetNodeStreamTimeout(streamTimeout)
|
prmTree.SetNodeStreamTimeout(streamTimeout)
|
||||||
|
|
||||||
healthCheckTimeout := fetchHealthCheckTimeout(cfg)
|
healthCheckTimeout := cfg.GetDuration(cfgHealthcheckTimeout)
|
||||||
|
if healthCheckTimeout <= 0 {
|
||||||
|
healthCheckTimeout = defaultHealthcheckTimeout
|
||||||
|
}
|
||||||
prm.SetHealthcheckTimeout(healthCheckTimeout)
|
prm.SetHealthcheckTimeout(healthCheckTimeout)
|
||||||
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
|
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
|
||||||
|
|
||||||
rebalanceInterval := fetchRebalanceInterval(cfg)
|
rebalanceInterval := cfg.GetDuration(cfgRebalanceInterval)
|
||||||
|
if rebalanceInterval <= 0 {
|
||||||
|
rebalanceInterval = defaultRebalanceInterval
|
||||||
|
}
|
||||||
prm.SetClientRebalanceInterval(rebalanceInterval)
|
prm.SetClientRebalanceInterval(rebalanceInterval)
|
||||||
prmTree.SetClientRebalanceInterval(rebalanceInterval)
|
prmTree.SetClientRebalanceInterval(rebalanceInterval)
|
||||||
|
|
||||||
errorThreshold := fetchErrorThreshold(cfg)
|
errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
|
||||||
|
if errorThreshold <= 0 {
|
||||||
|
errorThreshold = defaultPoolErrorThreshold
|
||||||
|
}
|
||||||
prm.SetErrorThreshold(errorThreshold)
|
prm.SetErrorThreshold(errorThreshold)
|
||||||
prm.SetLogger(logger)
|
prm.SetLogger(logger)
|
||||||
prmTree.SetLogger(logger)
|
prmTree.SetLogger(logger)
|
||||||
|
@ -349,7 +380,7 @@ func newPlacementPolicy(l *zap.Logger, v *viper.Viper) (*placementPolicy, error)
|
||||||
policies.updateCopiesNumbers(l, v)
|
policies.updateCopiesNumbers(l, v)
|
||||||
policies.updateDefaultCopiesNumbers(l, v)
|
policies.updateDefaultCopiesNumbers(l, v)
|
||||||
|
|
||||||
return policies, policies.updatePolicy(v)
|
return policies, policies.updatePolicy(getDefaultPolicyValue(v), v.GetString(cfgPolicyRegionMapFile))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *placementPolicy) DefaultPlacementPolicy() netmap.PlacementPolicy {
|
func (p *placementPolicy) DefaultPlacementPolicy() netmap.PlacementPolicy {
|
||||||
|
@ -381,7 +412,7 @@ func (p *placementPolicy) DefaultCopiesNumbers() []uint32 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *placementPolicy) update(l *zap.Logger, v *viper.Viper) {
|
func (p *placementPolicy) update(l *zap.Logger, v *viper.Viper) {
|
||||||
if err := p.updatePolicy(v); err != nil {
|
if err := p.updatePolicy(getDefaultPolicyValue(v), v.GetString(cfgPolicyRegionMapFile)); err != nil {
|
||||||
l.Warn("policies won't be updated", zap.Error(err))
|
l.Warn("policies won't be updated", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -389,15 +420,31 @@ func (p *placementPolicy) update(l *zap.Logger, v *viper.Viper) {
|
||||||
p.updateDefaultCopiesNumbers(l, v)
|
p.updateDefaultCopiesNumbers(l, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *placementPolicy) updatePolicy(v *viper.Viper) error {
|
func (p *placementPolicy) updatePolicy(defaultPolicy string, regionPolicyFilepath string) error {
|
||||||
defaultPlacementPolicy, err := fetchDefaultPolicy(v)
|
var defaultPlacementPolicy netmap.PlacementPolicy
|
||||||
if err != nil {
|
if err := defaultPlacementPolicy.DecodeString(defaultPolicy); err != nil {
|
||||||
return err
|
return fmt.Errorf("parse default policy '%s': %w", defaultPolicy, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
regionMap, err := fetchRegionMappingPolicies(v)
|
regionPolicyMap, err := readRegionMap(regionPolicyFilepath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("read region map file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap))
|
||||||
|
for region, policy := range regionPolicyMap {
|
||||||
|
var pp netmap.PlacementPolicy
|
||||||
|
if err = pp.DecodeString(policy); err == nil {
|
||||||
|
regionMap[region] = pp
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = pp.UnmarshalJSON([]byte(policy)); err == nil {
|
||||||
|
regionMap[region] = pp
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("parse region '%s' to policy mapping: %w", region, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
|
@ -652,7 +699,14 @@ func (a *App) stopServices() {
|
||||||
func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options {
|
func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options {
|
||||||
cfg := notifications.Options{}
|
cfg := notifications.Options{}
|
||||||
cfg.URL = v.GetString(cfgNATSEndpoint)
|
cfg.URL = v.GetString(cfgNATSEndpoint)
|
||||||
cfg.Timeout = fetchNATSTimeout(v, l)
|
cfg.Timeout = v.GetDuration(cfgNATSTimeout)
|
||||||
|
if cfg.Timeout <= 0 {
|
||||||
|
l.Error("invalid lifetime, using default value (in seconds)",
|
||||||
|
zap.String("parameter", cfgNATSTimeout),
|
||||||
|
zap.Duration("value in config", cfg.Timeout),
|
||||||
|
zap.Duration("default", notifications.DefaultTimeout))
|
||||||
|
cfg.Timeout = notifications.DefaultTimeout
|
||||||
|
}
|
||||||
cfg.TLSCertFilepath = v.GetString(cfgNATSTLSCertFile)
|
cfg.TLSCertFilepath = v.GetString(cfgNATSTLSCertFile)
|
||||||
cfg.TLSAuthPrivateKeyFilePath = v.GetString(cfgNATSAuthPrivateKeyFile)
|
cfg.TLSAuthPrivateKeyFilePath = v.GetString(cfgNATSAuthPrivateKeyFile)
|
||||||
cfg.RootCAFiles = v.GetStringSlice(cfgNATSRootCAFiles)
|
cfg.RootCAFiles = v.GetStringSlice(cfgNATSRootCAFiles)
|
||||||
|
@ -663,32 +717,62 @@ func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Optio
|
||||||
func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
|
func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
|
||||||
cacheCfg := layer.DefaultCachesConfigs(l)
|
cacheCfg := layer.DefaultCachesConfigs(l)
|
||||||
|
|
||||||
cacheCfg.Objects.Lifetime = fetchCacheLifetime(v, l, cfgObjectsCacheLifetime, cacheCfg.Objects.Lifetime)
|
cacheCfg.Objects.Lifetime = getLifetime(v, l, cfgObjectsCacheLifetime, cacheCfg.Objects.Lifetime)
|
||||||
cacheCfg.Objects.Size = fetchCacheSize(v, l, cfgObjectsCacheSize, cacheCfg.Objects.Size)
|
cacheCfg.Objects.Size = getSize(v, l, cfgObjectsCacheSize, cacheCfg.Objects.Size)
|
||||||
|
|
||||||
cacheCfg.ObjectsList.Lifetime = fetchCacheLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
|
cacheCfg.ObjectsList.Lifetime = getLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
|
||||||
cacheCfg.ObjectsList.Size = fetchCacheSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
|
cacheCfg.ObjectsList.Size = getSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
|
||||||
|
|
||||||
cacheCfg.Buckets.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
|
cacheCfg.Buckets.Lifetime = getLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
|
||||||
cacheCfg.Buckets.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)
|
cacheCfg.Buckets.Size = getSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)
|
||||||
|
|
||||||
cacheCfg.Names.Lifetime = fetchCacheLifetime(v, l, cfgNamesCacheLifetime, cacheCfg.Names.Lifetime)
|
cacheCfg.Names.Lifetime = getLifetime(v, l, cfgNamesCacheLifetime, cacheCfg.Names.Lifetime)
|
||||||
cacheCfg.Names.Size = fetchCacheSize(v, l, cfgNamesCacheSize, cacheCfg.Names.Size)
|
cacheCfg.Names.Size = getSize(v, l, cfgNamesCacheSize, cacheCfg.Names.Size)
|
||||||
|
|
||||||
cacheCfg.System.Lifetime = fetchCacheLifetime(v, l, cfgSystemCacheLifetime, cacheCfg.System.Lifetime)
|
cacheCfg.System.Lifetime = getLifetime(v, l, cfgSystemCacheLifetime, cacheCfg.System.Lifetime)
|
||||||
cacheCfg.System.Size = fetchCacheSize(v, l, cfgSystemCacheSize, cacheCfg.System.Size)
|
cacheCfg.System.Size = getSize(v, l, cfgSystemCacheSize, cacheCfg.System.Size)
|
||||||
|
|
||||||
cacheCfg.AccessControl.Lifetime = fetchCacheLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime)
|
cacheCfg.AccessControl.Lifetime = getLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime)
|
||||||
cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
|
cacheCfg.AccessControl.Size = getSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
|
||||||
|
|
||||||
return cacheCfg
|
return cacheCfg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
|
||||||
|
if v.IsSet(cfgEntry) {
|
||||||
|
lifetime := v.GetDuration(cfgEntry)
|
||||||
|
if lifetime <= 0 {
|
||||||
|
l.Error("invalid lifetime, using default value (in seconds)",
|
||||||
|
zap.String("parameter", cfgEntry),
|
||||||
|
zap.Duration("value in config", lifetime),
|
||||||
|
zap.Duration("default", defaultValue))
|
||||||
|
} else {
|
||||||
|
return lifetime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
|
||||||
|
if v.IsSet(cfgEntry) {
|
||||||
|
size := v.GetInt(cfgEntry)
|
||||||
|
if size <= 0 {
|
||||||
|
l.Error("invalid cache size, using default value",
|
||||||
|
zap.String("parameter", cfgEntry),
|
||||||
|
zap.Int("value in config", size),
|
||||||
|
zap.Int("default", defaultValue))
|
||||||
|
} else {
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
|
|
||||||
func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
|
func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
|
||||||
cacheCfg := cache.DefaultAccessBoxConfig(l)
|
cacheCfg := cache.DefaultAccessBoxConfig(l)
|
||||||
|
|
||||||
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgAccessBoxCacheLifetime, cacheCfg.Lifetime)
|
cacheCfg.Lifetime = getLifetime(v, l, cfgAccessBoxCacheLifetime, cacheCfg.Lifetime)
|
||||||
cacheCfg.Size = fetchCacheSize(v, l, cfgAccessBoxCacheSize, cacheCfg.Size)
|
cacheCfg.Size = getSize(v, l, cfgAccessBoxCacheSize, cacheCfg.Size)
|
||||||
|
|
||||||
return cacheCfg
|
return cacheCfg
|
||||||
}
|
}
|
||||||
|
@ -696,11 +780,22 @@ func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
|
||||||
func (a *App) initHandler() {
|
func (a *App) initHandler() {
|
||||||
cfg := &handler.Config{
|
cfg := &handler.Config{
|
||||||
Policy: a.settings.policies,
|
Policy: a.settings.policies,
|
||||||
DefaultMaxAge: fetchDefaultMaxAge(a.cfg, a.log),
|
DefaultMaxAge: handler.DefaultMaxAge,
|
||||||
NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS),
|
NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS),
|
||||||
XMLDecoder: a.settings.xmlDecoder,
|
XMLDecoder: a.settings.xmlDecoder,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if a.cfg.IsSet(cfgDefaultMaxAge) {
|
||||||
|
defaultMaxAge := a.cfg.GetInt(cfgDefaultMaxAge)
|
||||||
|
|
||||||
|
if defaultMaxAge <= 0 && defaultMaxAge != -1 {
|
||||||
|
a.log.Fatal("invalid defaultMaxAge",
|
||||||
|
zap.String("parameter", cfgDefaultMaxAge),
|
||||||
|
zap.String("value in config", strconv.Itoa(defaultMaxAge)))
|
||||||
|
}
|
||||||
|
cfg.DefaultMaxAge = defaultMaxAge
|
||||||
|
}
|
||||||
|
|
||||||
cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketAllow)
|
cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketAllow)
|
||||||
cfg.IsResolveListAllow = len(cfg.ResolveZoneList) > 0
|
cfg.IsResolveListAllow = len(cfg.ResolveZoneList) > 0
|
||||||
if !cfg.IsResolveListAllow {
|
if !cfg.IsResolveListAllow {
|
||||||
|
|
|
@ -10,11 +10,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"github.com/spf13/pflag"
|
"github.com/spf13/pflag"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
@ -159,169 +156,6 @@ var ignore = map[string]struct{}{
|
||||||
cmdVersion: {},
|
cmdVersion: {},
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchConnectTimeout(cfg *viper.Viper) time.Duration {
|
|
||||||
connTimeout := cfg.GetDuration(cfgConnectTimeout)
|
|
||||||
if connTimeout <= 0 {
|
|
||||||
connTimeout = defaultConnectTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
return connTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchStreamTimeout(cfg *viper.Viper) time.Duration {
|
|
||||||
streamTimeout := cfg.GetDuration(cfgStreamTimeout)
|
|
||||||
if streamTimeout <= 0 {
|
|
||||||
streamTimeout = defaultStreamTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
return streamTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchHealthCheckTimeout(cfg *viper.Viper) time.Duration {
|
|
||||||
healthCheckTimeout := cfg.GetDuration(cfgHealthcheckTimeout)
|
|
||||||
if healthCheckTimeout <= 0 {
|
|
||||||
healthCheckTimeout = defaultHealthcheckTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
return healthCheckTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchRebalanceInterval(cfg *viper.Viper) time.Duration {
|
|
||||||
rebalanceInterval := cfg.GetDuration(cfgRebalanceInterval)
|
|
||||||
if rebalanceInterval <= 0 {
|
|
||||||
rebalanceInterval = defaultRebalanceInterval
|
|
||||||
}
|
|
||||||
|
|
||||||
return rebalanceInterval
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchErrorThreshold(cfg *viper.Viper) uint32 {
|
|
||||||
errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
|
|
||||||
if errorThreshold <= 0 {
|
|
||||||
errorThreshold = defaultPoolErrorThreshold
|
|
||||||
}
|
|
||||||
|
|
||||||
return errorThreshold
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchMaxClientsCount(cfg *viper.Viper) int {
|
|
||||||
maxClientsCount := cfg.GetInt(cfgMaxClientsCount)
|
|
||||||
if maxClientsCount <= 0 {
|
|
||||||
maxClientsCount = defaultMaxClientsCount
|
|
||||||
}
|
|
||||||
|
|
||||||
return maxClientsCount
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchMaxClientsDeadline(cfg *viper.Viper) time.Duration {
|
|
||||||
maxClientsDeadline := cfg.GetDuration(cfgMaxClientsDeadline)
|
|
||||||
if maxClientsDeadline <= 0 {
|
|
||||||
maxClientsDeadline = defaultMaxClientsDeadline
|
|
||||||
}
|
|
||||||
|
|
||||||
return maxClientsDeadline
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchDefaultPolicy(cfg *viper.Viper) (netmap.PlacementPolicy, error) {
|
|
||||||
defaultPolicyStr := handler.DefaultPolicy
|
|
||||||
if cfg.IsSet(cfgPolicyDefault) {
|
|
||||||
defaultPolicyStr = cfg.GetString(cfgPolicyDefault)
|
|
||||||
}
|
|
||||||
|
|
||||||
var defaultPlacementPolicy netmap.PlacementPolicy
|
|
||||||
if err := defaultPlacementPolicy.DecodeString(defaultPolicyStr); err != nil {
|
|
||||||
return netmap.PlacementPolicy{}, fmt.Errorf("parse default policy '%s': %w", defaultPolicyStr, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return defaultPlacementPolicy, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchNATSTimeout(cfg *viper.Viper, l *zap.Logger) time.Duration {
|
|
||||||
timeout := cfg.GetDuration(cfgNATSTimeout)
|
|
||||||
if timeout <= 0 {
|
|
||||||
l.Error("invalid lifetime, using default value (in seconds)",
|
|
||||||
zap.String("parameter", cfgNATSTimeout),
|
|
||||||
zap.Duration("value in config", timeout),
|
|
||||||
zap.Duration("default", notifications.DefaultTimeout))
|
|
||||||
timeout = notifications.DefaultTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
return timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
|
|
||||||
if v.IsSet(cfgEntry) {
|
|
||||||
lifetime := v.GetDuration(cfgEntry)
|
|
||||||
if lifetime <= 0 {
|
|
||||||
l.Error("invalid lifetime, using default value (in seconds)",
|
|
||||||
zap.String("parameter", cfgEntry),
|
|
||||||
zap.Duration("value in config", lifetime),
|
|
||||||
zap.Duration("default", defaultValue))
|
|
||||||
} else {
|
|
||||||
return lifetime
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return defaultValue
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
|
|
||||||
if v.IsSet(cfgEntry) {
|
|
||||||
size := v.GetInt(cfgEntry)
|
|
||||||
if size <= 0 {
|
|
||||||
l.Error("invalid cache size, using default value",
|
|
||||||
zap.String("parameter", cfgEntry),
|
|
||||||
zap.Int("value in config", size),
|
|
||||||
zap.Int("default", defaultValue))
|
|
||||||
} else {
|
|
||||||
return size
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return defaultValue
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchDefaultMaxAge(cfg *viper.Viper, l *zap.Logger) int {
|
|
||||||
defaultMaxAge := handler.DefaultMaxAge
|
|
||||||
|
|
||||||
if cfg.IsSet(cfgDefaultMaxAge) {
|
|
||||||
defaultMaxAge = cfg.GetInt(cfgDefaultMaxAge)
|
|
||||||
|
|
||||||
if defaultMaxAge <= 0 && defaultMaxAge != -1 {
|
|
||||||
l.Fatal("invalid defaultMaxAge",
|
|
||||||
zap.String("parameter", cfgDefaultMaxAge),
|
|
||||||
zap.String("value in config", strconv.Itoa(defaultMaxAge)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return defaultMaxAge
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchRegionMappingPolicies(cfg *viper.Viper) (map[string]netmap.PlacementPolicy, error) {
|
|
||||||
regionPolicyMap, err := readRegionMap(cfg.GetString(cfgPolicyRegionMapFile))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("read region map file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap))
|
|
||||||
for region, policy := range regionPolicyMap {
|
|
||||||
var pp netmap.PlacementPolicy
|
|
||||||
if err = pp.DecodeString(policy); err == nil {
|
|
||||||
regionMap[region] = pp
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = pp.UnmarshalJSON([]byte(policy)); err == nil {
|
|
||||||
regionMap[region] = pp
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, fmt.Errorf("parse region '%s' to policy mapping: %w", region, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return regionMap, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchDefaultCopiesNumbers(v *viper.Viper) ([]uint32, error) {
|
func fetchDefaultCopiesNumbers(v *viper.Viper) ([]uint32, error) {
|
||||||
unparsed := v.GetStringSlice(cfgSetCopiesNumber)
|
unparsed := v.GetStringSlice(cfgSetCopiesNumber)
|
||||||
var result []uint32
|
var result []uint32
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -5,7 +5,7 @@ go 1.19
|
||||||
require (
|
require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821090303-202412230a05
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6
|
||||||
github.com/aws/aws-sdk-go v1.44.6
|
github.com/aws/aws-sdk-go v1.44.6
|
||||||
github.com/bluele/gcache v0.0.2
|
github.com/bluele/gcache v0.0.2
|
||||||
github.com/go-chi/chi/v5 v5.0.8
|
github.com/go-chi/chi/v5 v5.0.8
|
||||||
|
|
6
go.sum
6
go.sum
|
@ -44,10 +44,8 @@ git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSV
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821073319-342524159ac3 h1:GBRTOTRrtIvxi2TgxG7z/J7uRXiyb1SxR4247FaYCgU=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6 h1:u6lzNotV6MEMNEG/XeS7g+FjPrrf+j4gnOHtvun2KJc=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821073319-342524159ac3/go.mod h1:t1akKcUH7iBrFHX8rSXScYMP17k2kYQXMbZooiL5Juw=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6/go.mod h1:LI2GOj0pEx0jYTjB3QHja2PNhQFYL2pCm71RAFwDv0M=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821090303-202412230a05 h1:OuViMF54N87FXmaBEpYw3jhzaLrJ/EWOlPL1wUkimE0=
|
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230821090303-202412230a05/go.mod h1:t1akKcUH7iBrFHX8rSXScYMP17k2kYQXMbZooiL5Juw=
|
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
||||||
|
|
|
@ -25,6 +25,8 @@ func IsErrObjectAccessDenied(err error) (string, bool) {
|
||||||
switch err := err.(type) {
|
switch err := err.(type) {
|
||||||
default:
|
default:
|
||||||
return "", false
|
return "", false
|
||||||
|
case apistatus.ObjectAccessDenied:
|
||||||
|
return err.Reason(), true
|
||||||
case *apistatus.ObjectAccessDenied:
|
case *apistatus.ObjectAccessDenied:
|
||||||
return err.Reason(), true
|
return err.Reason(), true
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,9 @@ const (
|
||||||
|
|
||||||
// NewFrostFS creates new FrostFS using provided pool.Pool.
|
// NewFrostFS creates new FrostFS using provided pool.Pool.
|
||||||
func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
|
func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
|
||||||
await := pool.WaitParams{PollInterval: defaultPollInterval, Timeout: defaultPollTimeout}
|
var await pool.WaitParams
|
||||||
|
await.SetPollInterval(defaultPollInterval)
|
||||||
|
await.SetTimeout(defaultPollTimeout)
|
||||||
|
|
||||||
var owner user.ID
|
var owner user.ID
|
||||||
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
||||||
|
@ -91,7 +93,8 @@ func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (u
|
||||||
|
|
||||||
// Container implements frostfs.FrostFS interface method.
|
// Container implements frostfs.FrostFS interface method.
|
||||||
func (x *FrostFS) Container(ctx context.Context, idCnr cid.ID) (*container.Container, error) {
|
func (x *FrostFS) Container(ctx context.Context, idCnr cid.ID) (*container.Container, error) {
|
||||||
prm := pool.PrmContainerGet{ContainerID: idCnr}
|
var prm pool.PrmContainerGet
|
||||||
|
prm.SetContainerID(idCnr)
|
||||||
|
|
||||||
res, err := x.pool.GetContainer(ctx, prm)
|
res, err := x.pool.GetContainer(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -164,7 +167,13 @@ func (x *FrostFS) UserContainers(ctx context.Context, id user.ID) ([]cid.ID, err
|
||||||
|
|
||||||
// SetContainerEACL implements frostfs.FrostFS interface method.
|
// SetContainerEACL implements frostfs.FrostFS interface method.
|
||||||
func (x *FrostFS) SetContainerEACL(ctx context.Context, table eacl.Table, sessionToken *session.Container) error {
|
func (x *FrostFS) SetContainerEACL(ctx context.Context, table eacl.Table, sessionToken *session.Container) error {
|
||||||
prm := pool.PrmContainerSetEACL{Table: table, Session: sessionToken, WaitParams: &x.await}
|
var prm pool.PrmContainerSetEACL
|
||||||
|
prm.SetTable(table)
|
||||||
|
prm.SetWaitParams(x.await)
|
||||||
|
|
||||||
|
if sessionToken != nil {
|
||||||
|
prm.WithinSession(*sessionToken)
|
||||||
|
}
|
||||||
|
|
||||||
err := x.pool.SetEACL(ctx, prm)
|
err := x.pool.SetEACL(ctx, prm)
|
||||||
return handleObjectError("save eACL via connection pool", err)
|
return handleObjectError("save eACL via connection pool", err)
|
||||||
|
@ -185,7 +194,13 @@ func (x *FrostFS) ContainerEACL(ctx context.Context, id cid.ID) (*eacl.Table, er
|
||||||
|
|
||||||
// DeleteContainer implements frostfs.FrostFS interface method.
|
// DeleteContainer implements frostfs.FrostFS interface method.
|
||||||
func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
|
func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
|
||||||
prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await}
|
var prm pool.PrmContainerDelete
|
||||||
|
prm.SetContainerID(id)
|
||||||
|
prm.SetWaitParams(x.await)
|
||||||
|
|
||||||
|
if token != nil {
|
||||||
|
prm.SetSessionToken(*token)
|
||||||
|
}
|
||||||
|
|
||||||
err := x.pool.DeleteContainer(ctx, prm)
|
err := x.pool.DeleteContainer(ctx, prm)
|
||||||
return handleObjectError("delete container via connection pool", err)
|
return handleObjectError("delete container via connection pool", err)
|
||||||
|
|
|
@ -48,14 +48,6 @@ var appMetricsDesc = map[string]map[string]Description{
|
||||||
Help: "Average request duration (in milliseconds) for specific method on node in pool",
|
Help: "Average request duration (in milliseconds) for specific method on node in pool",
|
||||||
VariableLabels: []string{"node", "method"},
|
VariableLabels: []string{"node", "method"},
|
||||||
},
|
},
|
||||||
currentNodesMetric: Description{
|
|
||||||
Type: dto.MetricType_GAUGE,
|
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: poolSubsystem,
|
|
||||||
Name: currentNodesMetric,
|
|
||||||
Help: "Addresses of nodes of the same and highest priority that are currently healthy",
|
|
||||||
VariableLabels: []string{"address"},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
billingSubsystem: {
|
billingSubsystem: {
|
||||||
userRequestsMetric: Description{
|
userRequestsMetric: Description{
|
||||||
|
|
|
@ -15,7 +15,6 @@ const (
|
||||||
overallNodeRequestsMetric = "overall_node_requests"
|
overallNodeRequestsMetric = "overall_node_requests"
|
||||||
currentErrorMetric = "current_errors"
|
currentErrorMetric = "current_errors"
|
||||||
avgRequestDurationMetric = "avg_request_duration"
|
avgRequestDurationMetric = "avg_request_duration"
|
||||||
currentNodesMetric = "current_nodes"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -43,7 +42,6 @@ type poolMetricsCollector struct {
|
||||||
overallNodeRequests *prometheus.GaugeVec
|
overallNodeRequests *prometheus.GaugeVec
|
||||||
currentErrors *prometheus.GaugeVec
|
currentErrors *prometheus.GaugeVec
|
||||||
requestDuration *prometheus.GaugeVec
|
requestDuration *prometheus.GaugeVec
|
||||||
currentNodes *prometheus.GaugeVec
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
|
func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
|
||||||
|
@ -54,7 +52,6 @@ func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
|
||||||
overallNodeRequests: mustNewGaugeVec(appMetricsDesc[poolSubsystem][overallNodeRequestsMetric]),
|
overallNodeRequests: mustNewGaugeVec(appMetricsDesc[poolSubsystem][overallNodeRequestsMetric]),
|
||||||
currentErrors: mustNewGaugeVec(appMetricsDesc[poolSubsystem][currentErrorMetric]),
|
currentErrors: mustNewGaugeVec(appMetricsDesc[poolSubsystem][currentErrorMetric]),
|
||||||
requestDuration: mustNewGaugeVec(appMetricsDesc[poolSubsystem][avgRequestDurationMetric]),
|
requestDuration: mustNewGaugeVec(appMetricsDesc[poolSubsystem][avgRequestDurationMetric]),
|
||||||
currentNodes: mustNewGaugeVec(appMetricsDesc[poolSubsystem][currentNodesMetric]),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,7 +62,6 @@ func (m *poolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
|
||||||
m.overallNodeRequests.Collect(ch)
|
m.overallNodeRequests.Collect(ch)
|
||||||
m.currentErrors.Collect(ch)
|
m.currentErrors.Collect(ch)
|
||||||
m.requestDuration.Collect(ch)
|
m.requestDuration.Collect(ch)
|
||||||
m.currentNodes.Collect(ch)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
|
func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
|
||||||
|
@ -74,7 +70,6 @@ func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
|
||||||
m.overallNodeRequests.Describe(descs)
|
m.overallNodeRequests.Describe(descs)
|
||||||
m.currentErrors.Describe(descs)
|
m.currentErrors.Describe(descs)
|
||||||
m.requestDuration.Describe(descs)
|
m.requestDuration.Describe(descs)
|
||||||
m.currentNodes.Describe(descs)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *poolMetricsCollector) updateStatistic() {
|
func (m *poolMetricsCollector) updateStatistic() {
|
||||||
|
@ -84,7 +79,6 @@ func (m *poolMetricsCollector) updateStatistic() {
|
||||||
m.overallNodeRequests.Reset()
|
m.overallNodeRequests.Reset()
|
||||||
m.currentErrors.Reset()
|
m.currentErrors.Reset()
|
||||||
m.requestDuration.Reset()
|
m.requestDuration.Reset()
|
||||||
m.currentNodes.Reset()
|
|
||||||
|
|
||||||
for _, node := range stat.Nodes() {
|
for _, node := range stat.Nodes() {
|
||||||
m.overallNodeErrors.WithLabelValues(node.Address()).Set(float64(node.OverallErrors()))
|
m.overallNodeErrors.WithLabelValues(node.Address()).Set(float64(node.OverallErrors()))
|
||||||
|
@ -94,10 +88,6 @@ func (m *poolMetricsCollector) updateStatistic() {
|
||||||
m.updateRequestsDuration(node)
|
m.updateRequestsDuration(node)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, addr := range stat.CurrentNodes() {
|
|
||||||
m.currentNodes.WithLabelValues(addr).Set(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
m.overallErrors.Set(float64(stat.OverallErrors()))
|
m.overallErrors.Set(float64(stat.OverallErrors()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue