diff --git a/pkg/innerring/timers/block.go b/pkg/innerring/timers/block.go index b0aebd48c..fea993337 100644 --- a/pkg/innerring/timers/block.go +++ b/pkg/innerring/timers/block.go @@ -15,10 +15,14 @@ type BlockTickHandler func() // It can tick the blocks and perform certain actions // on block time intervals. type BlockTimer struct { + rolledBack bool + mtx *sync.Mutex dur BlockMeter + baseDur uint32 + mul, div uint32 cur, tgt uint32 @@ -26,6 +30,23 @@ type BlockTimer struct { h BlockTickHandler ps []BlockTimer + + deltaCfg +} + +// DeltaOption is an option of delta-interval handler. +type DeltaOption func(*deltaCfg) + +type deltaCfg struct { + pulse bool +} + +// WithPulse returns option to call delta-interval handler multiple +// times +func WithPulse() DeltaOption { + return func(c *deltaCfg) { + c.pulse = true + } } // StaticBlockMeter returns BlockMeters that always returns (d, nil). @@ -45,15 +66,32 @@ func NewBlockTimer(dur BlockMeter, h BlockTickHandler) *BlockTimer { mul: 1, div: 1, h: h, + deltaCfg: deltaCfg{ + pulse: true, + }, } } -// OnDelta registers handler which is executed every (mul / div * BlockMeter()) block. -func (t *BlockTimer) OnDelta(mul, div uint32, h BlockTickHandler) { +// OnDelta registers handler which is executed on (mul / div * BlockMeter()) block +// after basic interval reset. +// +// If WithPulse option is provided, handler is executed (mul / div * BlockMeter()) block +// during base interval. +func (t *BlockTimer) OnDelta(mul, div uint32, h BlockTickHandler, opts ...DeltaOption) { + c := deltaCfg{ + pulse: false, + } + + for i := range opts { + opts[i](&c) + } + t.ps = append(t.ps, BlockTimer{ mul: mul, div: div, h: h, + + deltaCfg: c, }) } @@ -67,23 +105,40 @@ func (t *BlockTimer) Reset() error { } t.mtx.Lock() - t.reset(d) + + t.resetWithBaseInterval(d) + + for i := range t.ps { + t.ps[i].resetWithBaseInterval(d) + } + t.mtx.Unlock() return nil } -func (t *BlockTimer) reset(dur uint32) { - delta := t.mul * dur / t.div +func (t *BlockTimer) resetWithBaseInterval(d uint32) { + t.rolledBack = false + t.baseDur = d + t.reset() +} + +func (t *BlockTimer) reset() { + mul, div := t.mul, t.div + + if !t.pulse && t.rolledBack && mul < div { + mul, div = 1, 1 + } + + delta := mul * t.baseDur / div if delta == 0 { delta = 1 } t.tgt = delta - t.cur = 0 for i := range t.ps { - t.ps[i].reset(dur) + t.ps[i].reset() } } @@ -100,12 +155,14 @@ func (t *BlockTimer) tick() { t.cur++ if t.cur == t.tgt { - t.cur = 0 - // it would be advisable to optimize such execution, for example: // 1. push handler to worker pool t.wp.Submit(h); // 2. call t.tickH(h) t.h() + + t.cur = 0 + t.rolledBack = true + t.reset() } for i := range t.ps { diff --git a/pkg/innerring/timers/block_test.go b/pkg/innerring/timers/block_test.go index da677a581..533c442c6 100644 --- a/pkg/innerring/timers/block_test.go +++ b/pkg/innerring/timers/block_test.go @@ -1,6 +1,7 @@ package timers_test import ( + "fmt" "testing" "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" @@ -50,7 +51,34 @@ func TestBlockTimer(t *testing.T) { tickN(bt, intervalNum*blockDur) - require.Equal(t, intervalNum*2, uint32(halfCallCounter)) + require.Equal(t, intervalNum, uint32(halfCallCounter)) require.Equal(t, intervalNum, uint32(baseCallCounter)) require.Equal(t, intervalNum/2, uint32(doubleCallCounter)) } + +func TestDeltaPulse(t *testing.T) { + blockDur := uint32(9) + baseCallCounter := uint32(0) + + bt := timers.NewBlockTimer(timers.StaticBlockMeter(blockDur), func() { + baseCallCounter++ + }) + + deltaCallCounter := uint32(0) + + div := uint32(3) + + bt.OnDelta(1, div, func() { + deltaCallCounter++ + }, timers.WithPulse()) + + require.NoError(t, bt.Reset()) + + intervalNum := uint32(7) + + tickN(bt, intervalNum*blockDur) + + fmt.Println(baseCallCounter, deltaCallCounter) + require.Equal(t, intervalNum, uint32(baseCallCounter)) + require.Equal(t, intervalNum*div, uint32(deltaCallCounter)) +}