package scheduling import ( "math" "sync" "time" ) type clock interface { now() float64 runAt(ts float64, f func()) close() } type scheduleInfo struct { ts float64 f func() } type systemClock struct { since time.Time schedule chan scheduleInfo wg sync.WaitGroup } func newSystemClock() *systemClock { c := &systemClock{ since: time.Now(), schedule: make(chan scheduleInfo), } c.start() return c } func (c *systemClock) now() float64 { return time.Since(c.since).Seconds() } func (c *systemClock) runAt(ts float64, f func()) { c.schedule <- scheduleInfo{ts: ts, f: f} } func (c *systemClock) close() { close(c.schedule) c.wg.Wait() } func (c *systemClock) start() { c.wg.Add(1) go func() { defer c.wg.Done() t := time.NewTimer(0) <-t.C currentTs := math.MaxFloat64 var currentTask func() for { select { case <-t.C: if currentTask != nil { c.wg.Add(1) f := currentTask go func() { defer c.wg.Done() f() }() currentTask = nil } currentTs = math.MaxFloat64 case s, ok := <-c.schedule: if !ok { return } if s.ts >= currentTs { // current timer will fire earlier // so next scheduleRequest will push new schedule event continue } var d time.Duration now := c.now() if now < s.ts { d = time.Duration((s.ts - now) * 1e9) } if !t.Stop() { select { case <-t.C: default: } } t.Reset(d) currentTask = s.f currentTs = s.ts } } }() }