Compare commits
17 commits
master
...
feat/prior
Author | SHA1 | Date | |
---|---|---|---|
33cefac665 | |||
7d709b4228 | |||
ee1598e111 | |||
0cccf4751b | |||
c3aaa19cd1 | |||
1ffd4cb1ca | |||
df2dd8f721 | |||
e70c195c17 | |||
a53eb16a5a | |||
d221caf3e8 | |||
6de26ae851 | |||
09de5d70cc | |||
2f66f30e25 | |||
9eda612274 | |||
7256a12ff6 | |||
4e6c58456d | |||
6eccfdddf1 |
7 changed files with 1335 additions and 0 deletions
172
pkg/core/quota/bench.results
Normal file
172
pkg/core/quota/bench.results
Normal file
|
@ -0,0 +1,172 @@
|
||||||
|
Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -tags integration -bench ^BenchmarkMClock$ git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/quota -count=1
|
||||||
|
|
||||||
|
goos: linux
|
||||||
|
goarch: amd64
|
||||||
|
pkg: git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/quota
|
||||||
|
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
|
||||||
|
BenchmarkMClock/noop,_1_parallelism-8 8622 138852 ns/op 0 B/op 0 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_1_parallelism,_1_tags-8 7750 143396 ns/op 381 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_1_parallelism,_2_tags-8 7890 140165 ns/op 381 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_1_parallelism,_4_tags-8 8481 142988 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_1_parallelism,_8_tags-8 7778 143392 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_1_parallelism,_16_tags-8 8174 142753 ns/op 381 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_1_parallelism,_1_tags-8 8428 141928 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_1_parallelism,_2_tags-8 8401 142478 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_1_parallelism,_4_tags-8 7838 143398 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_1_parallelism,_8_tags-8 8358 143362 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_1_parallelism,_16_tags-8 7646 143140 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_1_parallelism,_1_tags-8 7792 142760 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_1_parallelism,_2_tags-8 8359 144336 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_1_parallelism,_4_tags-8 8433 142473 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_1_parallelism,_8_tags-8 8074 141951 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_1_parallelism,_16_tags-8 7893 143826 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_1_parallelism,_1_tags-8 8542 143451 ns/op 387 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_1_parallelism,_2_tags-8 8462 143230 ns/op 386 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_1_parallelism,_4_tags-8 7852 140847 ns/op 384 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_1_parallelism,_8_tags-8 8299 142257 ns/op 383 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_1_parallelism,_16_tags-8 8274 143428 ns/op 383 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_1_parallelism,_1_tags-8 5896 182277 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_1_parallelism,_2_tags-8 6312 164867 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_1_parallelism,_4_tags-8 7076 153351 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_1_parallelism,_8_tags-8 7918 147351 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_1_parallelism,_16_tags-8 7756 146122 ns/op 381 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_1_parallelism,_1_tags-8 6823 182441 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_1_parallelism,_2_tags-8 6462 163464 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_1_parallelism,_4_tags-8 7828 152245 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_1_parallelism,_8_tags-8 7338 147831 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_1_parallelism,_16_tags-8 7875 145634 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_1_parallelism,_1_tags-8 6800 178145 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_1_parallelism,_2_tags-8 6358 163197 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_1_parallelism,_4_tags-8 7752 153441 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_1_parallelism,_8_tags-8 7791 148652 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_1_parallelism,_16_tags-8 7851 144684 ns/op 390 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_1_parallelism,_1_tags-8 5810 182836 ns/op 384 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_1_parallelism,_2_tags-8 6638 160513 ns/op 384 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_1_parallelism,_4_tags-8 7846 152878 ns/op 383 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_1_parallelism,_8_tags-8 7684 147061 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_1_parallelism,_16_tags-8 7780 145393 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/noop,_8_parallelism-8 8485 139644 ns/op 0 B/op 0 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_8_parallelism,_1_tags-8 8170 142515 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_8_parallelism,_2_tags-8 7875 142867 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_8_parallelism,_4_tags-8 7293 142487 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_8_parallelism,_8_tags-8 8222 142765 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_8_parallelism,_16_tags-8 8317 142517 ns/op 381 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_8_parallelism,_1_tags-8 7639 142420 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_8_parallelism,_2_tags-8 7912 143456 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_8_parallelism,_4_tags-8 7863 143354 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_8_parallelism,_8_tags-8 7605 143204 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_8_parallelism,_16_tags-8 7837 142883 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_8_parallelism,_1_tags-8 8346 143170 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_8_parallelism,_2_tags-8 8270 142266 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_8_parallelism,_4_tags-8 8330 143951 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_8_parallelism,_8_tags-8 8420 143760 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_8_parallelism,_16_tags-8 7776 142903 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_8_parallelism,_1_tags-8 8458 143242 ns/op 387 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_8_parallelism,_2_tags-8 7788 143594 ns/op 385 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_8_parallelism,_4_tags-8 8486 143527 ns/op 384 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_8_parallelism,_8_tags-8 8178 142912 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_8_parallelism,_16_tags-8 7953 143021 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_8_parallelism,_1_tags-8 6120 182367 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_8_parallelism,_2_tags-8 6740 163498 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_8_parallelism,_4_tags-8 7536 156347 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_8_parallelism,_8_tags-8 7845 149975 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_8_parallelism,_16_tags-8 7728 145382 ns/op 381 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_8_parallelism,_1_tags-8 6620 176141 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_8_parallelism,_2_tags-8 7160 168321 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_8_parallelism,_4_tags-8 6784 154220 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_8_parallelism,_8_tags-8 7641 147644 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_8_parallelism,_16_tags-8 7839 146256 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_8_parallelism,_1_tags-8 6825 180484 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_8_parallelism,_2_tags-8 6726 160547 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_8_parallelism,_4_tags-8 7887 158786 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_8_parallelism,_8_tags-8 7756 148823 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_8_parallelism,_16_tags-8 8456 145816 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_8_parallelism,_1_tags-8 5749 183312 ns/op 384 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_8_parallelism,_2_tags-8 7070 163066 ns/op 384 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_8_parallelism,_4_tags-8 7608 151387 ns/op 383 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_8_parallelism,_8_tags-8 7801 148704 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_8_parallelism,_16_tags-8 8394 145528 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/noop,_32_parallelism-8 8449 140882 ns/op 0 B/op 0 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_32_parallelism,_1_tags-8 8341 144258 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_32_parallelism,_2_tags-8 8190 144026 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_32_parallelism,_4_tags-8 8432 142862 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_32_parallelism,_8_tags-8 8221 142749 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_32_parallelism,_16_tags-8 8260 142720 ns/op 381 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_32_parallelism,_1_tags-8 8449 143299 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_32_parallelism,_2_tags-8 8289 142904 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_32_parallelism,_4_tags-8 7720 143325 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_32_parallelism,_8_tags-8 7814 143928 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_32_parallelism,_16_tags-8 8314 142920 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_32_parallelism,_1_tags-8 7831 142008 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_32_parallelism,_2_tags-8 7707 143310 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_32_parallelism,_4_tags-8 8436 144178 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_32_parallelism,_8_tags-8 8422 143256 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_32_parallelism,_16_tags-8 7912 142875 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_32_parallelism,_1_tags-8 7935 142824 ns/op 387 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_32_parallelism,_2_tags-8 7815 143701 ns/op 385 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_32_parallelism,_4_tags-8 7846 143185 ns/op 384 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_32_parallelism,_8_tags-8 7292 143897 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_32_parallelism,_16_tags-8 7800 143270 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_32_parallelism,_1_tags-8 6714 178822 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_32_parallelism,_2_tags-8 7051 161121 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_32_parallelism,_4_tags-8 7857 152451 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_32_parallelism,_8_tags-8 7890 147844 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_32_parallelism,_16_tags-8 7592 145755 ns/op 381 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_32_parallelism,_1_tags-8 6048 177194 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_32_parallelism,_2_tags-8 6858 161059 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_32_parallelism,_4_tags-8 7696 154697 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_32_parallelism,_8_tags-8 7346 148836 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_32_parallelism,_16_tags-8 8379 145981 ns/op 390 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_32_parallelism,_1_tags-8 6462 183121 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_32_parallelism,_2_tags-8 7252 159800 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_32_parallelism,_4_tags-8 7234 152839 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_32_parallelism,_8_tags-8 7822 147884 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_32_parallelism,_16_tags-8 7759 144517 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_32_parallelism,_1_tags-8 6594 175730 ns/op 385 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_32_parallelism,_2_tags-8 7651 160258 ns/op 384 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_32_parallelism,_4_tags-8 7653 151893 ns/op 383 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_32_parallelism,_8_tags-8 7148 146893 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_32_parallelism,_16_tags-8 8280 144532 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/noop,_64_parallelism-8 7826 138650 ns/op 0 B/op 0 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_64_parallelism,_1_tags-8 7788 143331 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_64_parallelism,_2_tags-8 8160 143617 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_64_parallelism,_4_tags-8 8360 143654 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_64_parallelism,_8_tags-8 8473 143208 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_no_reservation,_64_parallelism,_16_tags-8 8170 142293 ns/op 381 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_64_parallelism,_1_tags-8 7874 143752 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_64_parallelism,_2_tags-8 8271 142250 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_64_parallelism,_4_tags-8 8403 143406 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_64_parallelism,_8_tags-8 7767 143181 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_1.0_reservation,_64_parallelism,_16_tags-8 8377 143528 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_64_parallelism,_1_tags-8 8172 142732 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_64_parallelism,_2_tags-8 8367 143579 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_64_parallelism,_4_tags-8 7752 143488 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_64_parallelism,_8_tags-8 7873 143070 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_100.0_reservation,_64_parallelism,_16_tags-8 7852 143083 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_64_parallelism,_1_tags-8 8299 142874 ns/op 387 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_64_parallelism,_2_tags-8 7737 141432 ns/op 385 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_64_parallelism,_4_tags-8 7647 143605 ns/op 384 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_64_parallelism,_8_tags-8 8356 142787 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_no_limit,_10000.0_reservation,_64_parallelism,_16_tags-8 8290 141838 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_64_parallelism,_1_tags-8 5763 181437 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_64_parallelism,_2_tags-8 6380 162726 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_64_parallelism,_4_tags-8 7178 161958 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_64_parallelism,_8_tags-8 7790 154252 ns/op 380 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_no_reservation,_64_parallelism,_16_tags-8 7638 150472 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_64_parallelism,_1_tags-8 5731 184777 ns/op 389 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_64_parallelism,_2_tags-8 7092 165548 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_64_parallelism,_4_tags-8 7207 156112 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_64_parallelism,_8_tags-8 6919 151170 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_1.0_reservation,_64_parallelism,_16_tags-8 8192 146798 ns/op 390 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_64_parallelism,_1_tags-8 6075 184531 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_64_parallelism,_2_tags-8 6435 164215 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_64_parallelism,_4_tags-8 7815 152395 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_64_parallelism,_8_tags-8 7689 149567 ns/op 388 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_100.0_reservation,_64_parallelism,_16_tags-8 7846 147134 ns/op 390 B/op 9 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_64_parallelism,_1_tags-8 5758 190913 ns/op 384 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_64_parallelism,_2_tags-8 6462 172379 ns/op 383 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_64_parallelism,_4_tags-8 6819 156184 ns/op 383 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_64_parallelism,_8_tags-8 7365 152747 ns/op 382 B/op 8 allocs/op
|
||||||
|
BenchmarkMClock/mclock,_100000.0_limit,_10000.0_reservation,_64_parallelism,_16_tags-8 8094 147905 ns/op 382 B/op 8 allocs/op
|
||||||
|
PASS
|
||||||
|
ok git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/quota 198.209s
|
87
pkg/core/quota/bench_mclock_test.go
Normal file
87
pkg/core/quota/bench_mclock_test.go
Normal file
|
@ -0,0 +1,87 @@
|
||||||
|
package quota
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"math/rand/v2"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type noopMClockScheduler struct{}
|
||||||
|
|
||||||
|
var (
|
||||||
|
releaseStub func() = func() {}
|
||||||
|
defaultLimit float64 = 100_000
|
||||||
|
shortReservation float64 = 1
|
||||||
|
medReservation float64 = 100
|
||||||
|
largeReservation float64 = 100_00
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *noopMClockScheduler) RequestArrival(context.Context, string) func() {
|
||||||
|
return releaseStub
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkMClock(b *testing.B) {
|
||||||
|
tagsCount := []int{1, 2, 4, 8, 16}
|
||||||
|
ioDuration := time.Millisecond
|
||||||
|
parallelismValues := []int{1, 8, 32, 64}
|
||||||
|
limits := []*float64{nil, &defaultLimit}
|
||||||
|
reservations := []*float64{nil, &shortReservation, &medReservation, &largeReservation}
|
||||||
|
for _, parallelism := range parallelismValues {
|
||||||
|
b.SetParallelism(parallelism)
|
||||||
|
|
||||||
|
noopMClock := &noopMClockScheduler{}
|
||||||
|
b.Run(fmt.Sprintf("noop, %d parallelism", parallelism), func(b *testing.B) {
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
release := noopMClock.RequestArrival(context.Background(), "tag")
|
||||||
|
time.Sleep(ioDuration)
|
||||||
|
release()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, limit := range limits {
|
||||||
|
for _, reservation := range reservations {
|
||||||
|
for _, tags := range tagsCount {
|
||||||
|
tagInfos := make(map[string]TagInfo)
|
||||||
|
for tag := 0; tag < tags; tag++ {
|
||||||
|
tagInfos["tag"+strconv.FormatInt(int64(tag), 10)] = TagInfo{shares: 50, limit: limit, reservation: reservation}
|
||||||
|
}
|
||||||
|
|
||||||
|
mClockQ := NewMClockQueue(math.MaxUint64, math.MaxUint64, tagInfos, math.MaxFloat64)
|
||||||
|
|
||||||
|
resStr := "no"
|
||||||
|
if reservation != nil {
|
||||||
|
resStr = strconv.FormatFloat(*reservation, 'f', 1, 64)
|
||||||
|
}
|
||||||
|
limitStr := "no"
|
||||||
|
if limit != nil {
|
||||||
|
limitStr = strconv.FormatFloat(*limit, 'f', 1, 64)
|
||||||
|
}
|
||||||
|
b.Run(fmt.Sprintf("mclock, %s limit, %s reservation, %d parallelism, %d tags", limitStr, resStr, parallelism, tags), func(b *testing.B) {
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
tag := rand.Int64N(int64(tags))
|
||||||
|
release, err := mClockQ.RequestArrival(context.Background(), "tag"+strconv.FormatInt(int64(tag), 10))
|
||||||
|
require.NoError(b, err)
|
||||||
|
time.Sleep(ioDuration)
|
||||||
|
release()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
21
pkg/core/quota/context.go
Normal file
21
pkg/core/quota/context.go
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
package quota
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type tagContextKeyType int
|
||||||
|
|
||||||
|
const currentTagKey tagContextKeyType = iota
|
||||||
|
|
||||||
|
func ContextWithIOTag(parent context.Context, tag string) context.Context {
|
||||||
|
return context.WithValue(parent, currentTagKey, tag)
|
||||||
|
}
|
||||||
|
|
||||||
|
func IOTagFromContext(ctx context.Context) (string, bool) {
|
||||||
|
if ctx == nil {
|
||||||
|
panic("context must be non nil")
|
||||||
|
}
|
||||||
|
if tag, ok := ctx.Value(currentTagKey).(string); ok {
|
||||||
|
return tag, true
|
||||||
|
}
|
||||||
|
return "", false
|
||||||
|
}
|
23
pkg/core/quota/context_test.go
Normal file
23
pkg/core/quota/context_test.go
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
package quota
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestContext(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
tag, ok := IOTagFromContext(ctx)
|
||||||
|
require.False(t, ok)
|
||||||
|
require.Equal(t, "", tag)
|
||||||
|
ctx = ContextWithIOTag(ctx, "tag1")
|
||||||
|
tag, ok = IOTagFromContext(ctx)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "tag1", tag)
|
||||||
|
ctx = ContextWithIOTag(ctx, "tag2")
|
||||||
|
tag, ok = IOTagFromContext(ctx)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "tag2", tag)
|
||||||
|
}
|
96
pkg/core/quota/grpc.go
Normal file
96
pkg/core/quota/grpc.go
Normal file
|
@ -0,0 +1,96 @@
|
||||||
|
package quota
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
ioTagHeader = "x-frostfs-io-tag"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewUnaryClientInteceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
|
||||||
|
func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor {
|
||||||
|
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||||
|
return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStreamClientInterceptor creates new gRPC stream interceptor to set an IO tag to gRPC metadata.
|
||||||
|
func NewStreamClientInterceptor() grpc.StreamClientInterceptor {
|
||||||
|
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||||
|
return streamer(setIOTagToGRPCMetadata(ctx), desc, cc, method, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUnaryServerInterceptor creates new gRPC unary interceptor to extract an IO tag to gRPC metadata.
|
||||||
|
func NewUnaryServerInterceptor() grpc.UnaryServerInterceptor {
|
||||||
|
return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||||
|
return handler(extractIOTagFromGRPCMetadata(ctx), req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStreamServerInterceptor creates new gRPC stream interceptor to extract an IO tag to gRPC metadata.
|
||||||
|
func NewStreamServerInterceptor() grpc.StreamServerInterceptor {
|
||||||
|
return func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||||
|
return handler(srv, &serverStream{origin: ss})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setIOTagToGRPCMetadata(ctx context.Context) context.Context {
|
||||||
|
ioTag, ok := IOTagFromContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
md, ok := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
md = metadata.MD{}
|
||||||
|
}
|
||||||
|
md.Set(ioTagHeader, ioTag)
|
||||||
|
return metadata.NewOutgoingContext(ctx, md)
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractIOTagFromGRPCMetadata(ctx context.Context) context.Context {
|
||||||
|
md, ok := metadata.FromIncomingContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
values := md.Get(ioTagHeader)
|
||||||
|
if len(values) > 0 {
|
||||||
|
return ContextWithIOTag(ctx, values[0])
|
||||||
|
}
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ grpc.ServerStream = &serverStream{}
|
||||||
|
|
||||||
|
type serverStream struct {
|
||||||
|
origin grpc.ServerStream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverStream) Context() context.Context {
|
||||||
|
return extractIOTagFromGRPCMetadata(s.origin.Context())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverStream) RecvMsg(m any) error {
|
||||||
|
return s.origin.RecvMsg(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverStream) SendHeader(md metadata.MD) error {
|
||||||
|
return s.origin.SendHeader(md)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverStream) SendMsg(m any) error {
|
||||||
|
return s.origin.SendMsg(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverStream) SetHeader(md metadata.MD) error {
|
||||||
|
return s.origin.SetHeader(md)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverStream) SetTrailer(md metadata.MD) {
|
||||||
|
s.origin.SetTrailer(md)
|
||||||
|
}
|
536
pkg/core/quota/mclock.go
Normal file
536
pkg/core/quota/mclock.go
Normal file
|
@ -0,0 +1,536 @@
|
||||||
|
package quota
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/heap"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
invalidIndex = -1
|
||||||
|
undefinedReservation float64 = -1.0
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrSchedulerClosed = errors.New("mClock scheduler is closed")
|
||||||
|
ErrSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded")
|
||||||
|
)
|
||||||
|
|
||||||
|
type Release func()
|
||||||
|
|
||||||
|
type mQueueItem interface {
|
||||||
|
ts() float64
|
||||||
|
setIndex(idx int)
|
||||||
|
}
|
||||||
|
|
||||||
|
type mQueue struct {
|
||||||
|
items []mQueueItem
|
||||||
|
}
|
||||||
|
|
||||||
|
type request struct {
|
||||||
|
tag string
|
||||||
|
ts float64
|
||||||
|
|
||||||
|
reservation float64
|
||||||
|
limit float64
|
||||||
|
shares float64
|
||||||
|
|
||||||
|
reservationIdx int
|
||||||
|
limitIdx int
|
||||||
|
sharesIdx int
|
||||||
|
readyIdx int
|
||||||
|
|
||||||
|
scheduled chan struct{}
|
||||||
|
canceled chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type TagInfo struct {
|
||||||
|
reservation *float64
|
||||||
|
limit *float64
|
||||||
|
shares float64
|
||||||
|
}
|
||||||
|
|
||||||
|
type clock interface {
|
||||||
|
now() float64
|
||||||
|
runAt(ts float64, f func())
|
||||||
|
close()
|
||||||
|
}
|
||||||
|
|
||||||
|
type MClockQueue struct {
|
||||||
|
runLimit uint64
|
||||||
|
waitLimit int
|
||||||
|
clock clock
|
||||||
|
idleTimeout float64
|
||||||
|
tagInfo map[string]TagInfo
|
||||||
|
|
||||||
|
mtx sync.Mutex
|
||||||
|
previous map[string]*request
|
||||||
|
inProgress uint64
|
||||||
|
lastSchedule float64
|
||||||
|
reservationQueue *mQueue
|
||||||
|
limitQueue *mQueue
|
||||||
|
sharesQueue *mQueue
|
||||||
|
readyQueue *mQueue
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMClockQueue(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout float64) *MClockQueue {
|
||||||
|
result := &MClockQueue{
|
||||||
|
runLimit: runLimit,
|
||||||
|
waitLimit: int(waitLimit),
|
||||||
|
clock: newSystemClock(),
|
||||||
|
idleTimeout: idleTimeout,
|
||||||
|
tagInfo: tagInfo,
|
||||||
|
|
||||||
|
reservationQueue: &mQueue{
|
||||||
|
items: make([]mQueueItem, 0),
|
||||||
|
},
|
||||||
|
limitQueue: &mQueue{
|
||||||
|
items: make([]mQueueItem, 0),
|
||||||
|
},
|
||||||
|
sharesQueue: &mQueue{
|
||||||
|
items: make([]mQueueItem, 0),
|
||||||
|
},
|
||||||
|
readyQueue: &mQueue{
|
||||||
|
items: make([]mQueueItem, 0),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
previous := make(map[string]*request)
|
||||||
|
for tag := range tagInfo {
|
||||||
|
previous[tag] = &request{
|
||||||
|
tag: tag,
|
||||||
|
reservationIdx: invalidIndex,
|
||||||
|
limitIdx: invalidIndex,
|
||||||
|
sharesIdx: invalidIndex,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result.previous = previous
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClockQueue) RequestArrival(ctx context.Context, tag string) (Release, error) {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
q.dropRequest(req)
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case <-req.scheduled:
|
||||||
|
return release, nil
|
||||||
|
case <-req.canceled:
|
||||||
|
return nil, ErrSchedulerClosed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClockQueue) Close() {
|
||||||
|
q.mtx.Lock()
|
||||||
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
|
q.closed = true
|
||||||
|
q.clock.close()
|
||||||
|
for q.limitQueue.Len() > 0 {
|
||||||
|
item := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
||||||
|
close(item.r.canceled)
|
||||||
|
q.removeFromQueues(item.r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClockQueue) dropRequest(req *request) {
|
||||||
|
q.mtx.Lock()
|
||||||
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-req.scheduled:
|
||||||
|
if q.inProgress == 0 {
|
||||||
|
panic("invalid requests count")
|
||||||
|
}
|
||||||
|
q.inProgress--
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
q.removeFromQueues(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClockQueue) pushRequest(tag string) (*request, Release, error) {
|
||||||
|
q.mtx.Lock()
|
||||||
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
|
if q.closed {
|
||||||
|
return nil, nil, ErrSchedulerClosed
|
||||||
|
}
|
||||||
|
if q.sharesQueue.Len() == q.waitLimit {
|
||||||
|
return nil, nil, ErrSchedulerRequestLimitExceeded
|
||||||
|
}
|
||||||
|
|
||||||
|
now := q.clock.now()
|
||||||
|
tagInfo, ok := q.tagInfo[tag]
|
||||||
|
if !ok {
|
||||||
|
panic("unknown tag: " + tag)
|
||||||
|
}
|
||||||
|
prev, ok := q.previous[tag]
|
||||||
|
if !ok {
|
||||||
|
panic("undefined previous: " + tag)
|
||||||
|
}
|
||||||
|
|
||||||
|
if now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout
|
||||||
|
q.adjustTags(now, tag)
|
||||||
|
}
|
||||||
|
|
||||||
|
r := &request{
|
||||||
|
tag: tag,
|
||||||
|
ts: now,
|
||||||
|
shares: max(prev.shares+1.0/tagInfo.shares, now),
|
||||||
|
reservationIdx: invalidIndex,
|
||||||
|
limitIdx: invalidIndex,
|
||||||
|
sharesIdx: invalidIndex,
|
||||||
|
readyIdx: invalidIndex,
|
||||||
|
scheduled: make(chan struct{}),
|
||||||
|
canceled: make(chan struct{}),
|
||||||
|
}
|
||||||
|
if tagInfo.reservation != nil {
|
||||||
|
r.reservation = max(prev.reservation + 1.0 / *tagInfo.reservation, now)
|
||||||
|
} else {
|
||||||
|
r.reservation = undefinedReservation
|
||||||
|
}
|
||||||
|
|
||||||
|
if tagInfo.limit != nil {
|
||||||
|
r.limit = max(prev.limit + 1.0 / *tagInfo.limit, now)
|
||||||
|
} else {
|
||||||
|
r.limit = max(prev.limit, now)
|
||||||
|
}
|
||||||
|
|
||||||
|
q.previous[tag] = r
|
||||||
|
if tagInfo.reservation != nil {
|
||||||
|
heap.Push(q.reservationQueue, &reservationMQueueItem{r: r})
|
||||||
|
}
|
||||||
|
heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
|
||||||
|
heap.Push(q.limitQueue, &limitMQueueItem{r: r})
|
||||||
|
q.scheduleRequest(true)
|
||||||
|
|
||||||
|
return r, q.requestCompleted, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClockQueue) adjustTags(now float64, idleTag string) {
|
||||||
|
if q.sharesQueue.Len() == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
minShare := q.sharesQueue.items[0].ts()
|
||||||
|
for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed
|
||||||
|
limitItem := (item).(*limitMQueueItem)
|
||||||
|
if limitItem.r.tag == idleTag {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
limitItem.r.shares -= (minShare - now)
|
||||||
|
if limitItem.r.sharesIdx != invalidIndex {
|
||||||
|
heap.Fix(q.sharesQueue, limitItem.r.sharesIdx)
|
||||||
|
}
|
||||||
|
if limitItem.r.readyIdx != invalidIndex {
|
||||||
|
heap.Fix(q.readyQueue, limitItem.r.readyIdx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClockQueue) scheduleRequest(lockTaken bool) {
|
||||||
|
if !lockTaken {
|
||||||
|
q.mtx.Lock()
|
||||||
|
defer q.mtx.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.inProgress >= q.runLimit {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
now := q.clock.now()
|
||||||
|
q.scheduleByReservation(now)
|
||||||
|
if q.inProgress >= q.runLimit {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
q.scheduleByLimitAndWeight(now)
|
||||||
|
if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
q.setNextScheduleTimer(now)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClockQueue) setNextScheduleTimer(now float64) {
|
||||||
|
nextTs := math.MaxFloat64
|
||||||
|
if q.reservationQueue.Len() > 0 {
|
||||||
|
nextTs = q.reservationQueue.items[0].ts()
|
||||||
|
}
|
||||||
|
if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
|
||||||
|
nextTs = q.limitQueue.items[0].ts()
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.lastSchedule < now && q.lastSchedule > nextTs {
|
||||||
|
q.clock.runAt(nextTs, func() {
|
||||||
|
q.scheduleRequest(false)
|
||||||
|
})
|
||||||
|
q.lastSchedule = nextTs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClockQueue) scheduleByLimitAndWeight(now float64) {
|
||||||
|
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
|
||||||
|
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
||||||
|
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
|
||||||
|
}
|
||||||
|
|
||||||
|
for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 {
|
||||||
|
next := heap.Pop(q.readyQueue).(*readyMQueueItem)
|
||||||
|
hadReservation := false
|
||||||
|
if next.r.reservationIdx != invalidIndex {
|
||||||
|
hadReservation = true
|
||||||
|
heap.Remove(q.reservationQueue, next.r.reservationIdx)
|
||||||
|
}
|
||||||
|
q.removeFromQueues(next.r)
|
||||||
|
|
||||||
|
tagInfo, ok := q.tagInfo[next.r.tag]
|
||||||
|
if !ok {
|
||||||
|
panic("unknown tag: " + next.r.tag)
|
||||||
|
}
|
||||||
|
if tagInfo.reservation != nil && hadReservation {
|
||||||
|
var updated bool
|
||||||
|
for _, i := range q.reservationQueue.items {
|
||||||
|
ri := i.(*reservationMQueueItem)
|
||||||
|
if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation {
|
||||||
|
ri.r.reservation -= 1.0 / *tagInfo.reservation
|
||||||
|
updated = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if updated {
|
||||||
|
heap.Init(q.reservationQueue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-next.r.canceled:
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
assertIndexInvalid(next.r)
|
||||||
|
q.inProgress++
|
||||||
|
close(next.r.scheduled)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClockQueue) scheduleByReservation(now float64) {
|
||||||
|
for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
|
||||||
|
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
|
||||||
|
q.removeFromQueues(next.r)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-next.r.canceled:
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
assertIndexInvalid(next.r)
|
||||||
|
q.inProgress++
|
||||||
|
close(next.r.scheduled)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClockQueue) removeFromQueues(r *request) {
|
||||||
|
if r.limitIdx != invalidIndex {
|
||||||
|
heap.Remove(q.limitQueue, r.limitIdx)
|
||||||
|
}
|
||||||
|
if r.sharesIdx != invalidIndex {
|
||||||
|
heap.Remove(q.sharesQueue, r.sharesIdx)
|
||||||
|
}
|
||||||
|
if r.readyIdx != invalidIndex {
|
||||||
|
heap.Remove(q.readyQueue, r.readyIdx)
|
||||||
|
}
|
||||||
|
if r.reservationIdx != invalidIndex {
|
||||||
|
heap.Remove(q.reservationQueue, r.readyIdx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *MClockQueue) requestCompleted() {
|
||||||
|
q.mtx.Lock()
|
||||||
|
defer q.mtx.Unlock()
|
||||||
|
|
||||||
|
if q.closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.inProgress == 0 {
|
||||||
|
panic("invalid requests count")
|
||||||
|
}
|
||||||
|
q.inProgress--
|
||||||
|
q.scheduleRequest(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertIndexInvalid(r *request) {
|
||||||
|
if r.limitIdx != invalidIndex {
|
||||||
|
panic("limitIdx is not -1")
|
||||||
|
}
|
||||||
|
if r.sharesIdx != invalidIndex {
|
||||||
|
panic("sharesIdx is not -1")
|
||||||
|
}
|
||||||
|
if r.reservationIdx != invalidIndex {
|
||||||
|
panic("reservationIdx is not -1")
|
||||||
|
}
|
||||||
|
if r.readyIdx != invalidIndex {
|
||||||
|
panic("readyIdx is not -1")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len implements heap.Interface.
|
||||||
|
func (q *mQueue) Len() int {
|
||||||
|
return len(q.items)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Less implements heap.Interface.
|
||||||
|
func (q *mQueue) Less(i int, j int) bool {
|
||||||
|
return q.items[i].ts() < q.items[j].ts()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pop implements heap.Interface.
|
||||||
|
func (q *mQueue) Pop() any {
|
||||||
|
n := len(q.items)
|
||||||
|
item := q.items[n-1]
|
||||||
|
q.items[n-1] = nil
|
||||||
|
q.items = q.items[0 : n-1]
|
||||||
|
item.setIndex(invalidIndex)
|
||||||
|
return item
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push implements heap.Interface.
|
||||||
|
func (q *mQueue) Push(x any) {
|
||||||
|
it := x.(mQueueItem)
|
||||||
|
it.setIndex(q.Len())
|
||||||
|
q.items = append(q.items, it)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Swap implements heap.Interface.
|
||||||
|
func (q *mQueue) Swap(i int, j int) {
|
||||||
|
q.items[i], q.items[j] = q.items[j], q.items[i]
|
||||||
|
q.items[i].setIndex(i)
|
||||||
|
q.items[j].setIndex(j)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ mQueueItem = &reservationMQueueItem{}
|
||||||
|
|
||||||
|
type reservationMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *reservationMQueueItem) ts() float64 {
|
||||||
|
return i.r.reservation
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *reservationMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.reservationIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ mQueueItem = &limitMQueueItem{}
|
||||||
|
|
||||||
|
type limitMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *limitMQueueItem) ts() float64 {
|
||||||
|
return i.r.limit
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *limitMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.limitIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ mQueueItem = &sharesMQueueItem{}
|
||||||
|
|
||||||
|
type sharesMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *sharesMQueueItem) ts() float64 {
|
||||||
|
return i.r.shares
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *sharesMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.sharesIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ mQueueItem = &readyMQueueItem{}
|
||||||
|
|
||||||
|
type readyMQueueItem struct {
|
||||||
|
r *request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *readyMQueueItem) ts() float64 {
|
||||||
|
return i.r.shares
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *readyMQueueItem) setIndex(idx int) {
|
||||||
|
i.r.readyIdx = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
type scheduleInfo struct {
|
||||||
|
ts float64
|
||||||
|
f func()
|
||||||
|
}
|
||||||
|
|
||||||
|
type systemClock struct {
|
||||||
|
since time.Time
|
||||||
|
schedule chan scheduleInfo
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSystemClock() *systemClock {
|
||||||
|
c := &systemClock{
|
||||||
|
since: time.Now(),
|
||||||
|
}
|
||||||
|
c.start()
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) now() float64 {
|
||||||
|
return time.Since(c.since).Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) runAt(ts float64, f func()) {
|
||||||
|
c.schedule <- scheduleInfo{ts: ts, f: f}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) close() {
|
||||||
|
close(c.schedule)
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *systemClock) start() {
|
||||||
|
c.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
t := time.NewTimer(time.Hour)
|
||||||
|
var f func()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
if f != nil {
|
||||||
|
f()
|
||||||
|
f = nil
|
||||||
|
}
|
||||||
|
t.Reset(time.Hour)
|
||||||
|
case s, ok := <-c.schedule:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
now := c.now()
|
||||||
|
if now >= s.ts {
|
||||||
|
s.f()
|
||||||
|
f = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
t.Reset(time.Duration((s.ts - now) * 1e9))
|
||||||
|
f = s.f
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
400
pkg/core/quota/mclock_test.go
Normal file
400
pkg/core/quota/mclock_test.go
Normal file
|
@ -0,0 +1,400 @@
|
||||||
|
package quota
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMClockSharesScheduling(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
reqCount := 1000
|
||||||
|
reqCount = (reqCount / 2) * 2
|
||||||
|
q := NewMClockQueue(1, math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {shares: 2},
|
||||||
|
"class2": {shares: 1},
|
||||||
|
}, 100)
|
||||||
|
q.clock = &noopClock{}
|
||||||
|
|
||||||
|
var releases []Release
|
||||||
|
var requests []*request
|
||||||
|
tag := "class1"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
tag = "class2"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []string
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < reqCount; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-requests[i].scheduled
|
||||||
|
result = append(result, requests[i].tag)
|
||||||
|
releases[i]()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
|
||||||
|
// because the ratio is 2 to 1.
|
||||||
|
// However, there may be deviations due to rounding and sorting.
|
||||||
|
result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail
|
||||||
|
var class1Count int
|
||||||
|
var class2Count int
|
||||||
|
var class2MaxSeq int
|
||||||
|
for _, res := range result {
|
||||||
|
switch res {
|
||||||
|
case "class1":
|
||||||
|
class1Count++
|
||||||
|
class2MaxSeq = 0
|
||||||
|
case "class2":
|
||||||
|
class2Count++
|
||||||
|
class2MaxSeq++
|
||||||
|
require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ clock = &noopClock{}
|
||||||
|
|
||||||
|
type noopClock struct {
|
||||||
|
v float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopClock) now() float64 {
|
||||||
|
return n.v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopClock) runAt(ts float64, f func()) {}
|
||||||
|
|
||||||
|
func (n *noopClock) close() {}
|
||||||
|
|
||||||
|
func TestMClockRequestCancel(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := NewMClockQueue(1, math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {shares: 2},
|
||||||
|
"class2": {shares: 1},
|
||||||
|
}, 100)
|
||||||
|
q.clock = &noopClock{}
|
||||||
|
|
||||||
|
release1, err := q.RequestArrival(context.Background(), "class1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
release2, err := q.RequestArrival(ctx, "class1")
|
||||||
|
require.Nil(t, release2)
|
||||||
|
require.ErrorIs(t, err, context.DeadlineExceeded)
|
||||||
|
|
||||||
|
require.Equal(t, 0, q.readyQueue.Len())
|
||||||
|
require.Equal(t, 0, q.sharesQueue.Len())
|
||||||
|
require.Equal(t, 0, q.limitQueue.Len())
|
||||||
|
require.Equal(t, 0, q.reservationQueue.Len())
|
||||||
|
|
||||||
|
release1()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockLimitScheduling(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
reqCount := 100
|
||||||
|
reqCount = (reqCount / 2) * 2
|
||||||
|
limit := 1.0
|
||||||
|
cl := &noopClock{}
|
||||||
|
q := NewMClockQueue(1, math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {shares: 2, limit: &limit},
|
||||||
|
"class2": {shares: 1, limit: &limit},
|
||||||
|
}, 100)
|
||||||
|
q.clock = cl
|
||||||
|
|
||||||
|
var releases []Release
|
||||||
|
var requests []*request
|
||||||
|
tag := "class1"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
tag = "class2"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
|
||||||
|
q.scheduleRequest(false)
|
||||||
|
|
||||||
|
for _, req := range requests {
|
||||||
|
select {
|
||||||
|
case <-req.scheduled:
|
||||||
|
require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cl.v = math.MaxFloat64
|
||||||
|
|
||||||
|
var result []string
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < reqCount; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-requests[i].scheduled
|
||||||
|
result = append(result, requests[i].tag)
|
||||||
|
releases[i]()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
q.scheduleRequest(false)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
|
||||||
|
// because the ratio is 2 to 1.
|
||||||
|
// However, there may be deviations due to rounding and sorting.
|
||||||
|
result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail
|
||||||
|
var class1Count int
|
||||||
|
var class2Count int
|
||||||
|
var class2MaxSeq int
|
||||||
|
for _, res := range result {
|
||||||
|
switch res {
|
||||||
|
case "class1":
|
||||||
|
class1Count++
|
||||||
|
class2MaxSeq = 0
|
||||||
|
case "class2":
|
||||||
|
class2Count++
|
||||||
|
class2MaxSeq++
|
||||||
|
require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row
|
||||||
|
default:
|
||||||
|
require.Fail(t, "unknown tag")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)
|
||||||
|
|
||||||
|
require.Equal(t, 0, q.readyQueue.Len())
|
||||||
|
require.Equal(t, 0, q.sharesQueue.Len())
|
||||||
|
require.Equal(t, 0, q.limitQueue.Len())
|
||||||
|
require.Equal(t, 0, q.reservationQueue.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockReservationScheduling(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
reqCount := 1000
|
||||||
|
reqCount = (reqCount / 2) * 2
|
||||||
|
limit := 0.01 // 1 request in 100 seconds
|
||||||
|
resevation := 100.0 // 100 RPS
|
||||||
|
cl := &noopClock{}
|
||||||
|
q := NewMClockQueue(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {shares: 2, limit: &limit},
|
||||||
|
"class2": {shares: 1, limit: &limit, reservation: &resevation},
|
||||||
|
}, 100)
|
||||||
|
q.clock = cl
|
||||||
|
|
||||||
|
var releases []Release
|
||||||
|
var requests []*request
|
||||||
|
tag := "class1"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
tag = "class2"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
req, release, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
releases = append(releases, release)
|
||||||
|
}
|
||||||
|
|
||||||
|
q.scheduleRequest(false)
|
||||||
|
|
||||||
|
for _, req := range requests {
|
||||||
|
select {
|
||||||
|
case <-req.scheduled:
|
||||||
|
require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cl.v = 1.00001 // 1s elapsed
|
||||||
|
q.scheduleRequest(false)
|
||||||
|
|
||||||
|
var result []string
|
||||||
|
for i, req := range requests {
|
||||||
|
select {
|
||||||
|
case <-req.scheduled:
|
||||||
|
result = append(result, requests[i].tag)
|
||||||
|
releases[i]()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, 100, len(result))
|
||||||
|
for _, res := range result {
|
||||||
|
require.Equal(t, "class2", res)
|
||||||
|
}
|
||||||
|
|
||||||
|
cl.v = math.MaxFloat64
|
||||||
|
q.scheduleRequest(false)
|
||||||
|
|
||||||
|
require.Equal(t, 0, q.readyQueue.Len())
|
||||||
|
require.Equal(t, 0, q.sharesQueue.Len())
|
||||||
|
require.Equal(t, 0, q.limitQueue.Len())
|
||||||
|
require.Equal(t, 0, q.reservationQueue.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockIdleTag(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
reqCount := 100
|
||||||
|
idleTimeout := 2.0
|
||||||
|
cl := &noopClock{}
|
||||||
|
q := NewMClockQueue(1, math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {shares: 1},
|
||||||
|
"class2": {shares: 1},
|
||||||
|
}, idleTimeout)
|
||||||
|
q.clock = cl
|
||||||
|
|
||||||
|
var requests []*request
|
||||||
|
tag := "class1"
|
||||||
|
for i := 0; i < reqCount/2; i++ {
|
||||||
|
cl.v += idleTimeout / 2
|
||||||
|
req, _, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// class1 requests have shares [1.0; 2.0; 3.0; ... ]
|
||||||
|
|
||||||
|
cl.v += 2 * idleTimeout
|
||||||
|
|
||||||
|
tag = "class2"
|
||||||
|
req, _, err := q.pushRequest(tag)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requests = append(requests, req)
|
||||||
|
|
||||||
|
// class2 must be defined as idle, so all shares tags must be adjusted.
|
||||||
|
|
||||||
|
for _, req := range requests {
|
||||||
|
select {
|
||||||
|
case <-req.scheduled:
|
||||||
|
default:
|
||||||
|
require.True(t, req.shares >= cl.v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockClose(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := NewMClockQueue(1, math.MaxUint64, map[string]TagInfo{
|
||||||
|
"class1": {shares: 1},
|
||||||
|
}, 1000)
|
||||||
|
q.clock = &noopClock{}
|
||||||
|
|
||||||
|
requestRunning := make(chan struct{})
|
||||||
|
checkDone := make(chan struct{})
|
||||||
|
eg, ctx := errgroup.WithContext(context.Background())
|
||||||
|
tag := "class1"
|
||||||
|
eg.Go(func() error {
|
||||||
|
release, err := q.RequestArrival(ctx, tag)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
|
close(requestRunning)
|
||||||
|
<-checkDone
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
<-requestRunning
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
release, err := q.RequestArrival(ctx, tag)
|
||||||
|
require.Nil(t, release)
|
||||||
|
require.ErrorIs(t, err, ErrSchedulerClosed)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// wait until second request will be blocked on wait
|
||||||
|
for q.limitQueue.Len() == 0 {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
q.Close()
|
||||||
|
|
||||||
|
release, err := q.RequestArrival(context.Background(), tag)
|
||||||
|
require.Nil(t, release)
|
||||||
|
require.ErrorIs(t, err, ErrSchedulerClosed)
|
||||||
|
|
||||||
|
close(checkDone)
|
||||||
|
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockWaitLimit(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := NewMClockQueue(1, 1, map[string]TagInfo{
|
||||||
|
"class1": {shares: 1},
|
||||||
|
}, 1000)
|
||||||
|
q.clock = &noopClock{}
|
||||||
|
defer q.Close()
|
||||||
|
|
||||||
|
requestRunning := make(chan struct{})
|
||||||
|
checkDone := make(chan struct{})
|
||||||
|
eg, ctx := errgroup.WithContext(context.Background())
|
||||||
|
tag := "class1"
|
||||||
|
// running request
|
||||||
|
eg.Go(func() error {
|
||||||
|
release, err := q.RequestArrival(ctx, tag)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
|
close(requestRunning)
|
||||||
|
<-checkDone
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// waiting request
|
||||||
|
eg.Go(func() error {
|
||||||
|
<-requestRunning
|
||||||
|
release, err := q.RequestArrival(ctx, tag)
|
||||||
|
require.NotNil(t, release)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer release()
|
||||||
|
<-checkDone
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// wait until second request will be waiting
|
||||||
|
for q.sharesQueue.Len() == 0 {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
release, err := q.RequestArrival(ctx, tag)
|
||||||
|
require.Nil(t, release)
|
||||||
|
require.ErrorIs(t, err, ErrSchedulerRequestLimitExceeded)
|
||||||
|
|
||||||
|
close(checkDone)
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue