mclock: Initial implementation #1

Merged
dstepanov-yadro merged 10 commits from dstepanov-yadro/frostfs-qos:feat/mclock into master 2025-01-28 11:27:24 +00:00
11 changed files with 1379 additions and 9 deletions


@@ -23,11 +23,6 @@ repos:
     hooks:
       - id: shellcheck
-  - repo: https://github.com/golangci/golangci-lint
-    rev: v1.59.1
-    hooks:
-      - id: golangci-lint
   - repo: local
     hooks:
       - id: go-unit-tests
@@ -36,3 +31,9 @@ repos:
         pass_filenames: false
         types: [go]
         language: system
+      - id: golangci-lint
+        name: golangci-lint check
+        entry: make lint
+        pass_filenames: false
+        types: [go]
+        language: system


@@ -56,7 +56,7 @@ pre-commit-run:
 # Install linters
 lint-install:
 	@rm -rf $(LINT_DIR)
-	@mkdir $(LINT_DIR)
+	@mkdir -p $(LINT_DIR)
 	@CGO_ENABLED=1 GOBIN=$(LINT_VERSION_DIR) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
 
 # Run linters
@@ -69,7 +69,7 @@ lint:
 # Install staticcheck
 staticcheck-install:
 	@rm -rf $(STATICCHECK_DIR)
-	@mkdir $(STATICCHECK_DIR)
+	@mkdir -p $(STATICCHECK_DIR)
 	@GOBIN=$(STATICCHECK_VERSION_DIR) go install honnef.co/go/tools/cmd/staticcheck@$(STATICCHECK_VERSION)
 
 # Run staticcheck
@@ -82,7 +82,7 @@ staticcheck-run:
 # Install gopls
 gopls-install:
 	@rm -rf $(GOPLS_DIR)
-	@mkdir $(GOPLS_DIR)
+	@mkdir -p $(GOPLS_DIR)
 	@GOBIN=$(GOPLS_VERSION_DIR) go install golang.org/x/tools/gopls@$(GOPLS_VERSION)
 
 # Run gopls
@@ -100,7 +100,7 @@ gopls-run:
 # Install gofumpt
 fumpt-install:
 	@rm -rf $(GOFUMPT_DIR)
-	@mkdir $(GOFUMPT_DIR)
+	@mkdir -p $(GOFUMPT_DIR)
 	@GOBIN=$(GOFUMPT_VERSION_DIR) go install mvdan.cc/gofumpt@$(GOFUMPT_VERSION)
 
 # Run gofumpt

go.mod (+11)

@@ -1,3 +1,14 @@
 module git.frostfs.info/TrueCloudLab/frostfs-qos
 
 go 1.22
+
+require (
+	github.com/stretchr/testify v1.9.0
+	golang.org/x/sync v0.10.0
+)
+
+require (
+	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+)

go.sum (new file, +12)

@@ -0,0 +1,12 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/assert/cond.go (new file, +9)

@@ -0,0 +1,9 @@
package assert

import "strings"

func Cond(cond bool, details ...string) {
	if !cond {
		panic(strings.Join(details, " "))
	}
}
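For clarity, a short usage sketch of this helper (the call sites are hypothetical; since the package is internal, it is only importable from within this module):

```go
package scheduling

import "git.frostfs.info/TrueCloudLab/frostfs-qos/internal/assert"

// exampleAssert is illustrative only, not part of the PR.
func exampleAssert(inProgress uint64, tag string, known bool) {
	// Condition holds: Cond is a no-op.
	assert.Cond(inProgress > 0, "invalid requests count")

	// Condition violated: Cond panics with the details joined by spaces,
	// e.g. "unknown tag: class1".
	assert.Cond(known, "unknown tag:", tag)
}
```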

scheduling/clock.go (new file, +84)

@@ -0,0 +1,84 @@
package scheduling

import (
	"sync"
	"time"
)

type clock interface {
	now() float64
	runAt(ts float64, f func())
	close()
}

type scheduleInfo struct {
	ts float64
	f  func()
}

type systemClock struct {
	since    time.Time
	schedule chan scheduleInfo
	wg       sync.WaitGroup
}

func newSystemClock() *systemClock {
	c := &systemClock{
		since:    time.Now(),
		schedule: make(chan scheduleInfo),
	}
	c.start()
	return c
}

func (c *systemClock) now() float64 {
	return time.Since(c.since).Seconds()
}

func (c *systemClock) runAt(ts float64, f func()) {
	select {
	case c.schedule <- scheduleInfo{ts: ts, f: f}:
	default: // timer fired, scheduleRequest will call runAt again
	}
}

func (c *systemClock) close() {
	close(c.schedule)
	c.wg.Wait()
}

func (c *systemClock) start() {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		t := time.NewTimer(0)
		<-t.C
		var f func()
		for {
			select {
			case <-t.C:
				if f != nil {
					f()
					f = nil
				}
			case s, ok := <-c.schedule:
				if !ok {
					return
				}
				var d time.Duration
				now := c.now()
				if now < s.ts {
					d = time.Duration((s.ts - now) * 1e9)
				}
				if !t.Stop() {
					select {
					case <-t.C:
					default:
					}
				}
				t.Reset(d)
				f = s.f
			}
		}
	}()
}
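The `default` branch in `runAt` is what prevents the deadlock found during review (see the thread in mclock.go below): if the clock goroutine is busy firing the timer, the send to the unbuffered `schedule` channel is skipped instead of blocking a caller that holds the scheduler mutex, and the fired callback re-enters `scheduleRequest`, which calls `runAt` again. A standalone sketch of this non-blocking send pattern (names invented, not the PR's code):

```go
package main

import "fmt"

// tryNotify performs a non-blocking send: if nobody is receiving right now,
// the notification is dropped instead of blocking the caller, which may
// hold a lock the receiver itself needs to make progress.
func tryNotify(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false // receiver busy; the caller is expected to retry later
	}
}

func main() {
	ch := make(chan string) // unbuffered, like systemClock.schedule
	fmt.Println(tryNotify(ch, "wake up")) // false: no receiver is ready yet

	done := make(chan struct{})
	go func() {
		fmt.Println(<-ch)
		close(done)
	}()
	for !tryNotify(ch, "wake up") {
		// busy retry stands in for scheduleRequest calling runAt again
	}
	<-done
}
```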

scheduling/mclock.go (new file, +399)

@@ -0,0 +1,399 @@
package scheduling
import (
"container/heap"
"context"
"errors"
"math"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-qos/internal/assert"
)
const (
invalidIndex = -1
undefinedReservation float64 = -1.0
)
var (
ErrMClockSchedulerClosed = errors.New("mClock scheduler is closed")
ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded")
ErrMClockSchedulerUnknownTag = errors.New("unknown tag")
ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
)
type request struct {
tag string
ts float64
reservation float64
limit float64
shares float64
reservationIdx int
limitIdx int
sharesIdx int
readyIdx int
scheduled chan struct{}
canceled chan struct{}
}
// ReleaseFunc is the type of function that must be called after the request is completed.
type ReleaseFunc func()
// TagInfo contains reserved IOPS, IOPS limit and share values for a tag.
type TagInfo struct {
ReservedIOPS *float64
LimitIOPS *float64
Share float64
}
// MClock is an implementation of the mClock scheduling algorithm.
//
// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details.
type MClock struct {
runLimit uint64
fyrchik marked this conversation as resolved (outdated)

`ReleaseFunc`?

Done
waitLimit int
clock clock
idleTimeout float64
tagInfo map[string]TagInfo
mtx sync.Mutex
previous map[string]*request
aarifullin marked this conversation as resolved (outdated)

`realization` -> `implementation`? Realize is rather about plans in general :)

fixed
inProgress uint64
timeBasedScheduleTs float64
reservationQueue *queue
limitQueue *queue
sharesQueue *queue
readyQueue *queue
closed bool
}
// NewMClock creates a new MClock scheduler instance with
// runLimit maximum allowed count of running requests and
// waitLimit maximum allowed count of waiting requests
// for tags specified by tagInfo. The value of idleTimeout defines
// the difference between the current time and the time of
// the previous request, at which the tag is considered idle.
// If idleTimeout is negative, tags are never considered idle.
// If waitLimit equals zero, there is no limit on the
// number of waiting requests.
func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout time.Duration) (*MClock, error) {
if err := validateParams(runLimit, tagInfo); err != nil {
return nil, err
}
result := &MClock{
runLimit: runLimit,
waitLimit: int(waitLimit),
clock: newSystemClock(),
idleTimeout: idleTimeout.Seconds(),
tagInfo: tagInfo,
reservationQueue: &queue{},
limitQueue: &queue{},
sharesQueue: &queue{},
readyQueue: &queue{},
timeBasedScheduleTs: math.MaxFloat64,
}
previous := make(map[string]*request)
for tag := range tagInfo {
previous[tag] = &request{
tag: tag,
reservationIdx: invalidIndex,
limitIdx: invalidIndex,
sharesIdx: invalidIndex,
}
fyrchik marked this conversation as resolved (outdated)

`make([]queueItem, 0)` looks like an anti-pattern to me, why don't we use `nil` and omit this line?

Done
}
result.previous = previous
return result, nil
}
// RequestArrival schedules a new request with the given tag value.
// The call blocks until one of the following events occurs:
// the request with the tag is scheduled for execution,
// the context ctx is canceled, or the scheduler is closed.
fyrchik marked this conversation as resolved (outdated)

Probably `s/should/MUST/`? Checking my understanding: if it is not called, the space is not released and the queue is eventually saturated. However, `Close()` will release all requests, without surprises. Right?

If it is not called, the count of in-progress requests will not be decreased and the queue is eventually saturated. `Close()` will release all requests, without surprises.
// If the method call returned non-nil ReleaseFunc,
// then it must be called after the request is completed.
func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) {
req, release, err := q.pushRequest(tag)
if err != nil {
return nil, err
}
select {
case <-ctx.Done():
q.dropRequest(req)
return nil, ctx.Err()
case <-req.scheduled:
return release, nil
case <-req.canceled:
return nil, ErrMClockSchedulerClosed
}
}
// Close closes MClock scheduler.
// No new requests for scheduling will be accepted after the closing.
func (q *MClock) Close() {
q.mtx.Lock()
defer q.mtx.Unlock()
q.closed = true
q.clock.close()
for q.limitQueue.Len() > 0 {
item := heap.Pop(q.limitQueue).(*limitMQueueItem)
close(item.r.canceled)
q.removeFromQueues(item.r)
}

clocses -> closes

done
}
func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
if runLimit == 0 {
return ErrInvalidRunLimit
}
for _, v := range tagInfo {
if v.LimitIOPS != nil && (math.IsNaN(*v.LimitIOPS) || *v.LimitIOPS <= float64(0)) {
return ErrInvalidTagInfo
}
if v.ReservedIOPS != nil && (math.IsNaN(*v.ReservedIOPS) || *v.ReservedIOPS <= float64(0)) {
return ErrInvalidTagInfo
}
if math.IsNaN(v.Share) || v.Share <= float64(0) {
return ErrInvalidTagInfo
}
}
return nil
}
func (q *MClock) dropRequest(req *request) {
q.mtx.Lock()
defer q.mtx.Unlock()
select {
case <-req.scheduled:
assert.Cond(q.inProgress > 0, "invalid requests count")
q.inProgress--
default:

Hm, is `float64` obligatory here?

Also, is there any reason we use floats instead of ints?

Algorithm uses division, and the weights can be arbitrary. That's why I chose float64.

I thought about it too

> Algorithm uses division

Your implementation uses *seconds* (float64 in Go) as a time unit. I think *nanoseconds* (int64 in Go) could be used too with some changes in the algorithm

For example, take the following formula

```math
R_i^r = \max\left(R_i^{r-1} + \frac{1}{r_i},\ \mathrm{now}()\right)
```

Let's add units for each value

```math
R_i^r\ \mathrm{[s]} = \max\left(R_i^{r-1}\ \mathrm{[s]} + \frac{1}{r_i\ \mathrm{[IOPS]}},\ \mathrm{now}()\ \mathrm{[s]}\right)
```

As I understand, $1\ \mathrm{IOPS} = 1\ \mathrm{s}^{-1}$, so

```math
r_i = 1\ \mathrm{IOPS} = 1\ \mathrm{s}^{-1} = 1 \cdot 10^9\ \mathrm{ns}^{-1}
```

```math
R_i^r\ \mathrm{[s]} = \max\left(R_i^{r-1}\ \mathrm{[s]} + \frac{1}{r_i\ \mathrm{[s^{-1}]}},\ \mathrm{now}()\ \mathrm{[s]}\right)
```

Then rewrite all formulas

```math
R_i^r\ \mathrm{[ns]} = \max\left(R_i^{r-1}\ \mathrm{[ns]} + \frac{10^9}{r_i\ \mathrm{[ns^{-1}]}},\ \mathrm{now}()\ \mathrm{[ns]}\right)
```

```math
L_i^r\ \mathrm{[ns]} = \max\left(L_i^{r-1}\ \mathrm{[ns]} + \frac{10^9}{l_i\ \mathrm{[ns^{-1}]}},\ \mathrm{now}()\ \mathrm{[ns]}\right)
```

```math
P_i^r\ \mathrm{[ns]} = \max\left(P_i^{r-1}\ \mathrm{[ns]} + \frac{10^9\ \mathrm{[ns]}}{w_i},\ \mathrm{now}()\ \mathrm{[ns]}\right)
```

What do you think?

As for me, using float64 shouldn't lead to any problems, and I personally find it more comfortable working with seconds than with nanoseconds. But also I changed units for `idleTimeout` and names of `TagInfo` fields.
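As an editorial illustration of the two bookkeeping styles discussed in this thread, here is a hedged sketch with invented names (the PR itself implements only the float64-seconds variant):

```go
package main

import "fmt"

// Seconds as float64, as the PR implements it:
// next = max(prev + 1/rate, now), with rate in IOPS (s^-1).
func nextTagSeconds(prev, rateIOPS, now float64) float64 {
	return max(prev+1.0/rateIOPS, now)
}

// Nanoseconds as int64, as the reviewer proposes:
// next = max(prev + 1e9/rate, now), with timestamps in ns.
func nextTagNanos(prev int64, rateIOPS float64, now int64) int64 {
	step := int64(1e9 / rateIOPS) // 1 IOPS == one request per 1e9 ns
	return max(prev+step, now)
}

func main() {
	fmt.Println(nextTagSeconds(0.5, 100, 0.4))               // 0.51
	fmt.Println(nextTagNanos(500_000_000, 100, 400_000_000)) // 510000000
}
```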
}
q.removeFromQueues(req)
}
func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
fyrchik marked this conversation as resolved (outdated)

`NaN` will not trigger this condition, though it seems like it should.

Fixed
q.mtx.Lock()
defer q.mtx.Unlock()
if q.closed {
return nil, nil, ErrMClockSchedulerClosed
}
if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit {
return nil, nil, ErrMClockSchedulerRequestLimitExceeded
}
now := q.clock.now()
tagInfo, ok := q.tagInfo[tag]
if !ok {
return nil, nil, ErrMClockSchedulerUnknownTag
}
prev, ok := q.previous[tag]
assert.Cond(ok, "undefined previous:", tag)
if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout
q.adjustTags(now, tag)
}
r := &request{
tag: tag,
ts: now,
shares: max(prev.shares+1.0/tagInfo.Share, now),
reservationIdx: invalidIndex,
limitIdx: invalidIndex,
sharesIdx: invalidIndex,
readyIdx: invalidIndex,
scheduled: make(chan struct{}),
canceled: make(chan struct{}),
}
aarifullin marked this conversation as resolved (outdated)

I suppose we should add some value validation for tags as we expect that `*tagInfo.Reservation != 0`

Added validation and unit tests
if tagInfo.ReservedIOPS != nil {
r.reservation = max(prev.reservation + 1.0 / *tagInfo.ReservedIOPS, now)
} else {
r.reservation = undefinedReservation
}
if tagInfo.LimitIOPS != nil {
r.limit = max(prev.limit + 1.0 / *tagInfo.LimitIOPS, now)
} else {
r.limit = max(prev.limit, now)
}
q.previous[tag] = r
if tagInfo.ReservedIOPS != nil {
heap.Push(q.reservationQueue, &reservationMQueueItem{r: r})
}
heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
heap.Push(q.limitQueue, &limitMQueueItem{r: r})
q.scheduleRequestUnsafe()
return r, q.requestCompleted, nil
}
func (q *MClock) adjustTags(now float64, idleTag string) {
if q.sharesQueue.Len() == 0 {
return
}
minShare := q.sharesQueue.items[0].ts()
for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed
limitItem := item.(*limitMQueueItem)
if limitItem.r.tag == idleTag {
continue
fyrchik marked this conversation as resolved (outdated)

Why do you have brackets around `item`?

Fixed
}
limitItem.r.shares -= (minShare - now)
if limitItem.r.sharesIdx != invalidIndex {
heap.Fix(q.sharesQueue, limitItem.r.sharesIdx)
}
if limitItem.r.readyIdx != invalidIndex {
heap.Fix(q.readyQueue, limitItem.r.readyIdx)
}
}
}
func (q *MClock) scheduleRequest() {
q.mtx.Lock()
defer q.mtx.Unlock()
fyrchik marked this conversation as resolved (outdated)

IMO making 2 separate functions would be more readable (i.e. `schedule() { lock(); defer unlock(); scheduleUnsafe() }` and `scheduleUnsafe()`), because it is not obvious what this bool value means without looking at the function definition.

Done
q.scheduleRequestUnsafe()
}
func (q *MClock) scheduleRequestUnsafe() {
if q.inProgress >= q.runLimit {
return
}
now := q.clock.now()
q.scheduleByReservation(now)
if q.inProgress >= q.runLimit {
return
}
q.scheduleByLimitAndWeight(now)
if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
return
}
q.setNextScheduleTimer(now)
}
func (q *MClock) setNextScheduleTimer(now float64) {
nextTs := math.MaxFloat64
if q.reservationQueue.Len() > 0 {
nextTs = q.reservationQueue.items[0].ts()
}
if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
nextTs = q.limitQueue.items[0].ts()
}
a-savchuk marked this conversation as resolved (outdated)

When can `inProgress` be greater than `runLimit`?

These values can be equal. The greater-than check is just foolproofing.
if nextTs <= now {
// should not happen as we always compare .ts() <= now
return
}
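// nextTs is closer than the wake-up currently scheduled, so overwrite the timer (see the resolved review thread below).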
if q.timeBasedScheduleTs > nextTs {
q.clock.runAt(nextTs, func() {
q.scheduleRequest()

There is a possible call-chain `runAt()` -> `scheduleRequest()` -> `setNextScheduleTimer()` -> `runAt()`, where the last `runAt` call blocks on a non-buffered `schedule` channel. Why does this not happen? Can we make it more obvious?

Thank you very much! This is really a bug!

```
scheduleRequest(lock taken)                              ||
         ||                                              ||
         ||                       time fired, scheduleRequest called
         ||                                              ||
runAt() called, push to channel                          ||
         ||                                              ||
                          deadlock
```

Fixed:

1. `systemClock`'s timer is created stopped
2. `scheduleRequest` is called only when the timer fires inside `systemClock`
3. `runAt` has a `default` branch, so if the timer fired and another `scheduleRequest` call raced with it, we just skip the push to the `schedule` channel, because another `runAt` call will be performed by `scheduleRequest`
})
q.timeBasedScheduleTs = nextTs
}
}
func (q *MClock) scheduleByLimitAndWeight(now float64) {
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
}
for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 {

Just a comment, not expecting any changes in this PR: This loop runs under the `q.mtx` lock, so arriving requests are blocked by this processing routine. Not saying that this is a problem, but it could become one in theory (latency spikes or slow degradation). We have limited queue size, though. And new requests can be dropped, so the damage is restricted.
next := heap.Pop(q.readyQueue).(*readyMQueueItem)
hadReservation := false
if next.r.reservationIdx != invalidIndex {
hadReservation = true
achuprov marked this conversation as resolved (outdated)

What does this condition mean?

Good question! It was an error. It must be right so: `if q.lastSchedule > nextTs`. This means that nextTs is closer than already scheduled, so we must overwrite it.
heap.Remove(q.reservationQueue, next.r.reservationIdx)
}
q.removeFromQueues(next.r)
tagInfo, ok := q.tagInfo[next.r.tag]
assert.Cond(ok, "unknown tag:", next.r.tag)
if tagInfo.ReservedIOPS != nil && hadReservation {
var updated bool
for _, i := range q.reservationQueue.items {
ri := i.(*reservationMQueueItem)
if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation {
ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS
updated = true
}
}
fyrchik marked this conversation as resolved (outdated)

Have you considered using `heap.Fix(q.reservationQueue, i)` here instead of `heap.Init()` after the loop?

I've been thinking about it. But iterating through the elements and changing the queue at the same time can be difficult to understand.

Oh, yes, missed that it can be done multiple times.
if updated {
heap.Init(q.reservationQueue)
}
}
select {
case <-next.r.canceled:
continue
default:
}
assertIndexInvalid(next.r)
q.inProgress++
close(next.r.scheduled)
}
}
func (q *MClock) scheduleByReservation(now float64) {
for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
q.removeFromQueues(next.r)
select {
case <-next.r.canceled:
continue
default:
}
assertIndexInvalid(next.r)
q.inProgress++
close(next.r.scheduled)
}
}
func (q *MClock) removeFromQueues(r *request) {
if r.limitIdx != invalidIndex {
heap.Remove(q.limitQueue, r.limitIdx)
}
if r.sharesIdx != invalidIndex {
heap.Remove(q.sharesQueue, r.sharesIdx)
}
if r.readyIdx != invalidIndex {
heap.Remove(q.readyQueue, r.readyIdx)
}
if r.reservationIdx != invalidIndex {
heap.Remove(q.reservationQueue, r.reservationIdx)
}
}
func (q *MClock) requestCompleted() {
q.mtx.Lock()
defer q.mtx.Unlock()
if q.closed {
return
}
assert.Cond(q.inProgress > 0, "invalid requests count")
q.inProgress--
q.scheduleRequestUnsafe()
}
func assertIndexInvalid(r *request) {
assert.Cond(r.limitIdx == invalidIndex, "limitIdx is not -1")
assert.Cond(r.sharesIdx == invalidIndex, "sharesIdx is not -1")
assert.Cond(r.reservationIdx == invalidIndex, "reservationIdx is not -1")
a-savchuk marked this conversation as resolved (outdated)

You used `readyIdx` instead of `reservationIdx`. Typo?

Thx, fixed!
assert.Cond(r.readyIdx == invalidIndex, "readyIdx is not -1")
}
fyrchik marked this conversation as resolved (outdated)

Still need this?

Yes: `waitingCount` is used by tests only, but linter excludes test files.

Can we implement it in test file then?

Ok, done
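To make the public API above concrete, here is a minimal usage sketch based on the doc comments (tag names and numbers are invented for illustration):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
)

func main() {
	limit := 1000.0   // cap the tag at 1000 IOPS
	reserved := 100.0 // guarantee the tag 100 IOPS

	// Up to 8 requests run concurrently; waitLimit 0 means the waiting
	// queue is unbounded; a tag is considered idle after 5 minutes.
	q, err := scheduling.NewMClock(8, 0, map[string]scheduling.TagInfo{
		"client":     {Share: 2, LimitIOPS: &limit, ReservedIOPS: &reserved},
		"background": {Share: 1},
	}, 5*time.Minute)
	if err != nil {
		panic(err)
	}
	defer q.Close()

	release, err := q.RequestArrival(context.Background(), "client")
	if err != nil {
		panic(err)
	}
	// The request is now scheduled; do the I/O, then release must be
	// called so the scheduler can decrement its in-progress counter.
	defer release()
	fmt.Println("request scheduled")
}
```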


@@ -0,0 +1,172 @@
Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -tags integration -bench ^BenchmarkMClock$ git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling -count=1
goos: linux
goarch: amd64
pkg: git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
BenchmarkMClock/impl=noop/parallelism=1-8 8623 136817 ns/op 0 B/op 0 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=1-8 7368 140674 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=2-8 8486 140394 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=4-8 8500 141410 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=8-8 8268 142724 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=16-8 8431 142548 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=1-8 8505 142035 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=2-8 7845 142658 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=4-8 8473 140029 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=8-8 8518 142607 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=16-8 8578 141002 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=1-8 8557 141858 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=2-8 8353 142742 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=4-8 8475 142753 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=8-8 8433 141319 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=16-8 8480 141825 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=1-8 7827 141525 ns/op 371 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=2-8 7935 140939 ns/op 370 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=4-8 8472 140988 ns/op 368 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=8-8 8373 142260 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=16-8 8383 142239 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=1-8 5727 206852 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=2-8 6516 178739 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=4-8 7300 163438 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=8-8 7807 152344 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=16-8 8443 147051 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=1-8 6062 205018 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=2-8 6526 182511 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=4-8 7341 163028 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=8-8 7930 153741 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=16-8 7804 148216 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=1-8 5485 207763 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=2-8 5774 181830 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=4-8 7262 165102 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=8-8 7231 152958 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=16-8 7849 146705 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=1-8 5275 206549 ns/op 368 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=2-8 6115 180053 ns/op 367 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=4-8 7264 163943 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=8-8 7810 152008 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=16-8 7875 147107 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=noop/parallelism=8-8 8589 139356 ns/op 0 B/op 0 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=1-8 7916 142917 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=2-8 8392 141914 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=4-8 8444 141011 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=8-8 8419 140638 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=16-8 8473 141018 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=1-8 8487 139941 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=2-8 7938 142745 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=4-8 8522 140837 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=8-8 8431 141361 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=16-8 8390 142171 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=1-8 8449 140695 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=2-8 8467 140622 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=4-8 8460 140925 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=8-8 8487 141316 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=16-8 7876 141374 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=1-8 7887 140590 ns/op 371 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=2-8 8328 142214 ns/op 370 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=4-8 8475 141472 ns/op 368 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=8-8 8402 141861 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=16-8 8509 142173 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=1-8 5490 207911 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=2-8 6481 182955 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=4-8 6816 165103 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=8-8 6901 155528 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=16-8 7690 148762 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=1-8 5437 205208 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=2-8 6092 183311 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=4-8 6907 162595 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=8-8 7756 151761 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=16-8 7855 146382 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=1-8 5468 206883 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=2-8 6061 180350 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=4-8 6795 163866 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=8-8 7350 152345 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=16-8 7869 145708 ns/op 374 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=1-8 5283 207099 ns/op 367 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=2-8 6799 180029 ns/op 367 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=4-8 7324 164306 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=8-8 7770 152377 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=16-8 8342 146888 ns/op 367 B/op 8 allocs/op
BenchmarkMClock/impl=noop/parallelism=32-8 8604 140481 ns/op 0 B/op 0 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=1-8 8491 142215 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=2-8 8508 140537 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=4-8 8320 142631 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=8-8 8368 142430 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=16-8 8432 141733 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=1-8 7855 141754 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=2-8 7858 141304 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=4-8 8545 140996 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=8-8 8437 142022 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=16-8 8418 142653 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=1-8 8448 141117 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=2-8 8530 142164 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=4-8 7944 142449 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=8-8 8551 139223 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=16-8 8491 140160 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=1-8 8354 141835 ns/op 371 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=2-8 7880 141608 ns/op 370 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=4-8 7940 140794 ns/op 368 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=8-8 8414 140646 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=16-8 8373 140890 ns/op 367 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=1-8 5256 209447 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=2-8 6451 183969 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=4-8 7326 163980 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=8-8 7862 152768 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=16-8 8390 147437 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=1-8 5228 206086 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=2-8 6471 181844 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=4-8 7318 163604 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=8-8 7827 151880 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=16-8 8362 146623 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=1-8 5541 210639 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=2-8 5818 183541 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=4-8 6910 163609 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=8-8 7797 152752 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=16-8 7344 146966 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=1-8 5746 206651 ns/op 367 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=2-8 6490 182702 ns/op 367 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=4-8 7250 164727 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=8-8 7386 152508 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=16-8 8379 146547 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=noop/parallelism=64-8 8486 138281 ns/op 0 B/op 0 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=1-8 8472 142782 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=2-8 8437 140925 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=4-8 8338 141035 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=8-8 8487 142288 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=16-8 8366 142353 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=1-8 8510 140838 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=2-8 7935 142844 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=4-8 8218 139362 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=8-8 7977 140291 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=16-8 8371 140322 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=1-8 8524 140484 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=2-8 8461 142431 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=4-8 8420 141652 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=8-8 8385 140956 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=16-8 8355 142509 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=1-8 7239 141018 ns/op 371 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=2-8 8467 141807 ns/op 370 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=4-8 8420 140763 ns/op 368 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=8-8 8474 140264 ns/op 366 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=16-8 8413 142191 ns/op 367 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=1-8 5474 208031 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=2-8 5706 182794 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=4-8 7248 165044 ns/op 364 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=8-8 7825 153229 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=16-8 7879 148568 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=1-8 5278 211267 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=2-8 6108 183247 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=4-8 7338 163152 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=8-8 7339 154054 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=16-8 7750 146000 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=1-8 5716 208259 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=2-8 6450 185159 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=4-8 7285 168077 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=8-8 7357 151950 ns/op 372 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=16-8 8257 147548 ns/op 373 B/op 9 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=1-8 5245 207383 ns/op 367 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=2-8 6115 179041 ns/op 367 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=4-8 6831 164377 ns/op 367 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=8-8 7378 152743 ns/op 365 B/op 8 allocs/op
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=16-8 7837 148694 ns/op 366 B/op 8 allocs/op
PASS
ok git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling 194.532s


@@ -0,0 +1,87 @@
package scheduling
import (
"context"
"fmt"
"math"
"math/rand/v2"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/require"
)
type noopMClockScheduler struct{}
var (
releaseStub ReleaseFunc = func() {}
defaultLimit float64 = 100_000
shortReservation float64 = 1
medReservation float64 = 100
largeReservation float64 = 10_000
a-savchuk marked this conversation as resolved (outdated)

I think `10_000` would look better

Fixed
)
func (s *noopMClockScheduler) RequestArrival(context.Context, string) ReleaseFunc {
return releaseStub
}
func BenchmarkMClock(b *testing.B) {
tagsCount := []int{1, 2, 4, 8, 16}
ioDuration := time.Millisecond
parallelismValues := []int{1, 8, 32, 64}
limits := []*float64{nil, &defaultLimit}
reservations := []*float64{nil, &shortReservation, &medReservation, &largeReservation}
for _, parallelism := range parallelismValues {
b.SetParallelism(parallelism)
noopMClock := &noopMClockScheduler{}
b.Run(fmt.Sprintf("impl=noop/parallelism=%d", parallelism), func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
release := noopMClock.RequestArrival(context.Background(), "tag")
time.Sleep(ioDuration)
release()
}
})
})
for _, limit := range limits {
for _, reservation := range reservations {
for _, tags := range tagsCount {
tagInfos := make(map[string]TagInfo)
for tag := 0; tag < tags; tag++ {
tagInfos["tag"+strconv.FormatInt(int64(tag), 10)] = TagInfo{Share: 50, LimitIOPS: limit, ReservedIOPS: reservation}
}
mClockQ, _ := NewMClock(math.MaxUint64, math.MaxUint64, tagInfos, time.Hour)
resStr := "no"
if reservation != nil {
resStr = strconv.FormatFloat(*reservation, 'f', 1, 64)
}
limitStr := "no"
if limit != nil {
limitStr = strconv.FormatFloat(*limit, 'f', 1, 64)
}
b.Run(fmt.Sprintf("impl=mclock/limit=%s/reservation=%s/parallelism=%d/tags=%d", limitStr, resStr, parallelism, tags), func(b *testing.B) {

Could we use benchstat syntax for logging benchmark parameters? https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/1472 https://pkg.go.dev/golang.org/x/perf/benchproc/syntax
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tag := rand.Int64N(int64(tags))
release, err := mClockQ.RequestArrival(context.Background(), "tag"+strconv.FormatInt(int64(tag), 10))
require.NoError(b, err)
time.Sleep(ioDuration)
release()
}
})
})
}
}
}
}
}

scheduling/mclock_test.go (new file, +495)

@@ -0,0 +1,495 @@
package scheduling
import (
"context"
"math"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestMClockSharesScheduling(t *testing.T) {
t.Parallel()
reqCount := 1000
reqCount = (reqCount / 2) * 2
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
"class1": {Share: 2},
"class2": {Share: 1},
}, 100)
require.NoError(t, err)
q.clock = &noopClock{}
var releases []ReleaseFunc
var requests []*request
tag := "class1"
for i := 0; i < reqCount/2; i++ {
req, release, err := q.pushRequest(tag)
require.NoError(t, err)
requests = append(requests, req)
releases = append(releases, release)
}
tag = "class2"
for i := 0; i < reqCount/2; i++ {
req, release, err := q.pushRequest(tag)
require.NoError(t, err)
requests = append(requests, req)
releases = append(releases, release)
}
var result []string
var wg sync.WaitGroup
for i := 0; i < reqCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-requests[i].scheduled
result = append(result, requests[i].tag)
releases[i]()
}()
}
wg.Wait()
// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
// because the ratio is 2 to 1.
// However, there may be deviations due to rounding and sorting.
result = result[:reqCount/2+(reqCount/2)/2] // the last reqCount/4 requests are the class2 tail
var class1Count int
var class2Count int
var class2MaxSeq int
for _, res := range result {
switch res {
case "class1":
class1Count++
class2MaxSeq = 0
case "class2":
class2Count++
class2MaxSeq++
require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in a row
default:
require.Fail(t, "unknown tag")
}
}
require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)
}
var _ clock = &noopClock{}
type noopClock struct {
v float64
runAtValue *float64
}
func (n *noopClock) now() float64 {
return n.v
}
func (n *noopClock) runAt(ts float64, f func()) {
n.runAtValue = &ts
}
func (n *noopClock) close() {}
func TestMClockRequestCancel(t *testing.T) {
t.Parallel()
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
"class1": {Share: 2},
"class2": {Share: 1},
}, 100)
require.NoError(t, err)
q.clock = &noopClock{}
release1, err := q.RequestArrival(context.Background(), "class1")
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
release2, err := q.RequestArrival(ctx, "class1")
require.Nil(t, release2)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Equal(t, 0, q.readyQueue.Len())
require.Equal(t, 0, q.sharesQueue.Len())
require.Equal(t, 0, q.limitQueue.Len())
require.Equal(t, 0, q.reservationQueue.Len())
release1()
}
func TestMClockLimitScheduling(t *testing.T) {
t.Parallel()
reqCount := 100
reqCount = (reqCount / 2) * 2
limit := 1.0
cl := &noopClock{}
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
"class1": {Share: 2, LimitIOPS: &limit},
"class2": {Share: 1, LimitIOPS: &limit},
}, 100)
require.NoError(t, err)
q.clock = cl
var releases []ReleaseFunc
var requests []*request
tag := "class1"
for i := 0; i < reqCount/2; i++ {
req, release, err := q.pushRequest(tag)
require.NoError(t, err)
requests = append(requests, req)
releases = append(releases, release)
}
tag = "class2"
for i := 0; i < reqCount/2; i++ {
req, release, err := q.pushRequest(tag)
require.NoError(t, err)
requests = append(requests, req)
releases = append(releases, release)
}
q.scheduleRequest()
for _, req := range requests {
select {
case <-req.scheduled:
require.Fail(t, "no request must be scheduled because time is 0.0 but limit values are greater than 0.0")
default:
}
}
cl.v = math.MaxFloat64
var result []string
var wg sync.WaitGroup
for i := 0; i < reqCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-requests[i].scheduled
result = append(result, requests[i].tag)
releases[i]()
}()
}
q.scheduleRequest()
wg.Wait()
// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
// because the ratio is 2 to 1.
// However, there may be deviations due to rounding and sorting.
result = result[:reqCount/2+(reqCount/2)/2] // the last reqCount/4 requests are the class2 tail
var class1Count int
var class2Count int
var class2MaxSeq int
for _, res := range result {
switch res {
case "class1":
class1Count++
class2MaxSeq = 0
case "class2":
class2Count++
class2MaxSeq++
require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in a row
default:
require.Fail(t, "unknown tag")
}
}
require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)
require.Equal(t, 0, q.readyQueue.Len())
require.Equal(t, 0, q.sharesQueue.Len())
require.Equal(t, 0, q.limitQueue.Len())
require.Equal(t, 0, q.reservationQueue.Len())
}
func TestMClockReservationScheduling(t *testing.T) {
t.Parallel()
reqCount := 1000
reqCount = (reqCount / 2) * 2
limit := 0.01 // 1 request in 100 seconds
reservation := 100.0 // 100 RPS
cl := &noopClock{}
q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
"class1": {Share: 2, LimitIOPS: &limit},
"class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation},
}, 100)
require.NoError(t, err)
q.clock = cl
var releases []ReleaseFunc
var requests []*request
tag := "class1"
for i := 0; i < reqCount/2; i++ {
req, release, err := q.pushRequest(tag)
require.NoError(t, err)
requests = append(requests, req)
releases = append(releases, release)
}
tag = "class2"
for i := 0; i < reqCount/2; i++ {
req, release, err := q.pushRequest(tag)
require.NoError(t, err)
requests = append(requests, req)
releases = append(releases, release)
}
q.scheduleRequest()
for _, req := range requests {
select {
case <-req.scheduled:
require.Fail(t, "no request must be scheduled because time is 0.0 but limit values are greater than 0.0")
default:
}
}
cl.v = 1.00001 // 1s elapsed
q.scheduleRequest()
var result []string
for i, req := range requests {
select {
case <-req.scheduled:
result = append(result, requests[i].tag)
releases[i]()
default:
}
}
require.Equal(t, 100, len(result))
for _, res := range result {
require.Equal(t, "class2", res)
}
cl.v = math.MaxFloat64
q.scheduleRequest()
require.Equal(t, 0, q.readyQueue.Len())
require.Equal(t, 0, q.sharesQueue.Len())
require.Equal(t, 0, q.limitQueue.Len())
require.Equal(t, 0, q.reservationQueue.Len())
}
func TestMClockIdleTag(t *testing.T) {
t.Parallel()
reqCount := 100
idleTimeout := 2 * time.Second
cl := &noopClock{}
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
"class1": {Share: 1},
"class2": {Share: 1},
}, idleTimeout)
require.NoError(t, err)
q.clock = cl
var requests []*request
tag := "class1"
for i := 0; i < reqCount/2; i++ {
cl.v += idleTimeout.Seconds() / 2
req, _, err := q.pushRequest(tag)
require.NoError(t, err)
requests = append(requests, req)
}
// class1 requests have shares [1.0; 2.0; 3.0; ... ]
cl.v += 2 * idleTimeout.Seconds()
tag = "class2"
req, _, err := q.pushRequest(tag)
require.NoError(t, err)
requests = append(requests, req)
// class2 must be detected as idle, so the shares of all tags must be adjusted.
for _, req := range requests {
select {
case <-req.scheduled:
default:
require.True(t, req.shares >= cl.v)
}
}
}
func TestMClockClose(t *testing.T) {
t.Parallel()
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
"class1": {Share: 1},
}, 1000)
require.NoError(t, err)
q.clock = &noopClock{}
requestRunning := make(chan struct{})
checkDone := make(chan struct{})
eg, ctx := errgroup.WithContext(context.Background())
tag := "class1"
eg.Go(func() error {
release, err := q.RequestArrival(ctx, tag)
if err != nil {
return err
}
defer release()
close(requestRunning)
<-checkDone
return nil
})
<-requestRunning
eg.Go(func() error {
release, err := q.RequestArrival(ctx, tag)
require.Nil(t, release)
require.ErrorIs(t, err, ErrMClockSchedulerClosed)
return nil
})
// wait until the second request is blocked waiting
for q.waitingCount() == 0 {
time.Sleep(1 * time.Second)
}
q.Close()
release, err := q.RequestArrival(context.Background(), tag)
require.Nil(t, release)
require.ErrorIs(t, err, ErrMClockSchedulerClosed)
close(checkDone)
require.NoError(t, eg.Wait())
}
func TestMClockWaitLimit(t *testing.T) {
t.Parallel()
q, err := NewMClock(1, 1, map[string]TagInfo{
"class1": {Share: 1},
}, 1000)
require.NoError(t, err)
q.clock = &noopClock{}
defer q.Close()
requestRunning := make(chan struct{})
checkDone := make(chan struct{})
eg, ctx := errgroup.WithContext(context.Background())
tag := "class1"
// running request
eg.Go(func() error {
release, err := q.RequestArrival(ctx, tag)
if err != nil {
return err
}
defer release()
close(requestRunning)
<-checkDone
return nil
})
// waiting request
eg.Go(func() error {
<-requestRunning
release, err := q.RequestArrival(ctx, tag)
require.NotNil(t, release)
require.NoError(t, err)
defer release()
<-checkDone
return nil
})
// wait until the second request is waiting
for q.waitingCount() == 0 {
time.Sleep(1 * time.Second)
}
release, err := q.RequestArrival(ctx, tag)
require.Nil(t, release)
require.ErrorIs(t, err, ErrMClockSchedulerRequestLimitExceeded)
close(checkDone)
require.NoError(t, eg.Wait())
}
func TestMClockParameterValidation(t *testing.T) {
_, err := NewMClock(0, 1, map[string]TagInfo{
"class1": {Share: 1},
}, 1000)
require.ErrorIs(t, err, ErrInvalidRunLimit)
_, err = NewMClock(1, 0, map[string]TagInfo{
"class1": {Share: 1},
}, 1000)
require.NoError(t, err)
_, err = NewMClock(1, 1, map[string]TagInfo{
"class1": {Share: 1},
}, -1.0)
require.NoError(t, err)
_, err = NewMClock(1, 1, map[string]TagInfo{
"class1": {Share: 1},
}, 0)
require.NoError(t, err)
negativeValue := -1.0
zeroValue := float64(0)
_, err = NewMClock(1, 1, map[string]TagInfo{
"class1": {Share: negativeValue},
}, 1000)
require.ErrorIs(t, err, ErrInvalidTagInfo)
_, err = NewMClock(1, 1, map[string]TagInfo{
"class1": {Share: zeroValue},
}, 1000)
require.ErrorIs(t, err, ErrInvalidTagInfo)
_, err = NewMClock(1, 1, map[string]TagInfo{
"class1": {Share: 1.0, ReservedIOPS: &zeroValue},
}, 1000)
require.ErrorIs(t, err, ErrInvalidTagInfo)
_, err = NewMClock(1, 1, map[string]TagInfo{
"class1": {Share: 1.0, ReservedIOPS: &negativeValue},
}, 1000)
require.ErrorIs(t, err, ErrInvalidTagInfo)
_, err = NewMClock(1, 1, map[string]TagInfo{
"class1": {Share: 1.0, LimitIOPS: &zeroValue},
}, 1000)
require.ErrorIs(t, err, ErrInvalidTagInfo)
_, err = NewMClock(1, 1, map[string]TagInfo{
"class1": {Share: 1.0, LimitIOPS: &negativeValue},
}, 1000)
require.ErrorIs(t, err, ErrInvalidTagInfo)
}
func (q *MClock) waitingCount() int {
q.mtx.Lock()
defer q.mtx.Unlock()
return q.sharesQueue.Len()
}
func TestMClockTimeBasedSchedule(t *testing.T) {
t.Parallel()
limit := 1.0 // 1 request per second allowed
cl := &noopClock{v: float64(1.5)}
q, err := NewMClock(100, math.MaxUint64, map[string]TagInfo{
"class1": {Share: 1, LimitIOPS: &limit},
}, 100)
require.NoError(t, err)
defer q.Close()
q.clock = cl
running := make(chan struct{})
checked := make(chan struct{})
eg, ctx := errgroup.WithContext(context.Background())
eg.Go(func() error {
release, err := q.RequestArrival(ctx, "class1")
require.NoError(t, err)
defer release()
close(running)
<-checked
return nil
})
<-running
// the request must be scheduled to run at 2.5 (cl.v + 1.0/limit)
_, _, err = q.pushRequest("class1")
require.NoError(t, err)
require.NotNil(t, cl.runAtValue)
require.Equal(t, cl.v+1.0/limit, *cl.runAtValue)
close(checked)
require.NoError(t, eg.Wait())
}

scheduling/queue.go (new file, +100)

@@ -0,0 +1,100 @@
package scheduling

type queueItem interface {
	ts() float64
	setIndex(idx int)
}

type queue struct {
	items []queueItem
}

// Len implements heap.Interface.
func (q *queue) Len() int {
	return len(q.items)
}

// Less implements heap.Interface.
func (q *queue) Less(i int, j int) bool {
	return q.items[i].ts() < q.items[j].ts()
}

// Pop implements heap.Interface.
func (q *queue) Pop() any {
	n := len(q.items)
	item := q.items[n-1]
	q.items[n-1] = nil
	q.items = q.items[0 : n-1]
	item.setIndex(invalidIndex)
	return item
}

// Push implements heap.Interface.
func (q *queue) Push(x any) {
	it := x.(queueItem)
	it.setIndex(q.Len())
	q.items = append(q.items, it)
}

// Swap implements heap.Interface.
func (q *queue) Swap(i int, j int) {
	q.items[i], q.items[j] = q.items[j], q.items[i]
	q.items[i].setIndex(i)
	q.items[j].setIndex(j)
}

var _ queueItem = &reservationMQueueItem{}

type reservationMQueueItem struct {
	r *request
}

func (i *reservationMQueueItem) ts() float64 {
	return i.r.reservation
}

func (i *reservationMQueueItem) setIndex(idx int) {
	i.r.reservationIdx = idx
}

var _ queueItem = &limitMQueueItem{}

type limitMQueueItem struct {
	r *request
}

func (i *limitMQueueItem) ts() float64 {
	return i.r.limit
}

func (i *limitMQueueItem) setIndex(idx int) {
	i.r.limitIdx = idx
}

var _ queueItem = &sharesMQueueItem{}

type sharesMQueueItem struct {
	r *request
}

func (i *sharesMQueueItem) ts() float64 {
	return i.r.shares
}

func (i *sharesMQueueItem) setIndex(idx int) {
	i.r.sharesIdx = idx
}

var _ queueItem = &readyMQueueItem{}

type readyMQueueItem struct {
	r *request
}

func (i *readyMQueueItem) ts() float64 {
	return i.r.shares
}

func (i *readyMQueueItem) setIndex(idx int) {
	i.r.readyIdx = idx
}