frostfs-qos/scheduling/clock.go
Dmitrii Stepanov 346752477b
All checks were successful
DCO action / DCO (pull_request) Successful in 21s
Vulncheck / Vulncheck (pull_request) Successful in 34s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m13s
Tests and linters / Tests with -race (pull_request) Successful in 1m14s
Tests and linters / Run gofumpt (pull_request) Successful in 1m13s
Tests and linters / gopls check (pull_request) Successful in 1m17s
Tests and linters / Lint (pull_request) Successful in 1m22s
Tests and linters / Staticcheck (pull_request) Successful in 1m18s
Tests and linters / Tests (pull_request) Successful in 1m27s
[#12] mclock: Fix timer-based scheduling
Let's assume that there are two requests in the queue with execution times t1 and t2.
The timer is set to t1. The timer is triggered, schedules the t1 request,
and calculates the time t2 at which the next timer should be triggered.
But it doesn't schedule the timer for this time because of the
`q.timeBasedScheduleTs > nextTs` check.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-19 17:12:00 +03:00

95 lines
1.5 KiB
Go

package scheduling
import (
"math"
"sync"
"time"
)
// clock is the time source used by this package: it reports the current
// timestamp and runs callbacks at a requested timestamp.
//
// Timestamps are plain float64 values. The systemClock implementation in this
// file measures them in seconds elapsed since the clock was created.
type clock interface {
	// now returns the current timestamp.
	now() float64

	// runAt asks the clock to call f once timestamp ts is reached.
	runAt(ts float64, f func())

	// close shuts the clock down.
	close()
}
// scheduleInfo is a single scheduling request: run f at timestamp ts.
// It is the message type carried by the systemClock.schedule channel.
type scheduleInfo struct {
	// ts is the timestamp at which f should be run.
	ts float64
	// f is the callback to run.
	f func()
}
// systemClock is a clock backed by the wall clock and a time.Timer.
// Use newSystemClock to create one: it also launches the timer goroutine.
type systemClock struct {
	// since is the reference instant; now reports the seconds elapsed since it.
	since time.Time
	// schedule carries requests from runAt to the goroutine launched by start.
	// Closing it (see close) makes that goroutine exit.
	schedule chan scheduleInfo
	// wg tracks the timer goroutine and every task goroutine it spawns,
	// so that close can wait for all of them.
	wg sync.WaitGroup
}
// newSystemClock creates a systemClock whose timestamps are counted from the
// moment of the call and starts its timer goroutine. The returned clock must
// be released with close.
func newSystemClock() *systemClock {
	clk := new(systemClock)
	clk.since = time.Now()
	// Unbuffered: runAt hands every request directly to the timer goroutine.
	clk.schedule = make(chan scheduleInfo)
	clk.start()
	return clk
}
// now returns the number of seconds elapsed since c.since as a float64.
func (c *systemClock) now() float64 {
	elapsed := time.Since(c.since)
	return elapsed.Seconds()
}
// runAt sends a request to run f at timestamp ts to the timer goroutine.
//
// The send is on an unbuffered channel, so the call blocks until the timer
// goroutine receives the request. Sending on a closed channel panics, so
// runAt must not be called after close.
func (c *systemClock) runAt(ts float64, f func()) {
	req := scheduleInfo{ts: ts, f: f}
	c.schedule <- req
}
// close stops the clock: it closes the schedule channel, which makes the
// goroutine launched by start return, and then blocks until that goroutine
// and all task goroutines tracked by c.wg have finished.
//
// Closing an already closed channel panics, so close must be called once.
func (c *systemClock) close() {
	// Signal shutdown first; waiting before closing would never return
	// because the timer goroutine only exits on a closed channel.
	close(c.schedule)
	c.wg.Wait()
}
// start launches the timer goroutine that serves the requests sent by runAt.
//
// The goroutine keeps at most one pending task together with its timestamp:
//   - a request whose timestamp is not earlier than the pending one is dropped;
//   - an earlier request replaces the pending task and re-arms the timer;
//   - when the timer fires, the pending task is run in its own goroutine,
//     which is tracked by c.wg.
//
// The goroutine returns when c.schedule is closed; a task that is still
// pending at that moment is not run.
func (c *systemClock) start() {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()

		// Create the timer and consume its initial tick so that it starts idle.
		t := time.NewTimer(0)
		// Release the timer when the loop exits so that an armed timer does not
		// outlive the clock.
		defer t.Stop()
		<-t.C

		// math.MaxFloat64 means "nothing is pending".
		currentTs := math.MaxFloat64
		var currentTask func()
		for {
			select {
			case <-t.C:
				if currentTask != nil {
					// Adding here is safe: this goroutine still holds its own
					// wg count, so the counter cannot be zero.
					c.wg.Add(1)
					f := currentTask
					go func() {
						defer c.wg.Done()
						f()
					}()
					currentTask = nil
				}
				currentTs = math.MaxFloat64
			case s, ok := <-c.schedule:
				if !ok {
					// The channel was closed by close: shut down.
					return
				}
				if s.ts >= currentTs {
					// current timer will fire earlier
					// so next scheduleRequest will push new schedule event
					continue
				}

				// Convert the remaining time to a Duration. A timestamp in the
				// past (or NaN) yields a zero delay. The value is clamped
				// because converting an out-of-range float64 to an integer is
				// implementation-dependent and may produce a negative delay,
				// which would fire the timer immediately.
				var d time.Duration
				if delta := (s.ts - c.now()) * float64(time.Second); delta > 0 {
					d = time.Duration(math.MaxInt64)
					if delta < math.MaxInt64 {
						d = time.Duration(delta)
					}
				}

				// Stop the timer and drain a tick that may already be queued
				// before re-arming it.
				if !t.Stop() {
					select {
					case <-t.C:
					default:
					}
				}
				t.Reset(d)
				currentTask = s.f
				currentTs = s.ts
			}
		}
	}()
}