mclock: Initial implementation #1
|
@ -23,11 +23,6 @@ repos:
|
||||||
hooks:
|
hooks:
|
||||||
- id: shellcheck
|
- id: shellcheck
|
||||||
|
|
||||||
- repo: https://github.com/golangci/golangci-lint
|
|
||||||
rev: v1.59.1
|
|
||||||
hooks:
|
|
||||||
- id: golangci-lint
|
|
||||||
|
|
||||||
- repo: local
|
- repo: local
|
||||||
hooks:
|
hooks:
|
||||||
- id: go-unit-tests
|
- id: go-unit-tests
|
||||||
|
@ -36,3 +31,9 @@ repos:
|
||||||
pass_filenames: false
|
pass_filenames: false
|
||||||
types: [go]
|
types: [go]
|
||||||
language: system
|
language: system
|
||||||
|
- id: golangci-lint
|
||||||
|
name: golangci-lint check
|
||||||
|
entry: make lint
|
||||||
|
pass_filenames: false
|
||||||
|
types: [go]
|
||||||
|
language: system
|
||||||
|
|
8
Makefile
|
@ -56,7 +56,7 @@ pre-commit-run:
|
||||||
# Install linters
|
# Install linters
|
||||||
lint-install:
|
lint-install:
|
||||||
@rm -rf $(LINT_DIR)
|
@rm -rf $(LINT_DIR)
|
||||||
@mkdir $(LINT_DIR)
|
@mkdir -p $(LINT_DIR)
|
||||||
@CGO_ENABLED=1 GOBIN=$(LINT_VERSION_DIR) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
|
@CGO_ENABLED=1 GOBIN=$(LINT_VERSION_DIR) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v$(LINT_VERSION)
|
||||||
|
|
||||||
# Run linters
|
# Run linters
|
||||||
|
@ -69,7 +69,7 @@ lint:
|
||||||
# Install staticcheck
|
# Install staticcheck
|
||||||
staticcheck-install:
|
staticcheck-install:
|
||||||
@rm -rf $(STATICCHECK_DIR)
|
@rm -rf $(STATICCHECK_DIR)
|
||||||
@mkdir $(STATICCHECK_DIR)
|
@mkdir -p $(STATICCHECK_DIR)
|
||||||
@GOBIN=$(STATICCHECK_VERSION_DIR) go install honnef.co/go/tools/cmd/staticcheck@$(STATICCHECK_VERSION)
|
@GOBIN=$(STATICCHECK_VERSION_DIR) go install honnef.co/go/tools/cmd/staticcheck@$(STATICCHECK_VERSION)
|
||||||
|
|
||||||
# Run staticcheck
|
# Run staticcheck
|
||||||
|
@ -82,7 +82,7 @@ staticcheck-run:
|
||||||
# Install gopls
|
# Install gopls
|
||||||
gopls-install:
|
gopls-install:
|
||||||
@rm -rf $(GOPLS_DIR)
|
@rm -rf $(GOPLS_DIR)
|
||||||
@mkdir $(GOPLS_DIR)
|
@mkdir -p $(GOPLS_DIR)
|
||||||
@GOBIN=$(GOPLS_VERSION_DIR) go install golang.org/x/tools/gopls@$(GOPLS_VERSION)
|
@GOBIN=$(GOPLS_VERSION_DIR) go install golang.org/x/tools/gopls@$(GOPLS_VERSION)
|
||||||
|
|
||||||
# Run gopls
|
# Run gopls
|
||||||
|
@ -100,7 +100,7 @@ gopls-run:
|
||||||
# Install gofumpt
|
# Install gofumpt
|
||||||
fumpt-install:
|
fumpt-install:
|
||||||
@rm -rf $(GOFUMPT_DIR)
|
@rm -rf $(GOFUMPT_DIR)
|
||||||
@mkdir $(GOFUMPT_DIR)
|
@mkdir -p $(GOFUMPT_DIR)
|
||||||
@GOBIN=$(GOFUMPT_VERSION_DIR) go install mvdan.cc/gofumpt@$(GOFUMPT_VERSION)
|
@GOBIN=$(GOFUMPT_VERSION_DIR) go install mvdan.cc/gofumpt@$(GOFUMPT_VERSION)
|
||||||
|
|
||||||
# Run gofumpt
|
# Run gofumpt
|
||||||
|
|
11
go.mod
|
@ -1,3 +1,14 @@
|
||||||
module git.frostfs.info/TrueCloudLab/frostfs-qos
|
module git.frostfs.info/TrueCloudLab/frostfs-qos
|
||||||
|
|
||||||
go 1.22
|
go 1.22
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/stretchr/testify v1.9.0
|
||||||
|
golang.org/x/sync v0.10.0
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
|
)
|
||||||
|
|
12
go.sum
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||||
|
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
|
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||||
|
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
9
internal/assert/cond.go
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
package assert
|
||||||
|
|
||||||
|
import "strings"
|
||||||
|
|
||||||
|
func Cond(cond bool, details ...string) {
|
||||||
|
if !cond {
|
||||||
|
panic(strings.Join(details, " "))
|
||||||
|
}
|
||||||
|
}
|
84
scheduling/clock.go
Normal file
|
@ -0,0 +1,84 @@
|
||||||
|
package scheduling
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type clock interface {
|
||||||
|
now() float64
|
||||||
|
runAt(ts float64, f func())
|
||||||
|
close()
|
||||||
|
}
|
||||||
|
|
||||||
|
type scheduleInfo struct {
|
||||||
|
ts float64
|
||||||
|
f func()
|
||||||
|
}
|
||||||
|
|
||||||
|
type systemClock struct {
|
||||||
|
since time.Time
|
||||||
|
schedule chan scheduleInfo
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSystemClock() *systemClock {
|
||||||
|
c := &systemClock{
|
||||||
|
since: time.Now(),
|
||||||
|
schedule: make(chan scheduleInfo),
|
||||||
|
}
|
||||||
|
c.start()
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) now() float64 {
|
||||||
|
return time.Since(c.since).Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) runAt(ts float64, f func()) {
|
||||||
|
select {
|
||||||
|
case c.schedule <- scheduleInfo{ts: ts, f: f}:
|
||||||
|
default: // timer fired, scheduleRequest will call runAt again
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) close() {
|
||||||
|
close(c.schedule)
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) start() {
|
||||||
|
c.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
t := time.NewTimer(0)
|
||||||
|
<-t.C
|
||||||
|
var f func()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
if f != nil {
|
||||||
|
f()
|
||||||
|
f = nil
|
||||||
|
}
|
||||||
|
case s, ok := <-c.schedule:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var d time.Duration
|
||||||
|
now := c.now()
|
||||||
|
if now < s.ts {
|
||||||
|
d = time.Duration((s.ts - now) * 1e9)
|
||||||
|
}
|
||||||
|
if !t.Stop() {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.Reset(d)
|
||||||
|
f = s.f
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
399
scheduling/mclock.go
Normal file
|
@ -0,0 +1,399 @@
|
||||||
|
package scheduling
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/heap"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/internal/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
invalidIndex = -1
|
||||||
|
undefinedReservation float64 = -1.0
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrMClockSchedulerClosed = errors.New("mClock scheduler is closed")
|
||||||
|
ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded")
|
||||||
|
ErrMClockSchedulerUnknownTag = errors.New("unknown tag")
|
||||||
|
ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
|
||||||
|
ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
|
||||||
|
)
|
||||||
|
|
||||||
|
type request struct {
|
||||||
|
tag string
|
||||||
|
ts float64
|
||||||
|
|
||||||
|
reservation float64
|
||||||
|
limit float64
|
||||||
|
shares float64
|
||||||
|
|
||||||
|
reservationIdx int
|
||||||
|
limitIdx int
|
||||||
|
sharesIdx int
|
||||||
|
readyIdx int
|
||||||
|
|
||||||
|
scheduled chan struct{}
|
||||||
|
canceled chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReleaseFunc is the type of function that should be called after the request is completed.
|
||||||
|
type ReleaseFunc func()
|
||||||
|
|
||||||
|
// TagInfo contains reserved IOPS, IOPS limit and share values for a tag.
|
||||||
|
type TagInfo struct {
|
||||||
|
ReservedIOPS *float64
|
||||||
|
LimitIOPS *float64
|
||||||
|
Share float64
|
||||||
|
}
|
||||||
|
|
||||||
|
// MClock is mClock scheduling algorithm implementation.
|
||||||
|
//
|
||||||
|
// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details.
|
||||||
|
type MClock struct {
|
||||||
|
runLimit uint64
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
|
waitLimit int
|
||||||
|
clock clock
|
||||||
|
idleTimeout float64
|
||||||
|
tagInfo map[string]TagInfo
|
||||||
|
|
||||||
|
mtx sync.Mutex
|
||||||
|
previous map[string]*request
|
||||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
`realization` -> `implementation`?
Realize is rather about plans in general :)
dstepanov-yadro
commented
fixed fixed
|
|||||||
|
inProgress uint64
|
||||||
|
timeBasedScheduleTs float64
|
||||||
|
reservationQueue *queue
|
||||||
|
limitQueue *queue
|
||||||
|
sharesQueue *queue
|
||||||
|
readyQueue *queue
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMClock creates new MClock scheduler instance with
|
||||||
|
// runLimit maximum allowed count of running requests and
|
||||||
|
// waitLimit maximum allowed count of waiting requests
|
||||||
|
// for tags specified by tagInfo. The value of idleTimeout defines
|
||||||
|
// the difference between the current time and the time of
|
||||||
|
// the previous request, at which the tag considered idle.
|
||||||
|
// If idleTimeout is negative, it means that there is no idle tags allowed.
|
||||||
|
// If waitLimit equals zero, it means that there is no limit on the
|
||||||
|
// number of waiting requests.
|
||||||
|
func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout time.Duration) (*MClock, error) {
|
||||||
|
if err := validateParams(runLimit, tagInfo); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result := &MClock{
|
||||||
|
runLimit: runLimit,
|
||||||
|
waitLimit: int(waitLimit),
|
||||||
|
clock: newSystemClock(),
|
||||||
|
idleTimeout: idleTimeout.Seconds(),
|
||||||
|
tagInfo: tagInfo,
|
||||||
|
|
||||||
|
reservationQueue: &queue{},
|
||||||
|
limitQueue: &queue{},
|
||||||
|
sharesQueue: &queue{},
|
||||||
|
readyQueue: &queue{},
|
||||||
|
timeBasedScheduleTs: math.MaxFloat64,
|
||||||
|
}
|
||||||
|
|
||||||
|
previous := make(map[string]*request)
|
||||||
|
for tag := range tagInfo {
|
||||||
|
previous[tag] = &request{
|
||||||
|
tag: tag,
|
||||||
|
reservationIdx: invalidIndex,
|
||||||
|
limitIdx: invalidIndex,
|
||||||
|
sharesIdx: invalidIndex,
|
||||||
|
}
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`make([]queueItem, 0)` looks like an anti-pattern to me, why don't we use `nil` and omit this line?
dstepanov-yadro
commented
Done Done
|
|||||||
|
}
|
||||||
|
result.previous = previous
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequestArrival schedules new request with tag value.
|
||||||
|
// Method call is blocked until one of the following events occurs:
|
||||||
|
// request with the tag is scheduled for execution,
|
||||||
|
// context ctx is canceled or the scheduler is closed.
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Probably Checking my understanding: if it is not called, the space is not released and the queue is eventually saturated. Probably `s/should/MUST/`?
Checking my understanding: if it is not called, the space is not released and the queue is eventually saturated.
However, `Close()` will release all requests, without surprises.
Right?
dstepanov-yadro
commented
If it is not called, the count of in progress requests will not decreased and the queue is eventually saturated. If it is not called, the count of in progress requests will not decreased and the queue is eventually saturated.
`Close()` will release all requests, without surprises.
|
|||||||
|
// If the method call returned non-nil ReleaseFunc,
|
||||||
|
// then it must be called after the request is completed.
|
||||||
|
func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
q.dropRequest(req)
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case <-req.scheduled:
|
||||||
|
return release, nil
|
||||||
|
case <-req.canceled:
|
||||||
|
return nil, ErrMClockSchedulerClosed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes MClock scheduler.
|
||||||
|
// No new requests for scheduling will be accepted after the closing.
|
||||||
|
func (q *MClock) Close() {
|
||||||
|
q.mtx.Lock()
|
||||||
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
|
q.closed = true
|
||||||
|
q.clock.close()
|
||||||
|
for q.limitQueue.Len() > 0 {
|
||||||
|
item := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
||||||
|
close(item.r.canceled)
|
||||||
|
q.removeFromQueues(item.r)
|
||||||
|
}
|
||||||
acid-ant
commented
clocses -> closes clocses -> closes
dstepanov-yadro
commented
done done
|
|||||||
|
}
|
||||||
|
|
||||||
|
func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
|
||||||
|
if runLimit == 0 {
|
||||||
|
return ErrInvalidRunLimit
|
||||||
|
}
|
||||||
|
for _, v := range tagInfo {
|
||||||
|
if v.LimitIOPS != nil && (math.IsNaN(*v.LimitIOPS) || *v.LimitIOPS <= float64(0)) {
|
||||||
|
return ErrInvalidTagInfo
|
||||||
|
}
|
||||||
|
if v.ReservedIOPS != nil && (math.IsNaN(*v.ReservedIOPS) || *v.ReservedIOPS <= float64(0)) {
|
||||||
|
return ErrInvalidTagInfo
|
||||||
|
}
|
||||||
|
if math.IsNaN(v.Share) || v.Share <= float64(0) {
|
||||||
|
return ErrInvalidTagInfo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClock) dropRequest(req *request) {
|
||||||
|
q.mtx.Lock()
|
||||||
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-req.scheduled:
|
||||||
|
assert.Cond(q.inProgress > 0, "invalid requests count")
|
||||||
|
q.inProgress--
|
||||||
|
default:
|
||||||
fyrchik
commented
Hm, is Hm, is `float64` obligatory here?
fyrchik
commented
Also, is there any reason we use floats instead of ints? Also, is there any reason we use floats instead of ints?
dstepanov-yadro
commented
Algorithm uses division, and the weights can be arbitrary. That's why I chose float64. Algorithm uses division, and the weights can be arbitrary. That's why I chose float64.
a-savchuk
commented
I thought about it too
Your implementation uses seconds (float64 in Go) as a time unit. I think nanoseconds (int64 in Go) could be used too with some changes in the algorithm For example, take the following formula
Let's add units for each value
As I understand,
Then rewrite all formulas
What do you think? I thought about it too
> Algorithm uses division
Your implementation uses _seconds_ (float64 in Go) as a time unit. I think _nanoseconds_ (int64 in Go) could be used too with some changes in the algorithm
For example, take the following formula
```math
R_i^r = \max\left(R_i^{r-1} + \frac{1}{r_i},\ \mathrm{now}()\right)
```
Let's add units for each value
```math
R_i^r\ \mathrm{[s]}= \max\left(R_i^{r-1}\ \mathrm{[s]} + \frac{1}{r_i\ \mathrm{[IOPS]}},\ \mathrm{now}()\ \mathrm{[s]}\right)
```
As I understand, $1\ \mathrm{IOPS} = 1\ \mathrm{s}^{-1}$, so
```math
r_i = 1\ \mathrm{IOPS} = 1\ \mathrm{s}^{-1} = 1 \cdot 10^9\ \mathrm{ns}^{-1}
```
```math
R_i^r\ \mathrm{[s]}= \max\left(R_i^{r-1}\ \mathrm{[s]} + \frac{1}{r_i\ \mathrm{[s^{-1}]}},\ \mathrm{now}()\ \mathrm{[s]}\right)
```
Then rewrite all formulas
```math
R_i^r\ \mathrm{[ns]}= \max\left(R_i^{r-1}\ \mathrm{[ns]} + \frac{10^9}{r_i\ \mathrm{[ns^{-1}]}},\ \mathrm{now}()\ \mathrm{[ns]}\right)
```
```math
L_i^r\ \mathrm{[ns]}= \max\left(L_i^{r-1}\ \mathrm{[ns]} + \frac{10^9}{l_i\ \mathrm{[ns^{-1}]}},\ \mathrm{now}()\ \mathrm{[ns]}\right)
```
```math
P_i^r\ \mathrm{[ns]}= \max\left(P_i^{r-1}\ \mathrm{[ns]} + \frac{10^9\ \mathrm{[ns]}}{w_i},\ \mathrm{now}()\ \mathrm{[ns]}\right)
```
What do you think?
dstepanov-yadro
commented
As for me, using float64 shouldn't lead to any problems, and I personally find it more comfortable working with seconds than with nanoseconds. As for me, using float64 shouldn't lead to any problems, and I personally find it more comfortable working with seconds than with nanoseconds.
But also I changed units for `idleTimeout` and names of `TagInfo` fields.
|
|||||||
|
}
|
||||||
|
|
||||||
|
q.removeFromQueues(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`NaN` will not trigger this condition, though it seems like it should.
dstepanov-yadro
commented
Fixed Fixed
|
|||||||
|
q.mtx.Lock()
|
||||||
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
|
if q.closed {
|
||||||
|
return nil, nil, ErrMClockSchedulerClosed
|
||||||
|
}
|
||||||
|
if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit {
|
||||||
|
return nil, nil, ErrMClockSchedulerRequestLimitExceeded
|
||||||
|
}
|
||||||
|
|
||||||
|
now := q.clock.now()
|
||||||
|
tagInfo, ok := q.tagInfo[tag]
|
||||||
|
if !ok {
|
||||||
|
return nil, nil, ErrMClockSchedulerUnknownTag
|
||||||
|
}
|
||||||
|
prev, ok := q.previous[tag]
|
||||||
|
assert.Cond(ok, "undefined previous:", tag)
|
||||||
|
|
||||||
|
if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout
|
||||||
|
q.adjustTags(now, tag)
|
||||||
|
}
|
||||||
|
|
||||||
|
r := &request{
|
||||||
|
tag: tag,
|
||||||
|
ts: now,
|
||||||
|
shares: max(prev.shares+1.0/tagInfo.Share, now),
|
||||||
|
reservationIdx: invalidIndex,
|
||||||
|
limitIdx: invalidIndex,
|
||||||
|
sharesIdx: invalidIndex,
|
||||||
|
readyIdx: invalidIndex,
|
||||||
|
scheduled: make(chan struct{}),
|
||||||
|
canceled: make(chan struct{}),
|
||||||
|
}
|
||||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
I suppose we should add some value validation for tags as we expect that I suppose we should add some value validation for tags as we expect that `*tagInfo.Reservation != 0`
dstepanov-yadro
commented
Added validation and unit tests Added validation and unit tests
|
|||||||
|
if tagInfo.ReservedIOPS != nil {
|
||||||
|
r.reservation = max(prev.reservation + 1.0 / *tagInfo.ReservedIOPS, now)
|
||||||
|
} else {
|
||||||
|
r.reservation = undefinedReservation
|
||||||
|
}
|
||||||
|
|
||||||
|
if tagInfo.LimitIOPS != nil {
|
||||||
|
r.limit = max(prev.limit + 1.0 / *tagInfo.LimitIOPS, now)
|
||||||
|
} else {
|
||||||
|
r.limit = max(prev.limit, now)
|
||||||
|
}
|
||||||
|
|
||||||
|
q.previous[tag] = r
|
||||||
|
if tagInfo.ReservedIOPS != nil {
|
||||||
|
heap.Push(q.reservationQueue, &reservationMQueueItem{r: r})
|
||||||
|
}
|
||||||
|
heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
|
||||||
|
heap.Push(q.limitQueue, &limitMQueueItem{r: r})
|
||||||
|
q.scheduleRequestUnsafe()
|
||||||
|
|
||||||
|
return r, q.requestCompleted, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClock) adjustTags(now float64, idleTag string) {
|
||||||
|
if q.sharesQueue.Len() == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
minShare := q.sharesQueue.items[0].ts()
|
||||||
|
for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed
|
||||||
|
limitItem := item.(*limitMQueueItem)
|
||||||
|
if limitItem.r.tag == idleTag {
|
||||||
|
continue
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why do you have brackets around Why do you have brackets around `item`?
dstepanov-yadro
commented
Fixed Fixed
|
|||||||
|
}
|
||||||
|
limitItem.r.shares -= (minShare - now)
|
||||||
|
if limitItem.r.sharesIdx != invalidIndex {
|
||||||
|
heap.Fix(q.sharesQueue, limitItem.r.sharesIdx)
|
||||||
|
}
|
||||||
|
if limitItem.r.readyIdx != invalidIndex {
|
||||||
|
heap.Fix(q.readyQueue, limitItem.r.readyIdx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClock) scheduleRequest() {
|
||||||
|
q.mtx.Lock()
|
||||||
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
IMO making 2 separate functions would be more readable (i.e. IMO making 2 separate functions would be more readable (i.e. `schedule() { lock(); defer unlock(); scheduleUnsafe() }` and `scheduleUnsafe()`), because it is not obvious what this bool value means without looking at the function definition.
dstepanov-yadro
commented
Done Done
|
|||||||
|
q.scheduleRequestUnsafe()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClock) scheduleRequestUnsafe() {
|
||||||
|
if q.inProgress >= q.runLimit {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
now := q.clock.now()
|
||||||
|
q.scheduleByReservation(now)
|
||||||
|
if q.inProgress >= q.runLimit {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
q.scheduleByLimitAndWeight(now)
|
||||||
|
if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
q.setNextScheduleTimer(now)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClock) setNextScheduleTimer(now float64) {
|
||||||
|
nextTs := math.MaxFloat64
|
||||||
|
if q.reservationQueue.Len() > 0 {
|
||||||
|
nextTs = q.reservationQueue.items[0].ts()
|
||||||
|
}
|
||||||
|
if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
|
||||||
|
nextTs = q.limitQueue.items[0].ts()
|
||||||
|
}
|
||||||
a-savchuk marked this conversation as resolved
Outdated
a-savchuk
commented
When can When can `inProgress` be greater than `runLimit`?
dstepanov-yadro
commented
These values can be equal. Greater check is just foolproof. These values can be equal. Greater check is just foolproof.
|
|||||||
|
if nextTs <= now {
|
||||||
|
// should not happen as we always compare .ts() <= now
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.timeBasedScheduleTs > nextTs {
|
||||||
|
q.clock.runAt(nextTs, func() {
|
||||||
|
q.scheduleRequest()
|
||||||
fyrchik
commented
There is a possible call-chain Why does this not happen? Can we make it more obvious? There is a possible call-chain `runAt()` -> `scheduleRequest()` -> `setNextScheduleTimer()` -> `runAt()`, where the last `runAt` call blocks on a non-buffered `schedule` channel.
Why does this not happen? Can we make it more obvious?
dstepanov-yadro
commented
Thank you very much! This is really a bug!
Fixed:
Thank you very much! This is really a bug!
```
scheduleRequest(lock taken) ||
|| ||
|| time fired, scheduleRequest called
|| ||
|| ||
runAt() called, push to channel ||
|| ||
deadlock
```
Fixed:
1. `systemClock`'s timer created stopped
2. `scheduleRequest` called only when timer is fired inside `systemClock`
2. `runAt` has `default` branch, to if timer fired and other `scheduleRequest` called, we just skip push to `schedule` channel, because another `runAt` call will be performed by `scheduleRequest`
|
|||||||
|
})
|
||||||
|
q.timeBasedScheduleTs = nextTs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClock) scheduleByLimitAndWeight(now float64) {
|
||||||
|
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
|
||||||
|
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
||||||
|
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
|
||||||
|
}
|
||||||
|
|
||||||
|
for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 {
|
||||||
fyrchik
commented
Just a comment, not expecting any changes in this PR: This loop is run with under Just a comment, not expecting any changes in this PR:
This loop is run with under `q.mtx` lock, so arriving requests are blocked by this processing routine.
Not saying that this is a problem, but it could become in theory (latency spikes or slow degradation).
We have limited queue size, though. And new requests can be dropped, so the damage is resricted.
|
|||||||
|
next := heap.Pop(q.readyQueue).(*readyMQueueItem)
|
||||||
|
hadReservation := false
|
||||||
|
if next.r.reservationIdx != invalidIndex {
|
||||||
|
hadReservation = true
|
||||||
achuprov marked this conversation as resolved
Outdated
achuprov
commented
What does this condition mean? What does this condition mean?
dstepanov-yadro
commented
Good question! It was an error. Good question! It was an error.
It must be right so: `if q.lastSchedule > nextTs`
This means that nextTs is closer than already scheduled, so we must overwrite it.
|
|||||||
|
heap.Remove(q.reservationQueue, next.r.reservationIdx)
|
||||||
|
}
|
||||||
|
q.removeFromQueues(next.r)
|
||||||
|
|
||||||
|
tagInfo, ok := q.tagInfo[next.r.tag]
|
||||||
|
assert.Cond(ok, "unknown tag:", next.r.tag)
|
||||||
|
if tagInfo.ReservedIOPS != nil && hadReservation {
|
||||||
|
var updated bool
|
||||||
|
for _, i := range q.reservationQueue.items {
|
||||||
|
ri := i.(*reservationMQueueItem)
|
||||||
|
if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation {
|
||||||
|
ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS
|
||||||
|
updated = true
|
||||||
|
}
|
||||||
|
}
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Have you considered using Have you considered using `heap.Fix(q.reservationQueue, i)` here instead of `heap.Init()` after the loop?
dstepanov-yadro
commented
I've been thinking about it. But iterating through the elements and changing the queue at the same time can be difficult to understand. I've been thinking about it. But iterating through the elements and changing the queue at the same time can be difficult to understand.
fyrchik
commented
Oh, yes, missed that it can be done multiple times. Oh, yes, missed that it can be done multiple times.
|
|||||||
|
if updated {
|
||||||
|
heap.Init(q.reservationQueue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-next.r.canceled:
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
assertIndexInvalid(next.r)
|
||||||
|
q.inProgress++
|
||||||
|
close(next.r.scheduled)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClock) scheduleByReservation(now float64) {
|
||||||
|
for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
|
||||||
|
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
|
||||||
|
q.removeFromQueues(next.r)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-next.r.canceled:
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
assertIndexInvalid(next.r)
|
||||||
|
q.inProgress++
|
||||||
|
close(next.r.scheduled)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClock) removeFromQueues(r *request) {
|
||||||
|
if r.limitIdx != invalidIndex {
|
||||||
|
heap.Remove(q.limitQueue, r.limitIdx)
|
||||||
|
}
|
||||||
|
if r.sharesIdx != invalidIndex {
|
||||||
|
heap.Remove(q.sharesQueue, r.sharesIdx)
|
||||||
|
}
|
||||||
|
if r.readyIdx != invalidIndex {
|
||||||
|
heap.Remove(q.readyQueue, r.readyIdx)
|
||||||
|
}
|
||||||
|
if r.reservationIdx != invalidIndex {
|
||||||
|
heap.Remove(q.reservationQueue, r.reservationIdx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClock) requestCompleted() {
|
||||||
|
q.mtx.Lock()
|
||||||
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
|
if q.closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Cond(q.inProgress > 0, "invalid requests count")
|
||||||
|
q.inProgress--
|
||||||
|
q.scheduleRequestUnsafe()
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertIndexInvalid(r *request) {
|
||||||
|
assert.Cond(r.limitIdx == invalidIndex, "limitIdx is not -1")
|
||||||
|
assert.Cond(r.sharesIdx == invalidIndex, "sharesIdx is not -1")
|
||||||
|
assert.Cond(r.reservationIdx == invalidIndex, "reservationIdx is not -1")
|
||||||
a-savchuk marked this conversation as resolved
Outdated
a-savchuk
commented
You used You used `readyIdx` instead of `reservationIdx`. Typo?
dstepanov-yadro
commented
Thx, fixed! Thx, fixed!
|
|||||||
|
assert.Cond(r.readyIdx == invalidIndex, "readyIdx is not -1")
|
||||||
|
}
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Still need this? Still need this?
dstepanov-yadro
commented
Yes: Yes: `waitingCount` is used by tests only, but linter excludes test files.
fyrchik
commented
Can we implement it in test file then? Can we implement it in test file then?
dstepanov-yadro
commented
Ok, done Ok, done
|
172
scheduling/mclock_bench.result
Normal file
|
@ -0,0 +1,172 @@
|
||||||
|
Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -tags integration -bench ^BenchmarkMClock$ git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling -count=1
|
||||||
|
|
||||||
|
goos: linux
|
||||||
|
goarch: amd64
|
||||||
|
pkg: git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling
|
||||||
|
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
|
||||||
|
BenchmarkMClock/impl=noop/parallelism=1-8 8623 136817 ns/op 0 B/op 0 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=1-8 7368 140674 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=2-8 8486 140394 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=4-8 8500 141410 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=8-8 8268 142724 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=1/tags=16-8 8431 142548 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=1-8 8505 142035 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=2-8 7845 142658 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=4-8 8473 140029 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=8-8 8518 142607 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=1/tags=16-8 8578 141002 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=1-8 8557 141858 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=2-8 8353 142742 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=4-8 8475 142753 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=8-8 8433 141319 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=1/tags=16-8 8480 141825 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=1-8 7827 141525 ns/op 371 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=2-8 7935 140939 ns/op 370 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=4-8 8472 140988 ns/op 368 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=8-8 8373 142260 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=1/tags=16-8 8383 142239 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=1-8 5727 206852 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=2-8 6516 178739 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=4-8 7300 163438 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=8-8 7807 152344 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=1/tags=16-8 8443 147051 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=1-8 6062 205018 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=2-8 6526 182511 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=4-8 7341 163028 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=8-8 7930 153741 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=1/tags=16-8 7804 148216 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=1-8 5485 207763 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=2-8 5774 181830 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=4-8 7262 165102 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=8-8 7231 152958 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=1/tags=16-8 7849 146705 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=1-8 5275 206549 ns/op 368 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=2-8 6115 180053 ns/op 367 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=4-8 7264 163943 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=8-8 7810 152008 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=1/tags=16-8 7875 147107 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=noop/parallelism=8-8 8589 139356 ns/op 0 B/op 0 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=1-8 7916 142917 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=2-8 8392 141914 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=4-8 8444 141011 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=8-8 8419 140638 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=8/tags=16-8 8473 141018 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=1-8 8487 139941 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=2-8 7938 142745 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=4-8 8522 140837 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=8-8 8431 141361 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=8/tags=16-8 8390 142171 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=1-8 8449 140695 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=2-8 8467 140622 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=4-8 8460 140925 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=8-8 8487 141316 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=8/tags=16-8 7876 141374 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=1-8 7887 140590 ns/op 371 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=2-8 8328 142214 ns/op 370 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=4-8 8475 141472 ns/op 368 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=8-8 8402 141861 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=8/tags=16-8 8509 142173 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=1-8 5490 207911 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=2-8 6481 182955 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=4-8 6816 165103 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=8-8 6901 155528 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=8/tags=16-8 7690 148762 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=1-8 5437 205208 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=2-8 6092 183311 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=4-8 6907 162595 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=8-8 7756 151761 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=8/tags=16-8 7855 146382 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=1-8 5468 206883 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=2-8 6061 180350 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=4-8 6795 163866 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=8-8 7350 152345 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=8/tags=16-8 7869 145708 ns/op 374 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=1-8 5283 207099 ns/op 367 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=2-8 6799 180029 ns/op 367 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=4-8 7324 164306 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=8-8 7770 152377 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=8/tags=16-8 8342 146888 ns/op 367 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=noop/parallelism=32-8 8604 140481 ns/op 0 B/op 0 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=1-8 8491 142215 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=2-8 8508 140537 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=4-8 8320 142631 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=8-8 8368 142430 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=32/tags=16-8 8432 141733 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=1-8 7855 141754 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=2-8 7858 141304 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=4-8 8545 140996 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=8-8 8437 142022 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=32/tags=16-8 8418 142653 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=1-8 8448 141117 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=2-8 8530 142164 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=4-8 7944 142449 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=8-8 8551 139223 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=32/tags=16-8 8491 140160 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=1-8 8354 141835 ns/op 371 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=2-8 7880 141608 ns/op 370 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=4-8 7940 140794 ns/op 368 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=8-8 8414 140646 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=32/tags=16-8 8373 140890 ns/op 367 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=1-8 5256 209447 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=2-8 6451 183969 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=4-8 7326 163980 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=8-8 7862 152768 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=32/tags=16-8 8390 147437 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=1-8 5228 206086 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=2-8 6471 181844 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=4-8 7318 163604 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=8-8 7827 151880 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=32/tags=16-8 8362 146623 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=1-8 5541 210639 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=2-8 5818 183541 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=4-8 6910 163609 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=8-8 7797 152752 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=32/tags=16-8 7344 146966 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=1-8 5746 206651 ns/op 367 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=2-8 6490 182702 ns/op 367 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=4-8 7250 164727 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=8-8 7386 152508 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=32/tags=16-8 8379 146547 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=noop/parallelism=64-8 8486 138281 ns/op 0 B/op 0 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=1-8 8472 142782 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=2-8 8437 140925 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=4-8 8338 141035 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=8-8 8487 142288 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=no/parallelism=64/tags=16-8 8366 142353 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=1-8 8510 140838 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=2-8 7935 142844 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=4-8 8218 139362 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=8-8 7977 140291 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=1.0/parallelism=64/tags=16-8 8371 140322 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=1-8 8524 140484 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=2-8 8461 142431 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=4-8 8420 141652 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=8-8 8385 140956 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=100.0/parallelism=64/tags=16-8 8355 142509 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=1-8 7239 141018 ns/op 371 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=2-8 8467 141807 ns/op 370 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=4-8 8420 140763 ns/op 368 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=8-8 8474 140264 ns/op 366 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=no/reservation=10000.0/parallelism=64/tags=16-8 8413 142191 ns/op 367 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=1-8 5474 208031 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=2-8 5706 182794 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=4-8 7248 165044 ns/op 364 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=8-8 7825 153229 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=no/parallelism=64/tags=16-8 7879 148568 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=1-8 5278 211267 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=2-8 6108 183247 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=4-8 7338 163152 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=8-8 7339 154054 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=1.0/parallelism=64/tags=16-8 7750 146000 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=1-8 5716 208259 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=2-8 6450 185159 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=4-8 7285 168077 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=8-8 7357 151950 ns/op 372 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=100.0/parallelism=64/tags=16-8 8257 147548 ns/op 373 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=1-8 5245 207383 ns/op 367 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=2-8 6115 179041 ns/op 367 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=4-8 6831 164377 ns/op 367 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=8-8 7378 152743 ns/op 365 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/impl=mclock/limit=100000.0/reservation=10000.0/parallelism=64/tags=16-8 7837 148694 ns/op 366 B/op 8 allocs/op
|
||||||
|
PASS
|
||||||
|
ok git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling 194.532s
|
87
scheduling/mclock_bench_test.go
Normal file
|
@ -0,0 +1,87 @@
|
||||||
|
package scheduling
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"math/rand/v2"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type noopMClockScheduler struct{}
|
||||||
|
|
||||||
|
var (
|
||||||
|
releaseStub ReleaseFunc = func() {}
|
||||||
|
defaultLimit float64 = 100_000
|
||||||
|
shortReservation float64 = 1
|
||||||
|
medReservation float64 = 100
|
||||||
|
largeReservation float64 = 10_000
|
||||||
a-savchuk marked this conversation as resolved
Outdated
a-savchuk
commented
I think I think `10_000` would look better
dstepanov-yadro
commented
Fixed Fixed
|
|||||||
|
)
|
||||||
|
|
||||||
|
func (s *noopMClockScheduler) RequestArrival(context.Context, string) ReleaseFunc {
|
||||||
|
return releaseStub
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkMClock(b *testing.B) {
|
||||||
|
tagsCount := []int{1, 2, 4, 8, 16}
|
||||||
|
ioDuration := time.Millisecond
|
||||||
|
parallelismValues := []int{1, 8, 32, 64}
|
||||||
|
limits := []*float64{nil, &defaultLimit}
|
||||||
|
reservations := []*float64{nil, &shortReservation, &medReservation, &largeReservation}
|
||||||
|
for _, parallelism := range parallelismValues {
|
||||||
|
b.SetParallelism(parallelism)
|
||||||
|
|
||||||
|
noopMClock := &noopMClockScheduler{}
|
||||||
|
b.Run(fmt.Sprintf("impl=noop/parallelism=%d", parallelism), func(b *testing.B) {
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
release := noopMClock.RequestArrival(context.Background(), "tag")
|
||||||
|
time.Sleep(ioDuration)
|
||||||
|
release()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, limit := range limits {
|
||||||
|
for _, reservation := range reservations {
|
||||||
|
for _, tags := range tagsCount {
|
||||||
|
tagInfos := make(map[string]TagInfo)
|
||||||
|
for tag := 0; tag < tags; tag++ {
|
||||||
|
tagInfos["tag"+strconv.FormatInt(int64(tag), 10)] = TagInfo{Share: 50, LimitIOPS: limit, ReservedIOPS: reservation}
|
||||||
|
}
|
||||||
|
|
||||||
|
mClockQ, _ := NewMClock(math.MaxUint64, math.MaxUint64, tagInfos, time.Hour)
|
||||||
|
|
||||||
|
resStr := "no"
|
||||||
|
if reservation != nil {
|
||||||
|
resStr = strconv.FormatFloat(*reservation, 'f', 1, 64)
|
||||||
|
}
|
||||||
|
limitStr := "no"
|
||||||
|
if limit != nil {
|
||||||
|
limitStr = strconv.FormatFloat(*limit, 'f', 1, 64)
|
||||||
|
}
|
||||||
|
b.Run(fmt.Sprintf("impl=mclock/limit=%s/reservation=%s/parallelism=%d/tags=%d", limitStr, resStr, parallelism, tags), func(b *testing.B) {
|
||||||
a-savchuk
commented
Could we use benchstat syntax for logging benchmark parameters? TrueCloudLab/frostfs-node#1472 Could we use benchstat syntax for logging benchmark parameters?
https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/1472
https://pkg.go.dev/golang.org/x/perf/benchproc/syntax
|
|||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
tag := rand.Int64N(int64(tags))
|
||||||
|
release, err := mClockQ.RequestArrival(context.Background(), "tag"+strconv.FormatInt(int64(tag), 10))
|
||||||
|
require.NoError(b, err)
|
||||||
|
time.Sleep(ioDuration)
|
||||||
|
release()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
495
scheduling/mclock_test.go
Normal file
|
@ -0,0 +1,495 @@
|
||||||
|
package scheduling
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMClockSharesScheduling(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
reqCount := 1000
|
||||||
|
reqCount = (reqCount / 2) * 2
|
||||||
|
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {Share: 2},
|
||||||
|
"class2": {Share: 1},
|
||||||
|
}, 100)
|
||||||
|
require.NoError(t, err)
|
||||||
|
q.clock = &noopClock{}
|
||||||
|
|
||||||
|
var releases []ReleaseFunc
|
||||||
|
var requests []*request
|
||||||
|
tag := "class1"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
tag = "class2"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []string
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < reqCount; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-requests[i].scheduled
|
||||||
|
result = append(result, requests[i].tag)
|
||||||
|
releases[i]()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
|
||||||
|
// because the ratio is 2 to 1.
|
||||||
|
// However, there may be deviations due to rounding and sorting.
|
||||||
|
result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail
|
||||||
|
var class1Count int
|
||||||
|
var class2Count int
|
||||||
|
var class2MaxSeq int
|
||||||
|
for _, res := range result {
|
||||||
|
switch res {
|
||||||
|
case "class1":
|
||||||
|
class1Count++
|
||||||
|
class2MaxSeq = 0
|
||||||
|
case "class2":
|
||||||
|
class2Count++
|
||||||
|
class2MaxSeq++
|
||||||
|
require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ clock = &noopClock{}
|
||||||
|
|
||||||
|
type noopClock struct {
|
||||||
|
v float64
|
||||||
|
runAtValue *float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopClock) now() float64 {
|
||||||
|
return n.v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopClock) runAt(ts float64, f func()) {
|
||||||
|
n.runAtValue = &ts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopClock) close() {}
|
||||||
|
|
||||||
|
func TestMClockRequestCancel(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {Share: 2},
|
||||||
|
"class2": {Share: 1},
|
||||||
|
}, 100)
|
||||||
|
require.NoError(t, err)
|
||||||
|
q.clock = &noopClock{}
|
||||||
|
|
||||||
|
release1, err := q.RequestArrival(context.Background(), "class1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
release2, err := q.RequestArrival(ctx, "class1")
|
||||||
|
require.Nil(t, release2)
|
||||||
|
require.ErrorIs(t, err, context.DeadlineExceeded)
|
||||||
|
|
||||||
|
require.Equal(t, 0, q.readyQueue.Len())
|
||||||
|
require.Equal(t, 0, q.sharesQueue.Len())
|
||||||
|
require.Equal(t, 0, q.limitQueue.Len())
|
||||||
|
require.Equal(t, 0, q.reservationQueue.Len())
|
||||||
|
|
||||||
|
release1()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockLimitScheduling(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
reqCount := 100
|
||||||
|
reqCount = (reqCount / 2) * 2
|
||||||
|
limit := 1.0
|
||||||
|
cl := &noopClock{}
|
||||||
|
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {Share: 2, LimitIOPS: &limit},
|
||||||
|
"class2": {Share: 1, LimitIOPS: &limit},
|
||||||
|
}, 100)
|
||||||
|
require.NoError(t, err)
|
||||||
|
q.clock = cl
|
||||||
|
|
||||||
|
var releases []ReleaseFunc
|
||||||
|
var requests []*request
|
||||||
|
tag := "class1"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
tag = "class2"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
|
||||||
|
q.scheduleRequest()
|
||||||
|
|
||||||
|
for _, req := range requests {
|
||||||
|
select {
|
||||||
|
case <-req.scheduled:
|
||||||
|
require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cl.v = math.MaxFloat64
|
||||||
|
|
||||||
|
var result []string
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < reqCount; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-requests[i].scheduled
|
||||||
|
result = append(result, requests[i].tag)
|
||||||
|
releases[i]()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
q.scheduleRequest()
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
|
||||||
|
// because the ratio is 2 to 1.
|
||||||
|
// However, there may be deviations due to rounding and sorting.
|
||||||
|
result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail
|
||||||
|
var class1Count int
|
||||||
|
var class2Count int
|
||||||
|
var class2MaxSeq int
|
||||||
|
for _, res := range result {
|
||||||
|
switch res {
|
||||||
|
case "class1":
|
||||||
|
class1Count++
|
||||||
|
class2MaxSeq = 0
|
||||||
|
case "class2":
|
||||||
|
class2Count++
|
||||||
|
class2MaxSeq++
|
||||||
|
require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)
|
||||||
|
|
||||||
|
require.Equal(t, 0, q.readyQueue.Len())
|
||||||
|
require.Equal(t, 0, q.sharesQueue.Len())
|
||||||
|
require.Equal(t, 0, q.limitQueue.Len())
|
||||||
|
require.Equal(t, 0, q.reservationQueue.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockReservationScheduling(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
reqCount := 1000
|
||||||
|
reqCount = (reqCount / 2) * 2
|
||||||
|
limit := 0.01 // 1 request in 100 seconds
|
||||||
|
resevation := 100.0 // 100 RPS
|
||||||
|
cl := &noopClock{}
|
||||||
|
q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {Share: 2, LimitIOPS: &limit},
|
||||||
|
"class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation},
|
||||||
|
}, 100)
|
||||||
|
require.NoError(t, err)
|
||||||
|
q.clock = cl
|
||||||
|
|
||||||
|
var releases []ReleaseFunc
|
||||||
|
var requests []*request
|
||||||
|
tag := "class1"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
tag = "class2"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
|
||||||
|
q.scheduleRequest()
|
||||||
|
|
||||||
|
for _, req := range requests {
|
||||||
|
select {
|
||||||
|
case <-req.scheduled:
|
||||||
|
require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cl.v = 1.00001 // 1s elapsed
|
||||||
|
q.scheduleRequest()
|
||||||
|
|
||||||
|
var result []string
|
||||||
|
for i, req := range requests {
|
||||||
|
select {
|
||||||
|
case <-req.scheduled:
|
||||||
|
result = append(result, requests[i].tag)
|
||||||
|
releases[i]()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, 100, len(result))
|
||||||
|
for _, res := range result {
|
||||||
|
require.Equal(t, "class2", res)
|
||||||
|
}
|
||||||
|
|
||||||
|
cl.v = math.MaxFloat64
|
||||||
|
q.scheduleRequest()
|
||||||
|
|
||||||
|
require.Equal(t, 0, q.readyQueue.Len())
|
||||||
|
require.Equal(t, 0, q.sharesQueue.Len())
|
||||||
|
require.Equal(t, 0, q.limitQueue.Len())
|
||||||
|
require.Equal(t, 0, q.reservationQueue.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockIdleTag(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
reqCount := 100
|
||||||
|
idleTimeout := 2 * time.Second
|
||||||
|
cl := &noopClock{}
|
||||||
|
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1},
|
||||||
|
"class2": {Share: 1},
|
||||||
|
}, idleTimeout)
|
||||||
|
require.NoError(t, err)
|
||||||
|
q.clock = cl
|
||||||
|
|
||||||
|
var requests []*request
|
||||||
|
tag := "class1"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
cl.v += idleTimeout.Seconds() / 2
|
||||||
|
req, _, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// class1 requests have shares [1.0; 2.0; 3.0; ... ]
|
||||||
|
|
||||||
|
cl.v += 2 * idleTimeout.Seconds()
|
||||||
|
|
||||||
|
tag = "class2"
|
||||||
|
req, _, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
|
||||||
|
// class2 must be defined as idle, so all shares tags must be adjusted.
|
||||||
|
|
||||||
|
for _, req := range requests {
|
||||||
|
select {
|
||||||
|
case <-req.scheduled:
|
||||||
|
default:
|
||||||
|
require.True(t, req.shares >= cl.v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockClose(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1},
|
||||||
|
}, 1000)
|
||||||
|
require.NoError(t, err)
|
||||||
|
q.clock = &noopClock{}
|
||||||
|
|
||||||
|
requestRunning := make(chan struct{})
|
||||||
|
checkDone := make(chan struct{})
|
||||||
|
eg, ctx := errgroup.WithContext(context.Background())
|
||||||
|
tag := "class1"
|
||||||
|
eg.Go(func() error {
|
||||||
|
release, err := q.RequestArrival(ctx, tag)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
|
close(requestRunning)
|
||||||
|
<-checkDone
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
<-requestRunning
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
release, err := q.RequestArrival(ctx, tag)
|
||||||
|
require.Nil(t, release)
|
||||||
|
require.ErrorIs(t, err, ErrMClockSchedulerClosed)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// wait until second request will be blocked on wait
|
||||||
|
for q.waitingCount() == 0 {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
q.Close()
|
||||||
|
|
||||||
|
release, err := q.RequestArrival(context.Background(), tag)
|
||||||
|
require.Nil(t, release)
|
||||||
|
require.ErrorIs(t, err, ErrMClockSchedulerClosed)
|
||||||
|
|
||||||
|
close(checkDone)
|
||||||
|
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockWaitLimit(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q, err := NewMClock(1, 1, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1},
|
||||||
|
}, 1000)
|
||||||
|
require.NoError(t, err)
|
||||||
|
q.clock = &noopClock{}
|
||||||
|
defer q.Close()
|
||||||
|
|
||||||
|
requestRunning := make(chan struct{})
|
||||||
|
checkDone := make(chan struct{})
|
||||||
|
eg, ctx := errgroup.WithContext(context.Background())
|
||||||
|
tag := "class1"
|
||||||
|
// running request
|
||||||
|
eg.Go(func() error {
|
||||||
|
release, err := q.RequestArrival(ctx, tag)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
|
close(requestRunning)
|
||||||
|
<-checkDone
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// waiting request
|
||||||
|
eg.Go(func() error {
|
||||||
|
<-requestRunning
|
||||||
|
release, err := q.RequestArrival(ctx, tag)
|
||||||
|
require.NotNil(t, release)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer release()
|
||||||
|
<-checkDone
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// wait until second request will be waiting
|
||||||
|
for q.waitingCount() == 0 {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
release, err := q.RequestArrival(ctx, tag)
|
||||||
|
require.Nil(t, release)
|
||||||
|
require.ErrorIs(t, err, ErrMClockSchedulerRequestLimitExceeded)
|
||||||
|
|
||||||
|
close(checkDone)
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockParameterValidation(t *testing.T) {
|
||||||
|
_, err := NewMClock(0, 1, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1},
|
||||||
|
}, 1000)
|
||||||
|
require.ErrorIs(t, err, ErrInvalidRunLimit)
|
||||||
|
_, err = NewMClock(1, 0, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1},
|
||||||
|
}, 1000)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1},
|
||||||
|
}, -1.0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1},
|
||||||
|
}, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
negativeValue := -1.0
|
||||||
|
zeroValue := float64(0)
|
||||||
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
||||||
|
"class1": {Share: negativeValue},
|
||||||
|
}, 1000)
|
||||||
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
||||||
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
||||||
|
"class1": {Share: zeroValue},
|
||||||
|
}, 1000)
|
||||||
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
||||||
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1.0, ReservedIOPS: &zeroValue},
|
||||||
|
}, 1000)
|
||||||
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
||||||
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1.0, ReservedIOPS: &negativeValue},
|
||||||
|
}, 1000)
|
||||||
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
||||||
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1.0, LimitIOPS: &zeroValue},
|
||||||
|
}, 1000)
|
||||||
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
||||||
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1.0, LimitIOPS: &negativeValue},
|
||||||
|
}, 1000)
|
||||||
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClock) waitingCount() int {
|
||||||
|
q.mtx.Lock()
|
||||||
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
|
return q.sharesQueue.Len()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockTimeBasedSchedule(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
limit := 1.0 // 1 request per second allowed
|
||||||
|
cl := &noopClock{v: float64(1.5)}
|
||||||
|
q, err := NewMClock(100, math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {Share: 1, LimitIOPS: &limit},
|
||||||
|
}, 100)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer q.Close()
|
||||||
|
q.clock = cl
|
||||||
|
|
||||||
|
running := make(chan struct{})
|
||||||
|
checked := make(chan struct{})
|
||||||
|
eg, ctx := errgroup.WithContext(context.Background())
|
||||||
|
eg.Go(func() error {
|
||||||
|
release, err := q.RequestArrival(ctx, "class1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer release()
|
||||||
|
close(running)
|
||||||
|
<-checked
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
<-running
|
||||||
|
// request must be scheduled at 2.0
|
||||||
|
_, _, err = q.pushRequest("class1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, cl.runAtValue)
|
||||||
|
require.Equal(t, cl.v+1.0/limit, *cl.runAtValue)
|
||||||
|
close(checked)
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
}
|
100
scheduling/queue.go
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
package scheduling
|
||||||
|
|
||||||
|
type queueItem interface {
|
||||||
|
ts() float64
|
||||||
|
setIndex(idx int)
|
||||||
|
}
|
||||||
|
|
||||||
|
type queue struct {
|
||||||
|
items []queueItem
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len implements heap.Interface.
|
||||||
|
func (q *queue) Len() int {
|
||||||
|
return len(q.items)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Less implements heap.Interface.
|
||||||
|
func (q *queue) Less(i int, j int) bool {
|
||||||
|
return q.items[i].ts() < q.items[j].ts()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pop implements heap.Interface.
|
||||||
|
func (q *queue) Pop() any {
|
||||||
|
n := len(q.items)
|
||||||
|
item := q.items[n-1]
|
||||||
|
q.items[n-1] = nil
|
||||||
|
q.items = q.items[0 : n-1]
|
||||||
|
item.setIndex(invalidIndex)
|
||||||
|
return item
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push implements heap.Interface.
|
||||||
|
func (q *queue) Push(x any) {
|
||||||
|
it := x.(queueItem)
|
||||||
|
it.setIndex(q.Len())
|
||||||
|
q.items = append(q.items, it)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Swap implements heap.Interface.
|
||||||
|
func (q *queue) Swap(i int, j int) {
|
||||||
|
q.items[i], q.items[j] = q.items[j], q.items[i]
|
||||||
|
q.items[i].setIndex(i)
|
||||||
|
q.items[j].setIndex(j)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ queueItem = &reservationMQueueItem{}
|
||||||
|
|
||||||
|
type reservationMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *reservationMQueueItem) ts() float64 {
|
||||||
|
return i.r.reservation
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *reservationMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.reservationIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ queueItem = &limitMQueueItem{}
|
||||||
|
|
||||||
|
type limitMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *limitMQueueItem) ts() float64 {
|
||||||
|
return i.r.limit
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *limitMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.limitIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ queueItem = &sharesMQueueItem{}
|
||||||
|
|
||||||
|
type sharesMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *sharesMQueueItem) ts() float64 {
|
||||||
|
return i.r.shares
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *sharesMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.sharesIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ queueItem = &readyMQueueItem{}
|
||||||
|
|
||||||
|
type readyMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *readyMQueueItem) ts() float64 {
|
||||||
|
return i.r.shares
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *readyMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.readyIdx = idx
|
||||||
|
}
|
ReleaseFunc
?Done