[#1] mclock: Refactor: split code between files

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-01-27 13:45:00 +03:00
parent 54b4bf7cc1
commit 47559a8d16
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
3 changed files with 181 additions and 176 deletions

81
scheduling/clock.go Normal file
View file

@ -0,0 +1,81 @@
package scheduling
import (
"sync"
"time"
)
type clock interface {
now() float64
runAt(ts float64, f func())
close()
}
type scheduleInfo struct {
ts float64
f func()
}
type systemClock struct {
since time.Time
schedule chan scheduleInfo
wg sync.WaitGroup
}
func newSystemClock() *systemClock {
c := &systemClock{
since: time.Now(),
schedule: make(chan scheduleInfo),
}
c.start()
return c
}
func (c *systemClock) now() float64 {
return time.Since(c.since).Seconds()
}
func (c *systemClock) runAt(ts float64, f func()) {
c.schedule <- scheduleInfo{ts: ts, f: f}
}
func (c *systemClock) close() {
close(c.schedule)
c.wg.Wait()
}
func (c *systemClock) start() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
t := time.NewTimer(time.Hour)
var f func()
for {
select {
case <-t.C:
if f != nil {
f()
f = nil
}
case s, ok := <-c.schedule:
if !ok {
return
}
now := c.now()
if now >= s.ts {
s.f()
f = nil
continue
}
if !t.Stop() {
select {
case <-t.C:
default:
}
}
t.Reset(time.Duration((s.ts - now) * 1e9))
f = s.f
}
}
}()
}

View file

@ -6,7 +6,6 @@ import (
"errors"
"math"
"sync"
"time"
)
const (
@ -22,15 +21,6 @@ var (
ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
)
type queueItem interface {
ts() float64
setIndex(idx int)
}
type queue struct {
items []queueItem
}
type request struct {
tag string
ts float64
@ -48,12 +38,6 @@ type request struct {
canceled chan struct{}
}
type clock interface {
now() float64
runAt(ts float64, f func())
close()
}
// ReleaseFunc is the type of function that should be called after the request is completed.
type ReleaseFunc func()
@ -424,163 +408,3 @@ func assertIndexInvalid(r *request) {
panic("readyIdx is not -1")
}
}
// Len implements heap.Interface.
func (q *queue) Len() int {
return len(q.items)
}
// Less implements heap.Interface.
func (q *queue) Less(i int, j int) bool {
return q.items[i].ts() < q.items[j].ts()
}
// Pop implements heap.Interface.
func (q *queue) Pop() any {
n := len(q.items)
item := q.items[n-1]
q.items[n-1] = nil
q.items = q.items[0 : n-1]
item.setIndex(invalidIndex)
return item
}
// Push implements heap.Interface.
func (q *queue) Push(x any) {
it := x.(queueItem)
it.setIndex(q.Len())
q.items = append(q.items, it)
}
// Swap implements heap.Interface.
func (q *queue) Swap(i int, j int) {
q.items[i], q.items[j] = q.items[j], q.items[i]
q.items[i].setIndex(i)
q.items[j].setIndex(j)
}
var _ queueItem = &reservationMQueueItem{}
type reservationMQueueItem struct {
r *request
}
func (i *reservationMQueueItem) ts() float64 {
return i.r.reservation
}
func (i *reservationMQueueItem) setIndex(idx int) {
i.r.reservationIdx = idx
}
var _ queueItem = &limitMQueueItem{}
type limitMQueueItem struct {
r *request
}
func (i *limitMQueueItem) ts() float64 {
return i.r.limit
}
func (i *limitMQueueItem) setIndex(idx int) {
i.r.limitIdx = idx
}
var _ queueItem = &sharesMQueueItem{}
type sharesMQueueItem struct {
r *request
}
func (i *sharesMQueueItem) ts() float64 {
return i.r.shares
}
func (i *sharesMQueueItem) setIndex(idx int) {
i.r.sharesIdx = idx
}
var _ queueItem = &readyMQueueItem{}
type readyMQueueItem struct {
r *request
}
func (i *readyMQueueItem) ts() float64 {
return i.r.shares
}
func (i *readyMQueueItem) setIndex(idx int) {
i.r.readyIdx = idx
}
type scheduleInfo struct {
ts float64
f func()
}
type systemClock struct {
since time.Time
schedule chan scheduleInfo
wg sync.WaitGroup
}
func newSystemClock() *systemClock {
c := &systemClock{
since: time.Now(),
schedule: make(chan scheduleInfo),
}
c.start()
return c
}
func (c *systemClock) now() float64 {
return time.Since(c.since).Seconds()
}
func (c *systemClock) runAt(ts float64, f func()) {
c.schedule <- scheduleInfo{ts: ts, f: f}
}
func (c *systemClock) close() {
close(c.schedule)
c.wg.Wait()
}
func (c *systemClock) start() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
t := time.NewTimer(time.Hour)
var f func()
for {
select {
case <-t.C:
if f != nil {
f()
f = nil
}
t.Reset(time.Hour)
case s, ok := <-c.schedule:
if !ok {
return
}
now := c.now()
if now >= s.ts {
s.f()
f = nil
continue
}
if !t.Stop() {
select {
case <-t.C:
default:
}
}
t.Reset(time.Duration((s.ts - now) * 1e9))
f = s.f
}
}
}()
}

100
scheduling/queue.go Normal file
View file

@ -0,0 +1,100 @@
package scheduling
type queueItem interface {
ts() float64
setIndex(idx int)
}
type queue struct {
items []queueItem
}
// Len implements heap.Interface.
func (q *queue) Len() int {
return len(q.items)
}
// Less implements heap.Interface.
func (q *queue) Less(i int, j int) bool {
return q.items[i].ts() < q.items[j].ts()
}
// Pop implements heap.Interface.
func (q *queue) Pop() any {
n := len(q.items)
item := q.items[n-1]
q.items[n-1] = nil
q.items = q.items[0 : n-1]
item.setIndex(invalidIndex)
return item
}
// Push implements heap.Interface.
func (q *queue) Push(x any) {
it := x.(queueItem)
it.setIndex(q.Len())
q.items = append(q.items, it)
}
// Swap implements heap.Interface.
func (q *queue) Swap(i int, j int) {
q.items[i], q.items[j] = q.items[j], q.items[i]
q.items[i].setIndex(i)
q.items[j].setIndex(j)
}
var _ queueItem = &reservationMQueueItem{}
type reservationMQueueItem struct {
r *request
}
func (i *reservationMQueueItem) ts() float64 {
return i.r.reservation
}
func (i *reservationMQueueItem) setIndex(idx int) {
i.r.reservationIdx = idx
}
var _ queueItem = &limitMQueueItem{}
type limitMQueueItem struct {
r *request
}
func (i *limitMQueueItem) ts() float64 {
return i.r.limit
}
func (i *limitMQueueItem) setIndex(idx int) {
i.r.limitIdx = idx
}
var _ queueItem = &sharesMQueueItem{}
type sharesMQueueItem struct {
r *request
}
func (i *sharesMQueueItem) ts() float64 {
return i.r.shares
}
func (i *sharesMQueueItem) setIndex(idx int) {
i.r.sharesIdx = idx
}
var _ queueItem = &readyMQueueItem{}
type readyMQueueItem struct {
r *request
}
func (i *readyMQueueItem) ts() float64 {
return i.r.shares
}
func (i *readyMQueueItem) setIndex(idx int) {
i.r.readyIdx = idx
}