From 47559a8d1653578db72efccb77dc1b93ea0cecf6 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 27 Jan 2025 13:45:00 +0300 Subject: [PATCH] [#1] mclock: Refactor: split code between files Signed-off-by: Dmitrii Stepanov --- scheduling/clock.go | 81 ++++++++++++++++++++ scheduling/mclock.go | 176 ------------------------------------------- scheduling/queue.go | 100 ++++++++++++++++++++++++ 3 files changed, 181 insertions(+), 176 deletions(-) create mode 100644 scheduling/clock.go create mode 100644 scheduling/queue.go diff --git a/scheduling/clock.go b/scheduling/clock.go new file mode 100644 index 0000000..99e6075 --- /dev/null +++ b/scheduling/clock.go @@ -0,0 +1,81 @@ +package scheduling + +import ( + "sync" + "time" +) + +type clock interface { + now() float64 + runAt(ts float64, f func()) + close() +} + +type scheduleInfo struct { + ts float64 + f func() +} + +type systemClock struct { + since time.Time + schedule chan scheduleInfo + wg sync.WaitGroup +} + +func newSystemClock() *systemClock { + c := &systemClock{ + since: time.Now(), + schedule: make(chan scheduleInfo), + } + c.start() + return c +} + +func (c *systemClock) now() float64 { + return time.Since(c.since).Seconds() +} + +func (c *systemClock) runAt(ts float64, f func()) { + c.schedule <- scheduleInfo{ts: ts, f: f} +} + +func (c *systemClock) close() { + close(c.schedule) + c.wg.Wait() +} + +func (c *systemClock) start() { + c.wg.Add(1) + go func() { + defer c.wg.Done() + t := time.NewTimer(time.Hour) + var f func() + for { + select { + case <-t.C: + if f != nil { + f() + f = nil + } + case s, ok := <-c.schedule: + if !ok { + return + } + now := c.now() + if now >= s.ts { + s.f() + f = nil + continue + } + if !t.Stop() { + select { + case <-t.C: + default: + } + } + t.Reset(time.Duration((s.ts - now) * 1e9)) + f = s.f + } + } + }() +} diff --git a/scheduling/mclock.go b/scheduling/mclock.go index 5967d2d..4cd6ff1 100644 --- a/scheduling/mclock.go +++ b/scheduling/mclock.go @@ -6,7 +6,6 @@ import ( "errors" "math" "sync" - "time" ) const ( @@ -22,15 +21,6 @@ var ( ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero") ) -type queueItem interface { - ts() float64 - setIndex(idx int) -} - -type queue struct { - items []queueItem -} - type request struct { tag string ts float64 @@ -48,12 +38,6 @@ type request struct { canceled chan struct{} } -type clock interface { - now() float64 - runAt(ts float64, f func()) - close() -} - // ReleaseFunc is the type of function that should be called after the request is completed. type ReleaseFunc func() @@ -424,163 +408,3 @@ func assertIndexInvalid(r *request) { panic("readyIdx is not -1") } } - -// Len implements heap.Interface. -func (q *queue) Len() int { - return len(q.items) -} - -// Less implements heap.Interface. -func (q *queue) Less(i int, j int) bool { - return q.items[i].ts() < q.items[j].ts() -} - -// Pop implements heap.Interface. -func (q *queue) Pop() any { - n := len(q.items) - item := q.items[n-1] - q.items[n-1] = nil - q.items = q.items[0 : n-1] - item.setIndex(invalidIndex) - return item -} - -// Push implements heap.Interface. -func (q *queue) Push(x any) { - it := x.(queueItem) - it.setIndex(q.Len()) - q.items = append(q.items, it) -} - -// Swap implements heap.Interface. -func (q *queue) Swap(i int, j int) { - q.items[i], q.items[j] = q.items[j], q.items[i] - q.items[i].setIndex(i) - q.items[j].setIndex(j) -} - -var _ queueItem = &reservationMQueueItem{} - -type reservationMQueueItem struct { - r *request -} - -func (i *reservationMQueueItem) ts() float64 { - return i.r.reservation -} - -func (i *reservationMQueueItem) setIndex(idx int) { - i.r.reservationIdx = idx -} - -var _ queueItem = &limitMQueueItem{} - -type limitMQueueItem struct { - r *request -} - -func (i *limitMQueueItem) ts() float64 { - return i.r.limit -} - -func (i *limitMQueueItem) setIndex(idx int) { - i.r.limitIdx = idx -} - -var _ queueItem = &sharesMQueueItem{} - -type sharesMQueueItem struct { - r *request -} - -func (i *sharesMQueueItem) ts() float64 { - return i.r.shares -} - -func (i *sharesMQueueItem) setIndex(idx int) { - i.r.sharesIdx = idx -} - -var _ queueItem = &readyMQueueItem{} - -type readyMQueueItem struct { - r *request -} - -func (i *readyMQueueItem) ts() float64 { - return i.r.shares -} - -func (i *readyMQueueItem) setIndex(idx int) { - i.r.readyIdx = idx -} - -type scheduleInfo struct { - ts float64 - f func() -} - -type systemClock struct { - since time.Time - schedule chan scheduleInfo - wg sync.WaitGroup -} - -func newSystemClock() *systemClock { - c := &systemClock{ - since: time.Now(), - schedule: make(chan scheduleInfo), - } - c.start() - return c -} - -func (c *systemClock) now() float64 { - return time.Since(c.since).Seconds() -} - -func (c *systemClock) runAt(ts float64, f func()) { - c.schedule <- scheduleInfo{ts: ts, f: f} -} - -func (c *systemClock) close() { - close(c.schedule) - c.wg.Wait() -} - -func (c *systemClock) start() { - c.wg.Add(1) - go func() { - defer c.wg.Done() - t := time.NewTimer(time.Hour) - var f func() - for { - select { - case <-t.C: - if f != nil { - f() - f = nil - } - t.Reset(time.Hour) - case s, ok := <-c.schedule: - if !ok { - return - } - now := c.now() - if now >= s.ts { - s.f() - f = nil - continue - } - if !t.Stop() { - select { - case <-t.C: - default: - } - } - t.Reset(time.Duration((s.ts - now) * 1e9)) - f = s.f - } - } - }() -} diff --git a/scheduling/queue.go b/scheduling/queue.go new file mode 100644 index 0000000..12dd44a --- /dev/null +++ b/scheduling/queue.go @@ -0,0 +1,100 @@ +package scheduling + +type queueItem interface { + ts() float64 + setIndex(idx int) +} + +type queue struct { + items []queueItem +} + +// Len implements heap.Interface. +func (q *queue) Len() int { + return len(q.items) +} + +// Less implements heap.Interface. +func (q *queue) Less(i int, j int) bool { + return q.items[i].ts() < q.items[j].ts() +} + +// Pop implements heap.Interface. +func (q *queue) Pop() any { + n := len(q.items) + item := q.items[n-1] + q.items[n-1] = nil + q.items = q.items[0 : n-1] + item.setIndex(invalidIndex) + return item +} + +// Push implements heap.Interface. +func (q *queue) Push(x any) { + it := x.(queueItem) + it.setIndex(q.Len()) + q.items = append(q.items, it) +} + +// Swap implements heap.Interface. +func (q *queue) Swap(i int, j int) { + q.items[i], q.items[j] = q.items[j], q.items[i] + q.items[i].setIndex(i) + q.items[j].setIndex(j) +} + +var _ queueItem = &reservationMQueueItem{} + +type reservationMQueueItem struct { + r *request +} + +func (i *reservationMQueueItem) ts() float64 { + return i.r.reservation +} + +func (i *reservationMQueueItem) setIndex(idx int) { + i.r.reservationIdx = idx +} + +var _ queueItem = &limitMQueueItem{} + +type limitMQueueItem struct { + r *request +} + +func (i *limitMQueueItem) ts() float64 { + return i.r.limit +} + +func (i *limitMQueueItem) setIndex(idx int) { + i.r.limitIdx = idx +} + +var _ queueItem = &sharesMQueueItem{} + +type sharesMQueueItem struct { + r *request +} + +func (i *sharesMQueueItem) ts() float64 { + return i.r.shares +} + +func (i *sharesMQueueItem) setIndex(idx int) { + i.r.sharesIdx = idx +} + +var _ queueItem = &readyMQueueItem{} + +type readyMQueueItem struct { + r *request +} + +func (i *readyMQueueItem) ts() float64 { + return i.r.shares +} + +func (i *readyMQueueItem) setIndex(idx int) { + i.r.readyIdx = idx +}