[#1] mclock: Refactor: split code between files
All checks were successful
DCO action / DCO (pull_request) Successful in 30s
Vulncheck / Vulncheck (pull_request) Successful in 30s
Tests and linters / Staticcheck (pull_request) Successful in 43s
Tests and linters / Run gofumpt (pull_request) Successful in 43s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m6s
Tests and linters / gopls check (pull_request) Successful in 1m4s
Tests and linters / Tests (pull_request) Successful in 35s
Tests and linters / Lint (pull_request) Successful in 1m22s
Tests and linters / Tests with -race (pull_request) Successful in 1m20s
All checks were successful
DCO action / DCO (pull_request) Successful in 30s
Vulncheck / Vulncheck (pull_request) Successful in 30s
Tests and linters / Staticcheck (pull_request) Successful in 43s
Tests and linters / Run gofumpt (pull_request) Successful in 43s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m6s
Tests and linters / gopls check (pull_request) Successful in 1m4s
Tests and linters / Tests (pull_request) Successful in 35s
Tests and linters / Lint (pull_request) Successful in 1m22s
Tests and linters / Tests with -race (pull_request) Successful in 1m20s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
903bcbd4bb
commit
b9abb25e2d
3 changed files with 182 additions and 176 deletions
82
scheduling/clock.go
Normal file
82
scheduling/clock.go
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
package scheduling
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type clock interface {
|
||||||
|
now() float64
|
||||||
|
runAt(ts float64, f func())
|
||||||
|
close()
|
||||||
|
}
|
||||||
|
|
||||||
|
type scheduleInfo struct {
|
||||||
|
ts float64
|
||||||
|
f func()
|
||||||
|
}
|
||||||
|
|
||||||
|
type systemClock struct {
|
||||||
|
since time.Time
|
||||||
|
schedule chan scheduleInfo
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSystemClock() *systemClock {
|
||||||
|
c := &systemClock{
|
||||||
|
since: time.Now(),
|
||||||
|
schedule: make(chan scheduleInfo),
|
||||||
|
}
|
||||||
|
c.start()
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) now() float64 {
|
||||||
|
return time.Since(c.since).Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) runAt(ts float64, f func()) {
|
||||||
|
c.schedule <- scheduleInfo{ts: ts, f: f}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) close() {
|
||||||
|
close(c.schedule)
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) start() {
|
||||||
|
c.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
t := time.NewTimer(time.Hour)
|
||||||
|
var f func()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
if f != nil {
|
||||||
|
f()
|
||||||
|
f = nil
|
||||||
|
}
|
||||||
|
t.Reset(time.Hour)
|
||||||
|
case s, ok := <-c.schedule:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
now := c.now()
|
||||||
|
if now >= s.ts {
|
||||||
|
s.f()
|
||||||
|
f = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !t.Stop() {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.Reset(time.Duration((s.ts - now) * 1e9))
|
||||||
|
f = s.f
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -22,15 +21,6 @@ var (
|
||||||
ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
|
ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
|
||||||
)
|
)
|
||||||
|
|
||||||
type queueItem interface {
|
|
||||||
ts() float64
|
|
||||||
setIndex(idx int)
|
|
||||||
}
|
|
||||||
|
|
||||||
type queue struct {
|
|
||||||
items []queueItem
|
|
||||||
}
|
|
||||||
|
|
||||||
type request struct {
|
type request struct {
|
||||||
tag string
|
tag string
|
||||||
ts float64
|
ts float64
|
||||||
|
@ -48,12 +38,6 @@ type request struct {
|
||||||
canceled chan struct{}
|
canceled chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type clock interface {
|
|
||||||
now() float64
|
|
||||||
runAt(ts float64, f func())
|
|
||||||
close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReleaseFunc is the type of function that should be called after the request is completed.
|
// ReleaseFunc is the type of function that should be called after the request is completed.
|
||||||
type ReleaseFunc func()
|
type ReleaseFunc func()
|
||||||
|
|
||||||
|
@ -424,163 +408,3 @@ func assertIndexInvalid(r *request) {
|
||||||
panic("readyIdx is not -1")
|
panic("readyIdx is not -1")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Len implements heap.Interface.
|
|
||||||
func (q *queue) Len() int {
|
|
||||||
return len(q.items)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Less implements heap.Interface.
|
|
||||||
func (q *queue) Less(i int, j int) bool {
|
|
||||||
return q.items[i].ts() < q.items[j].ts()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pop implements heap.Interface.
|
|
||||||
func (q *queue) Pop() any {
|
|
||||||
n := len(q.items)
|
|
||||||
item := q.items[n-1]
|
|
||||||
q.items[n-1] = nil
|
|
||||||
q.items = q.items[0 : n-1]
|
|
||||||
item.setIndex(invalidIndex)
|
|
||||||
return item
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push implements heap.Interface.
|
|
||||||
func (q *queue) Push(x any) {
|
|
||||||
it := x.(queueItem)
|
|
||||||
it.setIndex(q.Len())
|
|
||||||
q.items = append(q.items, it)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Swap implements heap.Interface.
|
|
||||||
func (q *queue) Swap(i int, j int) {
|
|
||||||
q.items[i], q.items[j] = q.items[j], q.items[i]
|
|
||||||
q.items[i].setIndex(i)
|
|
||||||
q.items[j].setIndex(j)
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ queueItem = &reservationMQueueItem{}
|
|
||||||
|
|
||||||
type reservationMQueueItem struct {
|
|
||||||
r *request
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *reservationMQueueItem) ts() float64 {
|
|
||||||
return i.r.reservation
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *reservationMQueueItem) setIndex(idx int) {
|
|
||||||
i.r.reservationIdx = idx
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ queueItem = &limitMQueueItem{}
|
|
||||||
|
|
||||||
type limitMQueueItem struct {
|
|
||||||
r *request
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *limitMQueueItem) ts() float64 {
|
|
||||||
return i.r.limit
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *limitMQueueItem) setIndex(idx int) {
|
|
||||||
i.r.limitIdx = idx
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ queueItem = &sharesMQueueItem{}
|
|
||||||
|
|
||||||
type sharesMQueueItem struct {
|
|
||||||
r *request
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *sharesMQueueItem) ts() float64 {
|
|
||||||
return i.r.shares
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *sharesMQueueItem) setIndex(idx int) {
|
|
||||||
i.r.sharesIdx = idx
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ queueItem = &readyMQueueItem{}
|
|
||||||
|
|
||||||
type readyMQueueItem struct {
|
|
||||||
r *request
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *readyMQueueItem) ts() float64 {
|
|
||||||
return i.r.shares
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *readyMQueueItem) setIndex(idx int) {
|
|
||||||
i.r.readyIdx = idx
|
|
||||||
}
|
|
||||||
|
|
||||||
type scheduleInfo struct {
|
|
||||||
ts float64
|
|
||||||
f func()
|
|
||||||
}
|
|
||||||
|
|
||||||
type systemClock struct {
|
|
||||||
since time.Time
|
|
||||||
schedule chan scheduleInfo
|
|
||||||
wg sync.WaitGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
func newSystemClock() *systemClock {
|
|
||||||
c := &systemClock{
|
|
||||||
since: time.Now(),
|
|
||||||
schedule: make(chan scheduleInfo),
|
|
||||||
}
|
|
||||||
c.start()
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *systemClock) now() float64 {
|
|
||||||
return time.Since(c.since).Seconds()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *systemClock) runAt(ts float64, f func()) {
|
|
||||||
c.schedule <- scheduleInfo{ts: ts, f: f}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *systemClock) close() {
|
|
||||||
close(c.schedule)
|
|
||||||
c.wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *systemClock) start() {
|
|
||||||
c.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer c.wg.Done()
|
|
||||||
t := time.NewTimer(time.Hour)
|
|
||||||
var f func()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-t.C:
|
|
||||||
if f != nil {
|
|
||||||
f()
|
|
||||||
f = nil
|
|
||||||
}
|
|
||||||
t.Reset(time.Hour)
|
|
||||||
case s, ok := <-c.schedule:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
now := c.now()
|
|
||||||
if now >= s.ts {
|
|
||||||
s.f()
|
|
||||||
f = nil
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !t.Stop() {
|
|
||||||
select {
|
|
||||||
case <-t.C:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t.Reset(time.Duration((s.ts - now) * 1e9))
|
|
||||||
f = s.f
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
100
scheduling/queue.go
Normal file
100
scheduling/queue.go
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
package scheduling
|
||||||
|
|
||||||
|
type queueItem interface {
|
||||||
|
ts() float64
|
||||||
|
setIndex(idx int)
|
||||||
|
}
|
||||||
|
|
||||||
|
type queue struct {
|
||||||
|
items []queueItem
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len implements heap.Interface.
|
||||||
|
func (q *queue) Len() int {
|
||||||
|
return len(q.items)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Less implements heap.Interface.
|
||||||
|
func (q *queue) Less(i int, j int) bool {
|
||||||
|
return q.items[i].ts() < q.items[j].ts()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pop implements heap.Interface.
|
||||||
|
func (q *queue) Pop() any {
|
||||||
|
n := len(q.items)
|
||||||
|
item := q.items[n-1]
|
||||||
|
q.items[n-1] = nil
|
||||||
|
q.items = q.items[0 : n-1]
|
||||||
|
item.setIndex(invalidIndex)
|
||||||
|
return item
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push implements heap.Interface.
|
||||||
|
func (q *queue) Push(x any) {
|
||||||
|
it := x.(queueItem)
|
||||||
|
it.setIndex(q.Len())
|
||||||
|
q.items = append(q.items, it)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Swap implements heap.Interface.
|
||||||
|
func (q *queue) Swap(i int, j int) {
|
||||||
|
q.items[i], q.items[j] = q.items[j], q.items[i]
|
||||||
|
q.items[i].setIndex(i)
|
||||||
|
q.items[j].setIndex(j)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ queueItem = &reservationMQueueItem{}
|
||||||
|
|
||||||
|
type reservationMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *reservationMQueueItem) ts() float64 {
|
||||||
|
return i.r.reservation
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *reservationMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.reservationIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ queueItem = &limitMQueueItem{}
|
||||||
|
|
||||||
|
type limitMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *limitMQueueItem) ts() float64 {
|
||||||
|
return i.r.limit
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *limitMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.limitIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ queueItem = &sharesMQueueItem{}
|
||||||
|
|
||||||
|
type sharesMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *sharesMQueueItem) ts() float64 {
|
||||||
|
return i.r.shares
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *sharesMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.sharesIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ queueItem = &readyMQueueItem{}
|
||||||
|
|
||||||
|
type readyMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *readyMQueueItem) ts() float64 {
|
||||||
|
return i.r.shares
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *readyMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.readyIdx = idx
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue