forked from TrueCloudLab/frostfs-qos
Let's assume that there are two requests in the queue with execution time t1 and t2. The timer is set to t1. The timer is triggered, schedules the t1 request, calculates the time for the next timer t2 to be triggered. But it doesn't schedules timer to this time because of the `q.timeBasedScheduleTs > nextTs` check. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
95 lines
1.5 KiB
Go
95 lines
1.5 KiB
Go
package scheduling
|
|
|
|
import (
|
|
"math"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type clock interface {
|
|
now() float64
|
|
runAt(ts float64, f func())
|
|
close()
|
|
}
|
|
|
|
type scheduleInfo struct {
|
|
ts float64
|
|
f func()
|
|
}
|
|
|
|
type systemClock struct {
|
|
since time.Time
|
|
schedule chan scheduleInfo
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func newSystemClock() *systemClock {
|
|
c := &systemClock{
|
|
since: time.Now(),
|
|
schedule: make(chan scheduleInfo),
|
|
}
|
|
c.start()
|
|
return c
|
|
}
|
|
|
|
func (c *systemClock) now() float64 {
|
|
return time.Since(c.since).Seconds()
|
|
}
|
|
|
|
func (c *systemClock) runAt(ts float64, f func()) {
|
|
c.schedule <- scheduleInfo{ts: ts, f: f}
|
|
}
|
|
|
|
func (c *systemClock) close() {
|
|
close(c.schedule)
|
|
c.wg.Wait()
|
|
}
|
|
|
|
func (c *systemClock) start() {
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
t := time.NewTimer(0)
|
|
<-t.C
|
|
currentTs := math.MaxFloat64
|
|
var currentTask func()
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
if currentTask != nil {
|
|
c.wg.Add(1)
|
|
f := currentTask
|
|
go func() {
|
|
defer c.wg.Done()
|
|
f()
|
|
}()
|
|
currentTask = nil
|
|
}
|
|
currentTs = math.MaxFloat64
|
|
case s, ok := <-c.schedule:
|
|
if !ok {
|
|
return
|
|
}
|
|
if s.ts >= currentTs {
|
|
// current timer will fire earlier
|
|
// so next scheduleRequest will push new schedule event
|
|
continue
|
|
}
|
|
var d time.Duration
|
|
now := c.now()
|
|
if now < s.ts {
|
|
d = time.Duration((s.ts - now) * 1e9)
|
|
}
|
|
if !t.Stop() {
|
|
select {
|
|
case <-t.C:
|
|
default:
|
|
}
|
|
}
|
|
t.Reset(d)
|
|
currentTask = s.f
|
|
currentTs = s.ts
|
|
}
|
|
}
|
|
}()
|
|
}
|