From b5dc28f79cc752c152560767dc4863034b4e740d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 21 Jan 2021 22:16:43 +0300 Subject: [PATCH] [#324] ir/timers: Complicate the logic of fractional block intervals Call handler of the fractional block interval once between base interval ticks by default. Add option to call handler of fractional block interval multiple times (N times if fractional interval == BASE_INTERVAL / N). Signed-off-by: Leonard Lyubich --- pkg/innerring/timers/block.go | 75 ++++++++++++++++++++++++++---- pkg/innerring/timers/block_test.go | 30 +++++++++++- 2 files changed, 95 insertions(+), 10 deletions(-) diff --git a/pkg/innerring/timers/block.go b/pkg/innerring/timers/block.go index b0aebd48c..fea993337 100644 --- a/pkg/innerring/timers/block.go +++ b/pkg/innerring/timers/block.go @@ -15,10 +15,14 @@ type BlockTickHandler func() // It can tick the blocks and perform certain actions // on block time intervals. type BlockTimer struct { + rolledBack bool + mtx *sync.Mutex dur BlockMeter + baseDur uint32 + mul, div uint32 cur, tgt uint32 @@ -26,6 +30,23 @@ type BlockTimer struct { h BlockTickHandler ps []BlockTimer + + deltaCfg +} + +// DeltaOption is an option of delta-interval handler. +type DeltaOption func(*deltaCfg) + +type deltaCfg struct { + pulse bool +} + +// WithPulse returns option to call delta-interval handler multiple +// times +func WithPulse() DeltaOption { + return func(c *deltaCfg) { + c.pulse = true + } } // StaticBlockMeter returns BlockMeters that always returns (d, nil). @@ -45,15 +66,32 @@ func NewBlockTimer(dur BlockMeter, h BlockTickHandler) *BlockTimer { mul: 1, div: 1, h: h, + deltaCfg: deltaCfg{ + pulse: true, + }, } } -// OnDelta registers handler which is executed every (mul / div * BlockMeter()) block. -func (t *BlockTimer) OnDelta(mul, div uint32, h BlockTickHandler) { +// OnDelta registers handler which is executed on (mul / div * BlockMeter()) block +// after basic interval reset. +// +// If WithPulse option is provided, handler is executed (mul / div * BlockMeter()) block +// during base interval. +func (t *BlockTimer) OnDelta(mul, div uint32, h BlockTickHandler, opts ...DeltaOption) { + c := deltaCfg{ + pulse: false, + } + + for i := range opts { + opts[i](&c) + } + t.ps = append(t.ps, BlockTimer{ mul: mul, div: div, h: h, + + deltaCfg: c, }) } @@ -67,23 +105,40 @@ func (t *BlockTimer) Reset() error { } t.mtx.Lock() - t.reset(d) + + t.resetWithBaseInterval(d) + + for i := range t.ps { + t.ps[i].resetWithBaseInterval(d) + } + t.mtx.Unlock() return nil } -func (t *BlockTimer) reset(dur uint32) { - delta := t.mul * dur / t.div +func (t *BlockTimer) resetWithBaseInterval(d uint32) { + t.rolledBack = false + t.baseDur = d + t.reset() +} + +func (t *BlockTimer) reset() { + mul, div := t.mul, t.div + + if !t.pulse && t.rolledBack && mul < div { + mul, div = 1, 1 + } + + delta := mul * t.baseDur / div if delta == 0 { delta = 1 } t.tgt = delta - t.cur = 0 for i := range t.ps { - t.ps[i].reset(dur) + t.ps[i].reset() } } @@ -100,12 +155,14 @@ func (t *BlockTimer) tick() { t.cur++ if t.cur == t.tgt { - t.cur = 0 - // it would be advisable to optimize such execution, for example: // 1. push handler to worker pool t.wp.Submit(h); // 2. call t.tickH(h) t.h() + + t.cur = 0 + t.rolledBack = true + t.reset() } for i := range t.ps { diff --git a/pkg/innerring/timers/block_test.go b/pkg/innerring/timers/block_test.go index da677a581..533c442c6 100644 --- a/pkg/innerring/timers/block_test.go +++ b/pkg/innerring/timers/block_test.go @@ -1,6 +1,7 @@ package timers_test import ( + "fmt" "testing" "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" @@ -50,7 +51,34 @@ func TestBlockTimer(t *testing.T) { tickN(bt, intervalNum*blockDur) - require.Equal(t, intervalNum*2, uint32(halfCallCounter)) + require.Equal(t, intervalNum, uint32(halfCallCounter)) require.Equal(t, intervalNum, uint32(baseCallCounter)) require.Equal(t, intervalNum/2, uint32(doubleCallCounter)) } + +func TestDeltaPulse(t *testing.T) { + blockDur := uint32(9) + baseCallCounter := uint32(0) + + bt := timers.NewBlockTimer(timers.StaticBlockMeter(blockDur), func() { + baseCallCounter++ + }) + + deltaCallCounter := uint32(0) + + div := uint32(3) + + bt.OnDelta(1, div, func() { + deltaCallCounter++ + }, timers.WithPulse()) + + require.NoError(t, bt.Reset()) + + intervalNum := uint32(7) + + tickN(bt, intervalNum*blockDur) + + fmt.Println(baseCallCounter, deltaCallCounter) + require.Equal(t, intervalNum, uint32(baseCallCounter)) + require.Equal(t, intervalNum*div, uint32(deltaCallCounter)) +}