forked from TrueCloudLab/frostfs-node
[#1700] config: Move config struct to qos package
Change-Id: Ie642fff5cd1702cda00425628e11f3fd8c514798 Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
3be33b7117
commit
5aaa3df533
9 changed files with 109 additions and 75 deletions
|
@ -385,7 +385,7 @@ func (a *applicationConfiguration) setGCConfig(target *shardCfg, source *shardco
|
|||
}
|
||||
|
||||
func (a *applicationConfiguration) setLimiter(target *shardCfg, source *shardconfig.Config) error {
|
||||
limitsConfig := source.Limits()
|
||||
limitsConfig := source.Limits().ToConfig()
|
||||
limiter, err := qos.NewLimiter(limitsConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -11,10 +11,10 @@ import (
|
|||
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
|
||||
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
||||
gcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/gc"
|
||||
limitsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
|
||||
piloramaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/pilorama"
|
||||
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/writecache"
|
||||
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -135,8 +135,8 @@ func TestEngineSection(t *testing.T) {
|
|||
require.Equal(t, mode.ReadOnly, sc.Mode())
|
||||
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
|
||||
|
||||
readLimits := limits.Read()
|
||||
writeLimits := limits.Write()
|
||||
readLimits := limits.ToConfig().Read
|
||||
writeLimits := limits.ToConfig().Write
|
||||
require.Equal(t, 30*time.Second, readLimits.IdleTimeout)
|
||||
require.Equal(t, int64(10_000), readLimits.MaxRunningOps)
|
||||
require.Equal(t, int64(1_000), readLimits.MaxWaitingOps)
|
||||
|
@ -144,7 +144,7 @@ func TestEngineSection(t *testing.T) {
|
|||
require.Equal(t, int64(1_000), writeLimits.MaxRunningOps)
|
||||
require.Equal(t, int64(100), writeLimits.MaxWaitingOps)
|
||||
require.ElementsMatch(t, readLimits.Tags,
|
||||
[]limitsconfig.IOTagConfig{
|
||||
[]qos.IOTagConfig{
|
||||
{
|
||||
Tag: "internal",
|
||||
Weight: toPtr(20),
|
||||
|
@ -173,9 +173,14 @@ func TestEngineSection(t *testing.T) {
|
|||
LimitOps: toPtr(25000),
|
||||
Prohibited: true,
|
||||
},
|
||||
{
|
||||
Tag: "treesync",
|
||||
Weight: toPtr(5),
|
||||
LimitOps: toPtr(25),
|
||||
},
|
||||
})
|
||||
require.ElementsMatch(t, writeLimits.Tags,
|
||||
[]limitsconfig.IOTagConfig{
|
||||
[]qos.IOTagConfig{
|
||||
{
|
||||
Tag: "internal",
|
||||
Weight: toPtr(200),
|
||||
|
@ -203,6 +208,11 @@ func TestEngineSection(t *testing.T) {
|
|||
Weight: toPtr(50),
|
||||
LimitOps: toPtr(2500),
|
||||
},
|
||||
{
|
||||
Tag: "treesync",
|
||||
Weight: toPtr(50),
|
||||
LimitOps: toPtr(100),
|
||||
},
|
||||
})
|
||||
case 1:
|
||||
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
||||
|
@ -259,14 +269,14 @@ func TestEngineSection(t *testing.T) {
|
|||
require.Equal(t, mode.ReadWrite, sc.Mode())
|
||||
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
|
||||
|
||||
readLimits := limits.Read()
|
||||
writeLimits := limits.Write()
|
||||
require.Equal(t, limitsconfig.DefaultIdleTimeout, readLimits.IdleTimeout)
|
||||
require.Equal(t, limitsconfig.NoLimit, readLimits.MaxRunningOps)
|
||||
require.Equal(t, limitsconfig.NoLimit, readLimits.MaxWaitingOps)
|
||||
require.Equal(t, limitsconfig.DefaultIdleTimeout, writeLimits.IdleTimeout)
|
||||
require.Equal(t, limitsconfig.NoLimit, writeLimits.MaxRunningOps)
|
||||
require.Equal(t, limitsconfig.NoLimit, writeLimits.MaxWaitingOps)
|
||||
readLimits := limits.ToConfig().Read
|
||||
writeLimits := limits.ToConfig().Write
|
||||
require.Equal(t, qos.DefaultIdleTimeout, readLimits.IdleTimeout)
|
||||
require.Equal(t, qos.NoLimit, readLimits.MaxRunningOps)
|
||||
require.Equal(t, qos.NoLimit, readLimits.MaxWaitingOps)
|
||||
require.Equal(t, qos.DefaultIdleTimeout, writeLimits.IdleTimeout)
|
||||
require.Equal(t, qos.NoLimit, writeLimits.MaxRunningOps)
|
||||
require.Equal(t, qos.NoLimit, writeLimits.MaxWaitingOps)
|
||||
require.Equal(t, 0, len(readLimits.Tags))
|
||||
require.Equal(t, 0, len(writeLimits.Tags))
|
||||
}
|
||||
|
|
|
@ -1,19 +1,13 @@
|
|||
package limits
|
||||
|
||||
import (
|
||||
"math"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
|
||||
"github.com/spf13/cast"
|
||||
)
|
||||
|
||||
const (
|
||||
NoLimit int64 = math.MaxInt64
|
||||
DefaultIdleTimeout = 5 * time.Minute
|
||||
)
|
||||
|
||||
// From wraps config section into Config.
|
||||
func From(c *config.Config) *Config {
|
||||
return (*Config)(c)
|
||||
|
@ -23,36 +17,43 @@ func From(c *config.Config) *Config {
|
|||
// which provides access to Shard's limits configurations.
|
||||
type Config config.Config
|
||||
|
||||
// Read returns the value of "read" limits config section.
|
||||
func (x *Config) Read() OpConfig {
|
||||
func (x *Config) ToConfig() qos.LimiterConfig {
|
||||
result := qos.LimiterConfig{
|
||||
Read: x.read(),
|
||||
Write: x.write(),
|
||||
}
|
||||
panicOnErr(result.Validate())
|
||||
return result
|
||||
}
|
||||
|
||||
func (x *Config) read() qos.OpConfig {
|
||||
return x.parse("read")
|
||||
}
|
||||
|
||||
// Write returns the value of "write" limits config section.
|
||||
func (x *Config) Write() OpConfig {
|
||||
func (x *Config) write() qos.OpConfig {
|
||||
return x.parse("write")
|
||||
}
|
||||
|
||||
func (x *Config) parse(sub string) OpConfig {
|
||||
func (x *Config) parse(sub string) qos.OpConfig {
|
||||
c := (*config.Config)(x).Sub(sub)
|
||||
var result OpConfig
|
||||
var result qos.OpConfig
|
||||
|
||||
if s := config.Int(c, "max_waiting_ops"); s > 0 {
|
||||
result.MaxWaitingOps = s
|
||||
} else {
|
||||
result.MaxWaitingOps = NoLimit
|
||||
result.MaxWaitingOps = qos.NoLimit
|
||||
}
|
||||
|
||||
if s := config.Int(c, "max_running_ops"); s > 0 {
|
||||
result.MaxRunningOps = s
|
||||
} else {
|
||||
result.MaxRunningOps = NoLimit
|
||||
result.MaxRunningOps = qos.NoLimit
|
||||
}
|
||||
|
||||
if s := config.DurationSafe(c, "idle_timeout"); s > 0 {
|
||||
result.IdleTimeout = s
|
||||
} else {
|
||||
result.IdleTimeout = DefaultIdleTimeout
|
||||
result.IdleTimeout = qos.DefaultIdleTimeout
|
||||
}
|
||||
|
||||
result.Tags = tags(c)
|
||||
|
@ -60,43 +61,16 @@ func (x *Config) parse(sub string) OpConfig {
|
|||
return result
|
||||
}
|
||||
|
||||
type OpConfig struct {
|
||||
// MaxWaitingOps returns the value of "max_waiting_ops" config parameter.
|
||||
//
|
||||
// Equals NoLimit if the value is not a positive number.
|
||||
MaxWaitingOps int64
|
||||
// MaxRunningOps returns the value of "max_running_ops" config parameter.
|
||||
//
|
||||
// Equals NoLimit if the value is not a positive number.
|
||||
MaxRunningOps int64
|
||||
// IdleTimeout returns the value of "idle_timeout" config parameter.
|
||||
//
|
||||
// Equals DefaultIdleTimeout if the value is not a valid duration.
|
||||
IdleTimeout time.Duration
|
||||
// Tags returns the value of "tags" config parameter.
|
||||
//
|
||||
// Equals nil if the value is not a valid tags config slice.
|
||||
Tags []IOTagConfig
|
||||
}
|
||||
|
||||
type IOTagConfig struct {
|
||||
Tag string
|
||||
Weight *float64
|
||||
LimitOps *float64
|
||||
ReservedOps *float64
|
||||
Prohibited bool
|
||||
}
|
||||
|
||||
func tags(c *config.Config) []IOTagConfig {
|
||||
func tags(c *config.Config) []qos.IOTagConfig {
|
||||
c = c.Sub("tags")
|
||||
var result []IOTagConfig
|
||||
var result []qos.IOTagConfig
|
||||
for i := 0; ; i++ {
|
||||
tag := config.String(c, strconv.Itoa(i)+".tag")
|
||||
if tag == "" {
|
||||
return result
|
||||
}
|
||||
|
||||
var tagConfig IOTagConfig
|
||||
var tagConfig qos.IOTagConfig
|
||||
tagConfig.Tag = tag
|
||||
|
||||
v := c.Value(strconv.Itoa(i) + ".weight")
|
||||
|
|
|
@ -181,6 +181,9 @@ FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_TAG=policer
|
|||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_WEIGHT=5
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_LIMIT_OPS=25000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_PROHIBITED=true
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_5_TAG=treesync
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_5_WEIGHT=5
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_5_LIMIT_OPS=25
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_TAG=internal
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_WEIGHT=200
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_LIMIT_OPS=0
|
||||
|
@ -198,6 +201,9 @@ FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_3_LIMIT_OPS=2500
|
|||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_TAG=policer
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_WEIGHT=50
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_4_LIMIT_OPS=2500
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_5_TAG=treesync
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_5_WEIGHT=50
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_5_LIMIT_OPS=100
|
||||
|
||||
## 1 shard
|
||||
### Flag to refill Metabase from BlobStor
|
||||
|
|
|
@ -254,6 +254,11 @@
|
|||
"weight": 5,
|
||||
"limit_ops": 25000,
|
||||
"prohibited": true
|
||||
},
|
||||
{
|
||||
"tag": "treesync",
|
||||
"weight": 5,
|
||||
"limit_ops": 25
|
||||
}
|
||||
]
|
||||
},
|
||||
|
@ -288,6 +293,11 @@
|
|||
"tag": "policer",
|
||||
"weight": 50,
|
||||
"limit_ops": 2500
|
||||
},
|
||||
{
|
||||
"tag": "treesync",
|
||||
"weight": 50,
|
||||
"limit_ops": 100
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -253,6 +253,9 @@ storage:
|
|||
weight: 5
|
||||
limit_ops: 25000
|
||||
prohibited: true
|
||||
- tag: treesync
|
||||
weight: 5
|
||||
limit_ops: 25
|
||||
write:
|
||||
max_running_ops: 1000
|
||||
max_waiting_ops: 100
|
||||
|
@ -275,6 +278,9 @@ storage:
|
|||
- tag: policer
|
||||
weight: 50
|
||||
limit_ops: 2500
|
||||
- tag: treesync
|
||||
weight: 50
|
||||
limit_ops: 100
|
||||
|
||||
1:
|
||||
writecache:
|
||||
|
|
31
internal/qos/config.go
Normal file
31
internal/qos/config.go
Normal file
|
@ -0,0 +1,31 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
NoLimit int64 = math.MaxInt64
|
||||
DefaultIdleTimeout = 5 * time.Minute
|
||||
)
|
||||
|
||||
type LimiterConfig struct {
|
||||
Read OpConfig
|
||||
Write OpConfig
|
||||
}
|
||||
|
||||
type OpConfig struct {
|
||||
MaxWaitingOps int64
|
||||
MaxRunningOps int64
|
||||
IdleTimeout time.Duration
|
||||
Tags []IOTagConfig
|
||||
}
|
||||
|
||||
type IOTagConfig struct {
|
||||
Tag string
|
||||
Weight *float64
|
||||
LimitOps *float64
|
||||
ReservedOps *float64
|
||||
Prohibited bool
|
||||
}
|
|
@ -8,7 +8,6 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
|
@ -37,15 +36,15 @@ type scheduler interface {
|
|||
Close()
|
||||
}
|
||||
|
||||
func NewLimiter(c *limits.Config) (Limiter, error) {
|
||||
if err := validateConfig(c); err != nil {
|
||||
func NewLimiter(c LimiterConfig) (Limiter, error) {
|
||||
if err := c.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
readScheduler, err := createScheduler(c.Read())
|
||||
readScheduler, err := createScheduler(c.Read)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create read scheduler: %w", err)
|
||||
}
|
||||
writeScheduler, err := createScheduler(c.Write())
|
||||
writeScheduler, err := createScheduler(c.Write)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create write scheduler: %w", err)
|
||||
}
|
||||
|
@ -63,8 +62,8 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
|
|||
return l, nil
|
||||
}
|
||||
|
||||
func createScheduler(config limits.OpConfig) (scheduler, error) {
|
||||
if len(config.Tags) == 0 && config.MaxWaitingOps == limits.NoLimit {
|
||||
func createScheduler(config OpConfig) (scheduler, error) {
|
||||
if len(config.Tags) == 0 && config.MaxWaitingOps == NoLimit {
|
||||
return newSemaphoreScheduler(config.MaxRunningOps), nil
|
||||
}
|
||||
return scheduling.NewMClock(
|
||||
|
@ -72,7 +71,7 @@ func createScheduler(config limits.OpConfig) (scheduler, error) {
|
|||
converToSchedulingTags(config.Tags), config.IdleTimeout)
|
||||
}
|
||||
|
||||
func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
|
||||
func converToSchedulingTags(limits []IOTagConfig) map[string]scheduling.TagInfo {
|
||||
result := make(map[string]scheduling.TagInfo)
|
||||
for _, tag := range []IOTag{IOTagBackground, IOTagClient, IOTagInternal, IOTagPolicer, IOTagTreeSync, IOTagWritecache} {
|
||||
result[tag.String()] = scheduling.TagInfo{
|
||||
|
|
|
@ -4,8 +4,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
|
||||
)
|
||||
|
||||
var errWeightsMustBeSpecified = errors.New("invalid weights: weights must be specified for all tags or not specified for any")
|
||||
|
@ -14,17 +12,17 @@ type tagConfig struct {
|
|||
Shares, Limit, Reserved *float64
|
||||
}
|
||||
|
||||
func validateConfig(c *limits.Config) error {
|
||||
if err := validateOpConfig(c.Read()); err != nil {
|
||||
func (c *LimiterConfig) Validate() error {
|
||||
if err := validateOpConfig(c.Read); err != nil {
|
||||
return fmt.Errorf("limits 'read' section validation error: %w", err)
|
||||
}
|
||||
if err := validateOpConfig(c.Write()); err != nil {
|
||||
if err := validateOpConfig(c.Write); err != nil {
|
||||
return fmt.Errorf("limits 'write' section validation error: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateOpConfig(c limits.OpConfig) error {
|
||||
func validateOpConfig(c OpConfig) error {
|
||||
if c.MaxRunningOps <= 0 {
|
||||
return fmt.Errorf("invalid 'max_running_ops = %d': must be greater than zero", c.MaxRunningOps)
|
||||
}
|
||||
|
@ -40,7 +38,7 @@ func validateOpConfig(c limits.OpConfig) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func validateTags(configTags []limits.IOTagConfig) error {
|
||||
func validateTags(configTags []IOTagConfig) error {
|
||||
tags := map[IOTag]tagConfig{
|
||||
IOTagBackground: {},
|
||||
IOTagClient: {},
|
||||
|
|
Loading…
Add table
Reference in a new issue