Dmitrii Stepanov af5b3575d0
All checks were successful
DCO action / DCO (pull_request) Successful in 55s
Vulncheck / Vulncheck (pull_request) Successful in 1m27s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m10s
Build / Build Components (pull_request) Successful in 2m14s
Tests and linters / gopls check (pull_request) Successful in 4m56s
Tests and linters / Run gofumpt (pull_request) Successful in 4m59s
Tests and linters / Lint (pull_request) Successful in 5m14s
Tests and linters / Staticcheck (pull_request) Successful in 5m24s
Tests and linters / Tests (pull_request) Successful in 5m43s
Tests and linters / Tests with -race (pull_request) Successful in 7m42s
Vulncheck / Vulncheck (push) Successful in 1m5s
Pre-commit hooks / Pre-commit (push) Successful in 1m32s
Build / Build Components (push) Successful in 3m57s
Tests and linters / gopls check (push) Successful in 3m49s
OCI image / Build container images (push) Successful in 5m45s
Tests and linters / Run gofumpt (push) Successful in 5m46s
Tests and linters / Tests (push) Successful in 5m52s
Tests and linters / Lint (push) Successful in 6m21s
Tests and linters / Staticcheck (push) Successful in 6m27s
Tests and linters / Tests with -race (push) Successful in 11m10s
[#1690] qos: Do not export zero metrics counters
Signed-off-by: Dmitrii Stepanov <>
2025-03-20 14:42:35 +03:00

236 lines
5.8 KiB

package qos
import (
apistatus ""
const (
defaultIdleTimeout time.Duration = 0
defaultShare float64 = 1.0
minusOne = ^uint64(0)
defaultMetricsCollectTimeout = 5 * time.Second
type ReleaseFunc scheduling.ReleaseFunc
type Limiter interface {
ReadRequest(context.Context) (ReleaseFunc, error)
WriteRequest(context.Context) (ReleaseFunc, error)
type scheduler interface {
RequestArrival(ctx context.Context, tag string) (scheduling.ReleaseFunc, error)
func NewLimiter(c *limits.Config) (Limiter, error) {
if err := validateConfig(c); err != nil {
return nil, err
readScheduler, err := createScheduler(c.Read())
if err != nil {
return nil, fmt.Errorf("create read scheduler: %w", err)
writeScheduler, err := createScheduler(c.Write())
if err != nil {
return nil, fmt.Errorf("create write scheduler: %w", err)
l := &mClockLimiter{
readScheduler: readScheduler,
writeScheduler: writeScheduler,
closeCh: make(chan struct{}),
wg: &sync.WaitGroup{},
readStats: createStats(),
writeStats: createStats(),
l.metrics.Store(&metricsHolder{metrics: &noopMetrics{}})
return l, nil
func createScheduler(config limits.OpConfig) (scheduler, error) {
if len(config.Tags) == 0 && config.MaxWaitingOps == limits.NoLimit {
return newSemaphoreScheduler(config.MaxRunningOps), nil
return scheduling.NewMClock(
uint64(config.MaxRunningOps), uint64(config.MaxWaitingOps),
converToSchedulingTags(config.Tags), config.IdleTimeout)
func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
result := make(map[string]scheduling.TagInfo)
for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} {
result[tag.String()] = scheduling.TagInfo{
Share: defaultShare,
for _, l := range limits {
v := result[l.Tag]
if l.Weight != nil && *l.Weight != 0 {
v.Share = *l.Weight
if l.LimitOps != nil && *l.LimitOps != 0 {
v.LimitIOPS = l.LimitOps
if l.ReservedOps != nil && *l.ReservedOps != 0 {
v.ReservedIOPS = l.ReservedOps
result[l.Tag] = v
return result
var (
_ Limiter = (*noopLimiter)(nil)
releaseStub ReleaseFunc = func() {}
noopLimiterInstance = &noopLimiter{}
func NewNoopLimiter() Limiter {
return noopLimiterInstance
type noopLimiter struct{}
func (n *noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) {
return releaseStub, nil
func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
return releaseStub, nil
func (n *noopLimiter) SetParentID(string) {}
func (n *noopLimiter) Close() {}
func (n *noopLimiter) SetMetrics(Metrics) {}
var _ Limiter = (*mClockLimiter)(nil)
type shardID struct {
id string
type mClockLimiter struct {
readScheduler scheduler
writeScheduler scheduler
readStats map[string]*stat
writeStats map[string]*stat
shardID atomic.Pointer[shardID]
metrics atomic.Pointer[metricsHolder]
closeCh chan struct{}
wg *sync.WaitGroup
func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
return requestArrival(ctx, n.readScheduler, n.readStats)
func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
return requestArrival(ctx, n.writeScheduler, n.writeStats)
func requestArrival(ctx context.Context, s scheduler, stats map[string]*stat) (ReleaseFunc, error) {
tag, ok := tagging.IOTagFromContext(ctx)
if !ok {
tag = IOTagClient.String()
stat := getStat(tag, stats)
if tag == IOTagCritical.String() {
return func() {
}, nil
rel, err := s.RequestArrival(ctx, tag)
if err != nil {
if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
errors.Is(err, errSemaphoreLimitExceeded) {
return nil, &apistatus.ResourceExhausted{}
return nil, err
return func() {
}, nil
func (n *mClockLimiter) Close() {
func (n *mClockLimiter) SetParentID(parentID string) {
n.shardID.Store(&shardID{id: parentID})
func (n *mClockLimiter) SetMetrics(m Metrics) {
n.metrics.Store(&metricsHolder{metrics: m})
func (n *mClockLimiter) startMetricsCollect() {
go func() {
defer n.wg.Done()
ticker := time.NewTicker(defaultMetricsCollectTimeout)
defer ticker.Stop()
for {
select {
case <-n.closeCh:
case <-ticker.C:
shardID := n.shardID.Load().id
if shardID == "" {
metrics := n.metrics.Load().metrics
exportMetrics(metrics, n.readStats, shardID, "read")
exportMetrics(metrics, n.writeStats, shardID, "write")
func exportMetrics(metrics Metrics, stats map[string]*stat, shardID, operation string) {
var pending uint64
var inProgress uint64
var completed uint64
var resExh uint64
for tag, s := range stats {
pending = s.pending.Load()
inProgress = s.inProgress.Load()
completed = s.completed.Load()
resExh = s.resourceExhausted.Load()
if pending == 0 && inProgress == 0 && completed == 0 && resExh == 0 {
metrics.SetOperationTagCounters(shardID, operation, tag, pending, inProgress, completed, resExh)