forked from TrueCloudLab/frostfs-node
[#324] ir/timers: Complicate the logic of fractional block intervals
Call handler of the fractional block interval once between base interval ticks by default. Add option to call handler of fractional block interval multiple times (N times if fractional interval == BASE_INTERVAL / N). Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
476349dd1c
commit
b5dc28f79c
2 changed files with 95 additions and 10 deletions
|
@ -15,10 +15,14 @@ type BlockTickHandler func()
|
||||||
// It can tick the blocks and perform certain actions
|
// It can tick the blocks and perform certain actions
|
||||||
// on block time intervals.
|
// on block time intervals.
|
||||||
type BlockTimer struct {
|
type BlockTimer struct {
|
||||||
|
rolledBack bool
|
||||||
|
|
||||||
mtx *sync.Mutex
|
mtx *sync.Mutex
|
||||||
|
|
||||||
dur BlockMeter
|
dur BlockMeter
|
||||||
|
|
||||||
|
baseDur uint32
|
||||||
|
|
||||||
mul, div uint32
|
mul, div uint32
|
||||||
|
|
||||||
cur, tgt uint32
|
cur, tgt uint32
|
||||||
|
@ -26,6 +30,23 @@ type BlockTimer struct {
|
||||||
h BlockTickHandler
|
h BlockTickHandler
|
||||||
|
|
||||||
ps []BlockTimer
|
ps []BlockTimer
|
||||||
|
|
||||||
|
deltaCfg
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeltaOption is an option of delta-interval handler.
|
||||||
|
type DeltaOption func(*deltaCfg)
|
||||||
|
|
||||||
|
type deltaCfg struct {
|
||||||
|
pulse bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithPulse returns option to call delta-interval handler multiple
|
||||||
|
// times
|
||||||
|
func WithPulse() DeltaOption {
|
||||||
|
return func(c *deltaCfg) {
|
||||||
|
c.pulse = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// StaticBlockMeter returns BlockMeters that always returns (d, nil).
|
// StaticBlockMeter returns BlockMeters that always returns (d, nil).
|
||||||
|
@ -45,15 +66,32 @@ func NewBlockTimer(dur BlockMeter, h BlockTickHandler) *BlockTimer {
|
||||||
mul: 1,
|
mul: 1,
|
||||||
div: 1,
|
div: 1,
|
||||||
h: h,
|
h: h,
|
||||||
|
deltaCfg: deltaCfg{
|
||||||
|
pulse: true,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnDelta registers handler which is executed every (mul / div * BlockMeter()) block.
|
// OnDelta registers handler which is executed on (mul / div * BlockMeter()) block
|
||||||
func (t *BlockTimer) OnDelta(mul, div uint32, h BlockTickHandler) {
|
// after basic interval reset.
|
||||||
|
//
|
||||||
|
// If WithPulse option is provided, handler is executed (mul / div * BlockMeter()) block
|
||||||
|
// during base interval.
|
||||||
|
func (t *BlockTimer) OnDelta(mul, div uint32, h BlockTickHandler, opts ...DeltaOption) {
|
||||||
|
c := deltaCfg{
|
||||||
|
pulse: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range opts {
|
||||||
|
opts[i](&c)
|
||||||
|
}
|
||||||
|
|
||||||
t.ps = append(t.ps, BlockTimer{
|
t.ps = append(t.ps, BlockTimer{
|
||||||
mul: mul,
|
mul: mul,
|
||||||
div: div,
|
div: div,
|
||||||
h: h,
|
h: h,
|
||||||
|
|
||||||
|
deltaCfg: c,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,23 +105,40 @@ func (t *BlockTimer) Reset() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
t.mtx.Lock()
|
t.mtx.Lock()
|
||||||
t.reset(d)
|
|
||||||
|
t.resetWithBaseInterval(d)
|
||||||
|
|
||||||
|
for i := range t.ps {
|
||||||
|
t.ps[i].resetWithBaseInterval(d)
|
||||||
|
}
|
||||||
|
|
||||||
t.mtx.Unlock()
|
t.mtx.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BlockTimer) reset(dur uint32) {
|
func (t *BlockTimer) resetWithBaseInterval(d uint32) {
|
||||||
delta := t.mul * dur / t.div
|
t.rolledBack = false
|
||||||
|
t.baseDur = d
|
||||||
|
t.reset()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *BlockTimer) reset() {
|
||||||
|
mul, div := t.mul, t.div
|
||||||
|
|
||||||
|
if !t.pulse && t.rolledBack && mul < div {
|
||||||
|
mul, div = 1, 1
|
||||||
|
}
|
||||||
|
|
||||||
|
delta := mul * t.baseDur / div
|
||||||
if delta == 0 {
|
if delta == 0 {
|
||||||
delta = 1
|
delta = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
t.tgt = delta
|
t.tgt = delta
|
||||||
t.cur = 0
|
|
||||||
|
|
||||||
for i := range t.ps {
|
for i := range t.ps {
|
||||||
t.ps[i].reset(dur)
|
t.ps[i].reset()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,12 +155,14 @@ func (t *BlockTimer) tick() {
|
||||||
t.cur++
|
t.cur++
|
||||||
|
|
||||||
if t.cur == t.tgt {
|
if t.cur == t.tgt {
|
||||||
t.cur = 0
|
|
||||||
|
|
||||||
// it would be advisable to optimize such execution, for example:
|
// it would be advisable to optimize such execution, for example:
|
||||||
// 1. push handler to worker pool t.wp.Submit(h);
|
// 1. push handler to worker pool t.wp.Submit(h);
|
||||||
// 2. call t.tickH(h)
|
// 2. call t.tickH(h)
|
||||||
t.h()
|
t.h()
|
||||||
|
|
||||||
|
t.cur = 0
|
||||||
|
t.rolledBack = true
|
||||||
|
t.reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range t.ps {
|
for i := range t.ps {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package timers_test
|
package timers_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
|
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
|
||||||
|
@ -50,7 +51,34 @@ func TestBlockTimer(t *testing.T) {
|
||||||
|
|
||||||
tickN(bt, intervalNum*blockDur)
|
tickN(bt, intervalNum*blockDur)
|
||||||
|
|
||||||
require.Equal(t, intervalNum*2, uint32(halfCallCounter))
|
require.Equal(t, intervalNum, uint32(halfCallCounter))
|
||||||
require.Equal(t, intervalNum, uint32(baseCallCounter))
|
require.Equal(t, intervalNum, uint32(baseCallCounter))
|
||||||
require.Equal(t, intervalNum/2, uint32(doubleCallCounter))
|
require.Equal(t, intervalNum/2, uint32(doubleCallCounter))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeltaPulse(t *testing.T) {
|
||||||
|
blockDur := uint32(9)
|
||||||
|
baseCallCounter := uint32(0)
|
||||||
|
|
||||||
|
bt := timers.NewBlockTimer(timers.StaticBlockMeter(blockDur), func() {
|
||||||
|
baseCallCounter++
|
||||||
|
})
|
||||||
|
|
||||||
|
deltaCallCounter := uint32(0)
|
||||||
|
|
||||||
|
div := uint32(3)
|
||||||
|
|
||||||
|
bt.OnDelta(1, div, func() {
|
||||||
|
deltaCallCounter++
|
||||||
|
}, timers.WithPulse())
|
||||||
|
|
||||||
|
require.NoError(t, bt.Reset())
|
||||||
|
|
||||||
|
intervalNum := uint32(7)
|
||||||
|
|
||||||
|
tickN(bt, intervalNum*blockDur)
|
||||||
|
|
||||||
|
fmt.Println(baseCallCounter, deltaCallCounter)
|
||||||
|
require.Equal(t, intervalNum, uint32(baseCallCounter))
|
||||||
|
require.Equal(t, intervalNum*div, uint32(deltaCallCounter))
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue