diff --git a/pkg/innerring/processors/settlement/calls.go b/pkg/innerring/processors/settlement/calls.go index 609d9f958..fa6c1190f 100644 --- a/pkg/innerring/processors/settlement/calls.go +++ b/pkg/innerring/processors/settlement/calls.go @@ -85,3 +85,41 @@ func (p *Processor) HandleIncomeCollectionEvent(e event.Event) { return } } + +func (p *Processor) HandleIncomeDistributionEvent(e event.Event) { + ev := e.(BasicIncomeDistributeEvent) + epoch := ev.Epoch() + + if !p.state.IsActive() { + p.log.Info("passive mode, ignore income distribution event") + + return + } + + p.log.Info("start basic income distribution", + zap.Uint64("epoch", epoch)) + + p.contextMu.Lock() + defer p.contextMu.Unlock() + + incomeCtx, ok := p.incomeContexts[epoch] + delete(p.incomeContexts, epoch) + + if !ok { + p.log.Warn("income context distribution does not exists", + zap.Uint64("epoch", epoch)) + + return + } + + err := p.pool.Submit(func() { + incomeCtx.Distribute() + }) + if err != nil { + p.log.Warn("could not add handler of basic income distribution to queue", + zap.String("error", err.Error()), + ) + + return + } +} diff --git a/pkg/innerring/settlement.go b/pkg/innerring/settlement.go index 080c5c4f4..56e453142 100644 --- a/pkg/innerring/settlement.go +++ b/pkg/innerring/settlement.go @@ -279,5 +279,6 @@ func (b *basicSettlementConstructor) CreateContext(epoch uint64) (*basic.IncomeS Container: b.dep, Placement: b.dep, Exchange: b.dep, + Accounts: b.dep, }) }