forked from TrueCloudLab/frostfs-node
[#479] morph/timer: Move block timer to morph package
Block timer is going to be reused in storage node to tick EigenTrust calculation rounds. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
434ecb41da
commit
376bb293b4
4 changed files with 27 additions and 26 deletions
168
pkg/morph/timer/block.go
Normal file
168
pkg/morph/timer/block.go
Normal file
|
@ -0,0 +1,168 @@
|
|||
package timer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// BlockMeter calculates block time interval dynamically.
|
||||
type BlockMeter func() (uint32, error)
|
||||
|
||||
// BlockTickHandler is a callback of a certain block advance.
|
||||
type BlockTickHandler func()
|
||||
|
||||
// BlockTimer represents block timer.
|
||||
//
|
||||
// It can tick the blocks and perform certain actions
|
||||
// on block time intervals.
|
||||
type BlockTimer struct {
|
||||
rolledBack bool
|
||||
|
||||
mtx *sync.Mutex
|
||||
|
||||
dur BlockMeter
|
||||
|
||||
baseDur uint32
|
||||
|
||||
mul, div uint32
|
||||
|
||||
cur, tgt uint32
|
||||
|
||||
h BlockTickHandler
|
||||
|
||||
ps []BlockTimer
|
||||
|
||||
deltaCfg
|
||||
}
|
||||
|
||||
// DeltaOption is an option of delta-interval handler.
|
||||
type DeltaOption func(*deltaCfg)
|
||||
|
||||
type deltaCfg struct {
|
||||
pulse bool
|
||||
}
|
||||
|
||||
// WithPulse returns option to call delta-interval handler multiple
|
||||
// times
|
||||
func WithPulse() DeltaOption {
|
||||
return func(c *deltaCfg) {
|
||||
c.pulse = true
|
||||
}
|
||||
}
|
||||
|
||||
// StaticBlockMeter returns BlockMeters that always returns (d, nil).
|
||||
func StaticBlockMeter(d uint32) BlockMeter {
|
||||
return func() (uint32, error) {
|
||||
return d, nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewBlockTimer creates a new BlockTimer.
|
||||
//
|
||||
// Reset should be called before timer ticking.
|
||||
func NewBlockTimer(dur BlockMeter, h BlockTickHandler) *BlockTimer {
|
||||
return &BlockTimer{
|
||||
mtx: new(sync.Mutex),
|
||||
dur: dur,
|
||||
mul: 1,
|
||||
div: 1,
|
||||
h: h,
|
||||
deltaCfg: deltaCfg{
|
||||
pulse: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// OnDelta registers handler which is executed on (mul / div * BlockMeter()) block
|
||||
// after basic interval reset.
|
||||
//
|
||||
// If WithPulse option is provided, handler is executed (mul / div * BlockMeter()) block
|
||||
// during base interval.
|
||||
func (t *BlockTimer) OnDelta(mul, div uint32, h BlockTickHandler, opts ...DeltaOption) {
|
||||
c := deltaCfg{
|
||||
pulse: false,
|
||||
}
|
||||
|
||||
for i := range opts {
|
||||
opts[i](&c)
|
||||
}
|
||||
|
||||
t.ps = append(t.ps, BlockTimer{
|
||||
mul: mul,
|
||||
div: div,
|
||||
h: h,
|
||||
|
||||
deltaCfg: c,
|
||||
})
|
||||
}
|
||||
|
||||
// Reset resets previous ticks of the BlockTimer.
|
||||
//
|
||||
// Returns BlockMeter's error upon occurrence.
|
||||
func (t *BlockTimer) Reset() error {
|
||||
d, err := t.dur()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.mtx.Lock()
|
||||
|
||||
t.resetWithBaseInterval(d)
|
||||
|
||||
for i := range t.ps {
|
||||
t.ps[i].resetWithBaseInterval(d)
|
||||
}
|
||||
|
||||
t.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *BlockTimer) resetWithBaseInterval(d uint32) {
|
||||
t.rolledBack = false
|
||||
t.baseDur = d
|
||||
t.reset()
|
||||
}
|
||||
|
||||
func (t *BlockTimer) reset() {
|
||||
mul, div := t.mul, t.div
|
||||
|
||||
if !t.pulse && t.rolledBack && mul < div {
|
||||
mul, div = 1, 1
|
||||
}
|
||||
|
||||
delta := mul * t.baseDur / div
|
||||
if delta == 0 {
|
||||
delta = 1
|
||||
}
|
||||
|
||||
t.tgt = delta
|
||||
t.cur = 0
|
||||
}
|
||||
|
||||
// Tick ticks one block in the BlockTimer.
|
||||
//
|
||||
// Executes all callbacks which are awaiting execution at the new block.
|
||||
func (t *BlockTimer) Tick() {
|
||||
t.mtx.Lock()
|
||||
t.tick()
|
||||
t.mtx.Unlock()
|
||||
}
|
||||
|
||||
func (t *BlockTimer) tick() {
|
||||
t.cur++
|
||||
|
||||
if t.cur == t.tgt {
|
||||
// it would be advisable to optimize such execution, for example:
|
||||
// 1. push handler to worker pool t.wp.Submit(h);
|
||||
// 2. call t.tickH(h)
|
||||
t.h()
|
||||
|
||||
t.cur = 0
|
||||
t.rolledBack = true
|
||||
t.reset()
|
||||
}
|
||||
|
||||
for i := range t.ps {
|
||||
t.ps[i].tick()
|
||||
}
|
||||
}
|
110
pkg/morph/timer/block_test.go
Normal file
110
pkg/morph/timer/block_test.go
Normal file
|
@ -0,0 +1,110 @@
|
|||
package timer_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/timer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func tickN(t *timer.BlockTimer, n uint32) {
|
||||
for i := uint32(0); i < n; i++ {
|
||||
t.Tick()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockTimer(t *testing.T) {
|
||||
blockDur := uint32(10)
|
||||
baseCallCounter := uint32(0)
|
||||
|
||||
bt := timer.NewBlockTimer(timer.StaticBlockMeter(blockDur), func() {
|
||||
baseCallCounter++
|
||||
})
|
||||
|
||||
require.NoError(t, bt.Reset())
|
||||
|
||||
intervalNum := uint32(7)
|
||||
|
||||
tickN(bt, intervalNum*blockDur)
|
||||
|
||||
require.Equal(t, intervalNum, uint32(baseCallCounter))
|
||||
|
||||
// add half-interval handler
|
||||
halfCallCounter := uint32(0)
|
||||
|
||||
bt.OnDelta(1, 2, func() {
|
||||
halfCallCounter++
|
||||
})
|
||||
|
||||
// add double interval handler
|
||||
doubleCallCounter := uint32(0)
|
||||
|
||||
bt.OnDelta(2, 1, func() {
|
||||
doubleCallCounter++
|
||||
})
|
||||
|
||||
require.NoError(t, bt.Reset())
|
||||
|
||||
baseCallCounter = 0
|
||||
intervalNum = 20
|
||||
|
||||
tickN(bt, intervalNum*blockDur)
|
||||
|
||||
require.Equal(t, intervalNum, uint32(halfCallCounter))
|
||||
require.Equal(t, intervalNum, uint32(baseCallCounter))
|
||||
require.Equal(t, intervalNum/2, uint32(doubleCallCounter))
|
||||
}
|
||||
|
||||
func TestDeltaPulse(t *testing.T) {
|
||||
blockDur := uint32(9)
|
||||
baseCallCounter := uint32(0)
|
||||
|
||||
bt := timer.NewBlockTimer(timer.StaticBlockMeter(blockDur), func() {
|
||||
baseCallCounter++
|
||||
})
|
||||
|
||||
deltaCallCounter := uint32(0)
|
||||
|
||||
div := uint32(3)
|
||||
|
||||
bt.OnDelta(1, div, func() {
|
||||
deltaCallCounter++
|
||||
}, timer.WithPulse())
|
||||
|
||||
require.NoError(t, bt.Reset())
|
||||
|
||||
intervalNum := uint32(7)
|
||||
|
||||
tickN(bt, intervalNum*blockDur)
|
||||
|
||||
require.Equal(t, intervalNum, uint32(baseCallCounter))
|
||||
require.Equal(t, intervalNum*div, uint32(deltaCallCounter))
|
||||
}
|
||||
|
||||
func TestDeltaReset(t *testing.T) {
|
||||
blockDur := uint32(6)
|
||||
baseCallCounter := 0
|
||||
|
||||
bt := timer.NewBlockTimer(timer.StaticBlockMeter(blockDur), func() {
|
||||
baseCallCounter++
|
||||
})
|
||||
|
||||
detlaCallCounter := 0
|
||||
|
||||
bt.OnDelta(1, 3, func() {
|
||||
detlaCallCounter++
|
||||
})
|
||||
|
||||
require.NoError(t, bt.Reset())
|
||||
|
||||
tickN(bt, 6)
|
||||
|
||||
require.Equal(t, 1, baseCallCounter)
|
||||
require.Equal(t, 1, detlaCallCounter)
|
||||
|
||||
require.NoError(t, bt.Reset())
|
||||
|
||||
tickN(bt, 3)
|
||||
|
||||
require.Equal(t, 2, detlaCallCounter)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue