[#12] mclock: Fix timer-based scheduling
All checks were successful
DCO action / DCO (pull_request) Successful in 34s
Tests and linters / Run gofumpt (pull_request) Successful in 33s
Vulncheck / Vulncheck (pull_request) Successful in 37s
Tests and linters / Tests (pull_request) Successful in 50s
Tests and linters / Tests with -race (pull_request) Successful in 50s
Tests and linters / Staticcheck (pull_request) Successful in 1m6s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m18s
Tests and linters / Lint (pull_request) Successful in 1m32s
Tests and linters / gopls check (pull_request) Successful in 1m38s
Assume there are two requests in the queue, with execution times t1 and t2. The timer is set to t1. When it fires, it schedules the t1 request and computes t2 as the time at which the next timer should fire, but it never arms the timer for that time because of the `q.timeBasedScheduleTs > nextTs` check.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent 3e7ca94035
commit f546741c3a

3 changed files with 48 additions and 25 deletions
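The failure sequence from the commit message can be reduced to a small, hypothetical illustration. Only the names timeBasedScheduleTs, setNextScheduleTimer and runAt are taken from the diff below; the rest of the snippet is made up purely to show why the check drops the second timer.

package main

import (
	"fmt"
	"math"
)

// oldScheduler models only the buggy condition: the timer is re-armed
// only when the next deadline is earlier than the last remembered one.
type oldScheduler struct {
	timeBasedScheduleTs float64
	armed               []float64 // deadlines actually handed to the clock
}

func (q *oldScheduler) setNextScheduleTimer(nextTs float64) {
	if q.timeBasedScheduleTs > nextTs {
		q.armed = append(q.armed, nextTs) // stands in for q.clock.runAt(nextTs, ...)
		q.timeBasedScheduleTs = nextTs
	}
}

func main() {
	q := &oldScheduler{timeBasedScheduleTs: math.MaxFloat64}
	t1, t2 := 1.0, 2.0

	q.setNextScheduleTimer(t1) // first request arrives: timer armed for t1

	// The timer fires, the t1 request is scheduled, and t2 becomes the next
	// deadline. But t2 > timeBasedScheduleTs (== t1), so the condition above
	// is false and the timer is never re-armed for t2.
	q.setNextScheduleTimer(t2)

	fmt.Println(q.armed) // [1] — only t1 was ever armed; the t2 request hangs
}

With the fix, this bookkeeping is removed from MClock entirely and the clock itself decides whether a new timer is needed, as shown in the diff below.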
@@ -1,6 +1,7 @@
 package scheduling
 
 import (
+	"math"
 	"sync"
 	"time"
 )
@@ -25,7 +26,7 @@ type systemClock struct {
 func newSystemClock() *systemClock {
 	c := &systemClock{
 		since:    time.Now(),
-		schedule: make(chan scheduleInfo),
+		schedule: make(chan scheduleInfo, 1),
 	}
 	c.start()
 	return c
@@ -36,10 +37,7 @@ func (c *systemClock) now() float64 {
 }
 
 func (c *systemClock) runAt(ts float64, f func()) {
-	select {
-	case c.schedule <- scheduleInfo{ts: ts, f: f}:
-	default: // timer fired, scheduleRequest will call runAt again
-	}
+	c.schedule <- scheduleInfo{ts: ts, f: f}
 }
 
 func (c *systemClock) close() {
@@ -53,6 +51,7 @@ func (c *systemClock) start() {
 		defer c.wg.Done()
 		t := time.NewTimer(0)
 		<-t.C
+		currentTs := math.MaxFloat64
 		var f func()
 		for {
 			select {
@@ -61,10 +60,16 @@ func (c *systemClock) start() {
 					f()
 					f = nil
 				}
+				currentTs = math.MaxFloat64
 			case s, ok := <-c.schedule:
 				if !ok {
 					return
 				}
+				if s.ts >= currentTs {
+					// current timer will fire earlier
+					// so next scheduleRequest will push new schedule event
+					continue
+				}
 				var d time.Duration
 				now := c.now()
 				if now < s.ts {
@@ -78,6 +83,7 @@ func (c *systemClock) start() {
 			}
 			t.Reset(d)
 			f = s.f
+			currentTs = s.ts
 		}
 	}
 }()
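For readability, here is the reworked clock loop from the hunks above stitched into a single, self-contained sketch. The scheduleInfo and systemClock definitions, the shutdown logic, and the duration calculation sit outside the changed lines, so those parts are plausible stand-ins rather than the file's exact contents.

package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

type scheduleInfo struct {
	ts float64 // deadline in seconds since the clock started
	f  func()  // callback to run when the deadline is reached
}

type systemClock struct {
	since    time.Time
	schedule chan scheduleInfo
	wg       sync.WaitGroup
}

func newSystemClock() *systemClock {
	c := &systemClock{
		since:    time.Now(),
		schedule: make(chan scheduleInfo, 1), // buffered so runAt never drops an event
	}
	c.start()
	return c
}

func (c *systemClock) now() float64 { return time.Since(c.since).Seconds() }

// runAt always hands the event to the clock goroutine; whether a new timer
// is actually needed is decided there, based on currentTs.
func (c *systemClock) runAt(ts float64, f func()) {
	c.schedule <- scheduleInfo{ts: ts, f: f}
}

func (c *systemClock) start() {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		t := time.NewTimer(0)
		<-t.C
		currentTs := math.MaxFloat64 // deadline of the currently armed timer
		var f func()
		for {
			select {
			case <-t.C:
				if f != nil {
					f()
					f = nil
				}
				currentTs = math.MaxFloat64 // no timer is armed anymore
			case s, ok := <-c.schedule:
				if !ok {
					return
				}
				if s.ts >= currentTs {
					// current timer will fire earlier,
					// so the next scheduleRequest will push a new event
					continue
				}
				var d time.Duration
				if now := c.now(); now < s.ts {
					// stand-in for the duration calculation hidden between hunks
					d = time.Duration((s.ts - now) * float64(time.Second))
				}
				t.Reset(d)
				f = s.f
				currentTs = s.ts
			}
		}
	}()
}

func main() {
	c := newSystemClock()
	done := make(chan struct{})
	c.runAt(c.now()+0.05, func() { close(done) }) // callback ~50 ms from now
	<-done
	fmt.Println("fired at", c.now())
}

The key change is that runAt no longer silently drops events: the channel is buffered, the send is blocking, and the clock goroutine itself decides, via currentTs, whether an incoming deadline needs a new timer or is already covered by the one that is armed.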
@@ -61,15 +61,14 @@ type MClock struct {
 	idleTimeout float64
 	tagInfo     map[string]TagInfo
 
-	mtx                 sync.Mutex
-	previous            map[string]*request
-	inProgress          uint64
-	timeBasedScheduleTs float64
-	reservationQueue    *queue
-	limitQueue          *queue
-	sharesQueue         *queue
-	readyQueue          *queue
-	closed              bool
+	mtx              sync.Mutex
+	previous         map[string]*request
+	inProgress       uint64
+	reservationQueue *queue
+	limitQueue       *queue
+	sharesQueue      *queue
+	readyQueue       *queue
+	closed           bool
 }
 
 // NewMClock creates new MClock scheduler instance with
@@ -92,11 +91,10 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
 		idleTimeout: idleTimeout.Seconds(),
 		tagInfo:     tagInfo,
 
-		reservationQueue:    &queue{},
-		limitQueue:          &queue{},
-		sharesQueue:         &queue{},
-		readyQueue:          &queue{},
-		timeBasedScheduleTs: math.MaxFloat64,
+		reservationQueue: &queue{},
+		limitQueue:       &queue{},
+		sharesQueue:      &queue{},
+		readyQueue:       &queue{},
 	}
 
 	previous := make(map[string]*request)
@@ -294,13 +292,10 @@ func (q *MClock) setNextScheduleTimer(now float64) {
 		// should not happen as we always compare .ts() <= now
 		return
 	}
-
-	if q.timeBasedScheduleTs > nextTs {
-		q.clock.runAt(nextTs, func() {
-			q.scheduleRequest()
-		})
-		q.timeBasedScheduleTs = nextTs
+	if nextTs == math.MaxFloat64 {
+		return
 	}
+	q.clock.runAt(nextTs, q.scheduleRequest)
 }
 
 func (q *MClock) scheduleByLimitAndWeight(now float64) {
@@ -493,3 +493,25 @@ func TestMClockTimeBasedSchedule(t *testing.T) {
 	close(checked)
 	require.NoError(t, eg.Wait())
 }
+
+func TestMClockLowLimit(t *testing.T) {
+	t.Parallel()
+	limit := 2.0
+	q, err := NewMClock(100, 100, map[string]TagInfo{
+		"class1": {Share: 50, LimitIOPS: &limit},
+	}, 5*time.Second)
+	require.NoError(t, err)
+	defer q.Close()
+
+	eg, ctx := errgroup.WithContext(context.Background())
+	eg.SetLimit(5)
+	eg.Go(func() error {
+		for range 3 {
+			release, err := q.RequestArrival(ctx, "class1")
+			require.NoError(t, err)
+			release()
+		}
+		return nil
+	})
+	require.NoError(t, eg.Wait())
+}