[#1699] qos: Allow to prohibit operations for IO tag
All checks were successful
Vulncheck / Vulncheck (push) Successful in 1m18s
Pre-commit hooks / Pre-commit (push) Successful in 1m47s
Build / Build Components (push) Successful in 2m24s
Tests and linters / Run gofumpt (push) Successful in 2m29s
Tests and linters / Staticcheck (push) Successful in 3m42s
Tests and linters / Tests (push) Successful in 4m10s
Tests and linters / gopls check (push) Successful in 4m19s
Tests and linters / Lint (push) Successful in 4m56s
Tests and linters / Tests with -race (push) Successful in 5m5s
OCI image / Build container images (push) Successful in 3m57s
All checks were successful
Vulncheck / Vulncheck (push) Successful in 1m18s
Pre-commit hooks / Pre-commit (push) Successful in 1m47s
Build / Build Components (push) Successful in 2m24s
Tests and linters / Run gofumpt (push) Successful in 2m29s
Tests and linters / Staticcheck (push) Successful in 3m42s
Tests and linters / Tests (push) Successful in 4m10s
Tests and linters / gopls check (push) Successful in 4m19s
Tests and linters / Lint (push) Successful in 4m56s
Tests and linters / Tests with -race (push) Successful in 5m5s
OCI image / Build container images (push) Successful in 3m57s
Change-Id: I2bee26885244e241d224860978b6de3526527e96 Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
5a13830a94
commit
a5bae6c5af
7 changed files with 25 additions and 6 deletions
|
@ -168,9 +168,10 @@ func TestEngineSection(t *testing.T) {
|
|||
LimitOps: toPtr(25000),
|
||||
},
|
||||
{
|
||||
Tag: "policer",
|
||||
Weight: toPtr(5),
|
||||
LimitOps: toPtr(25000),
|
||||
Tag: "policer",
|
||||
Weight: toPtr(5),
|
||||
LimitOps: toPtr(25000),
|
||||
Prohibited: true,
|
||||
},
|
||||
})
|
||||
require.ElementsMatch(t, writeLimits.Tags,
|
||||
|
|
|
@ -84,6 +84,7 @@ type IOTagConfig struct {
|
|||
Weight *float64
|
||||
LimitOps *float64
|
||||
ReservedOps *float64
|
||||
Prohibited bool
|
||||
}
|
||||
|
||||
func tags(c *config.Config) []IOTagConfig {
|
||||
|
@ -119,6 +120,13 @@ func tags(c *config.Config) []IOTagConfig {
|
|||
tagConfig.ReservedOps = &r
|
||||
}
|
||||
|
||||
v = c.Value(strconv.Itoa(i) + ".prohibited")
|
||||
if v != nil {
|
||||
r, err := cast.ToBoolE(v)
|
||||
panicOnErr(err)
|
||||
tagConfig.Prohibited = r
|
||||
}
|
||||
|
||||
result = append(result, tagConfig)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -180,6 +180,7 @@ FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_3_LIMIT_OPS=25000
|
|||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_TAG=policer
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_WEIGHT=5
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_LIMIT_OPS=25000
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_READ_TAGS_4_PROHIBITED=true
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_TAG=internal
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_WEIGHT=200
|
||||
FROSTFS_STORAGE_SHARD_0_LIMITS_WRITE_TAGS_0_LIMIT_OPS=0
|
||||
|
|
|
@ -252,7 +252,8 @@
|
|||
{
|
||||
"tag": "policer",
|
||||
"weight": 5,
|
||||
"limit_ops": 25000
|
||||
"limit_ops": 25000,
|
||||
"prohibited": true
|
||||
}
|
||||
]
|
||||
},
|
||||
|
|
|
@ -249,6 +249,7 @@ storage:
|
|||
- tag: policer
|
||||
weight: 5
|
||||
limit_ops: 25000
|
||||
prohibited: true
|
||||
write:
|
||||
max_running_ops: 1000
|
||||
max_waiting_ops: 100
|
||||
|
|
|
@ -359,6 +359,7 @@ limits:
|
|||
| `tag.weight` | `float` | 0 (no weight) | Weight for queries with the specified tag. Weights must be specified for all tags or not specified for any one. |
|
||||
| `tag.limit_ops` | `float` | 0 (no limit) | Operations per second rate limit for queries with the specified tag. |
|
||||
| `tag.reserved_ops` | `float` | 0 (no reserve) | Reserved operations per second rate for queries with the specified tag. |
|
||||
| `tag.prohibited` | `bool` | false | If true, operations with this specified tag will be prohibited. |
|
||||
|
||||
# `node` section
|
||||
|
||||
|
|
|
@ -90,6 +90,7 @@ func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.T
|
|||
if l.ReservedOps != nil && *l.ReservedOps != 0 {
|
||||
v.ReservedIOPS = l.ReservedOps
|
||||
}
|
||||
v.Prohibited = l.Prohibited
|
||||
result[l.Tag] = v
|
||||
}
|
||||
return result
|
||||
|
@ -164,8 +165,7 @@ func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (R
|
|||
rel, err := s.RequestArrival(ctx, tag)
|
||||
stat.inProgress.Add(1)
|
||||
if err != nil {
|
||||
if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
|
||||
errors.Is(err, errSemaphoreLimitExceeded) {
|
||||
if isResourceExhaustedErr(err) {
|
||||
stat.resourceExhausted.Add(1)
|
||||
return nil, &apistatus.ResourceExhausted{}
|
||||
}
|
||||
|
@ -234,3 +234,9 @@ func exportMetrics(metrics Metrics, stats map[string]*stat, shardID, operation s
|
|||
metrics.SetOperationTagCounters(shardID, operation, tag, pending, inProgress, completed, resExh)
|
||||
}
|
||||
}
|
||||
|
||||
func isResourceExhaustedErr(err error) bool {
|
||||
return errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
|
||||
errors.Is(err, errSemaphoreLimitExceeded) ||
|
||||
errors.Is(err, scheduling.ErrTagRequestsProhibited)
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue