forked from TrueCloudLab/frostfs-node
[#324] ir/timers: Implement chain block timer
Implement a timer that can tick chain blocks and perform actions at time intervals in the blocks. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
23c220ae28
commit
476349dd1c
2 changed files with 170 additions and 0 deletions
114
pkg/innerring/timers/block.go
Normal file
114
pkg/innerring/timers/block.go
Normal file
|
@ -0,0 +1,114 @@
|
||||||
|
package timers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BlockMeter calculates block time interval dynamically.
|
||||||
|
type BlockMeter func() (uint32, error)
|
||||||
|
|
||||||
|
// BlockTickHandler is a callback of a certain block advance.
|
||||||
|
type BlockTickHandler func()
|
||||||
|
|
||||||
|
// BlockTimer represents block timer.
|
||||||
|
//
|
||||||
|
// It can tick the blocks and perform certain actions
|
||||||
|
// on block time intervals.
|
||||||
|
type BlockTimer struct {
|
||||||
|
mtx *sync.Mutex
|
||||||
|
|
||||||
|
dur BlockMeter
|
||||||
|
|
||||||
|
mul, div uint32
|
||||||
|
|
||||||
|
cur, tgt uint32
|
||||||
|
|
||||||
|
h BlockTickHandler
|
||||||
|
|
||||||
|
ps []BlockTimer
|
||||||
|
}
|
||||||
|
|
||||||
|
// StaticBlockMeter returns BlockMeters that always returns (d, nil).
|
||||||
|
func StaticBlockMeter(d uint32) BlockMeter {
|
||||||
|
return func() (uint32, error) {
|
||||||
|
return d, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBlockTimer creates a new BlockTimer.
|
||||||
|
//
|
||||||
|
// Reset should be called before timer ticking.
|
||||||
|
func NewBlockTimer(dur BlockMeter, h BlockTickHandler) *BlockTimer {
|
||||||
|
return &BlockTimer{
|
||||||
|
mtx: new(sync.Mutex),
|
||||||
|
dur: dur,
|
||||||
|
mul: 1,
|
||||||
|
div: 1,
|
||||||
|
h: h,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnDelta registers handler which is executed every (mul / div * BlockMeter()) block.
|
||||||
|
func (t *BlockTimer) OnDelta(mul, div uint32, h BlockTickHandler) {
|
||||||
|
t.ps = append(t.ps, BlockTimer{
|
||||||
|
mul: mul,
|
||||||
|
div: div,
|
||||||
|
h: h,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset resets previous ticks of the BlockTimer.
|
||||||
|
//
|
||||||
|
// Returns BlockMeter's error upon occurrence.
|
||||||
|
func (t *BlockTimer) Reset() error {
|
||||||
|
d, err := t.dur()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.mtx.Lock()
|
||||||
|
t.reset(d)
|
||||||
|
t.mtx.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *BlockTimer) reset(dur uint32) {
|
||||||
|
delta := t.mul * dur / t.div
|
||||||
|
if delta == 0 {
|
||||||
|
delta = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
t.tgt = delta
|
||||||
|
t.cur = 0
|
||||||
|
|
||||||
|
for i := range t.ps {
|
||||||
|
t.ps[i].reset(dur)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tick ticks one block in the BlockTimer.
|
||||||
|
//
|
||||||
|
// Executes all callbacks which are awaiting execution at the new block.
|
||||||
|
func (t *BlockTimer) Tick() {
|
||||||
|
t.mtx.Lock()
|
||||||
|
t.tick()
|
||||||
|
t.mtx.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *BlockTimer) tick() {
|
||||||
|
t.cur++
|
||||||
|
|
||||||
|
if t.cur == t.tgt {
|
||||||
|
t.cur = 0
|
||||||
|
|
||||||
|
// it would be advisable to optimize such execution, for example:
|
||||||
|
// 1. push handler to worker pool t.wp.Submit(h);
|
||||||
|
// 2. call t.tickH(h)
|
||||||
|
t.h()
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range t.ps {
|
||||||
|
t.ps[i].tick()
|
||||||
|
}
|
||||||
|
}
|
56
pkg/innerring/timers/block_test.go
Normal file
56
pkg/innerring/timers/block_test.go
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
package timers_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func tickN(t *timers.BlockTimer, n uint32) {
|
||||||
|
for i := uint32(0); i < n; i++ {
|
||||||
|
t.Tick()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBlockTimer(t *testing.T) {
|
||||||
|
blockDur := uint32(10)
|
||||||
|
baseCallCounter := uint32(0)
|
||||||
|
|
||||||
|
bt := timers.NewBlockTimer(timers.StaticBlockMeter(blockDur), func() {
|
||||||
|
baseCallCounter++
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, bt.Reset())
|
||||||
|
|
||||||
|
intervalNum := uint32(7)
|
||||||
|
|
||||||
|
tickN(bt, intervalNum*blockDur)
|
||||||
|
|
||||||
|
require.Equal(t, intervalNum, uint32(baseCallCounter))
|
||||||
|
|
||||||
|
// add half-interval handler
|
||||||
|
halfCallCounter := uint32(0)
|
||||||
|
|
||||||
|
bt.OnDelta(1, 2, func() {
|
||||||
|
halfCallCounter++
|
||||||
|
})
|
||||||
|
|
||||||
|
// add double interval handler
|
||||||
|
doubleCallCounter := uint32(0)
|
||||||
|
|
||||||
|
bt.OnDelta(2, 1, func() {
|
||||||
|
doubleCallCounter++
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, bt.Reset())
|
||||||
|
|
||||||
|
baseCallCounter = 0
|
||||||
|
intervalNum = 20
|
||||||
|
|
||||||
|
tickN(bt, intervalNum*blockDur)
|
||||||
|
|
||||||
|
require.Equal(t, intervalNum*2, uint32(halfCallCounter))
|
||||||
|
require.Equal(t, intervalNum, uint32(baseCallCounter))
|
||||||
|
require.Equal(t, intervalNum/2, uint32(doubleCallCounter))
|
||||||
|
}
|
Loading…
Reference in a new issue