From 8c4bf81351068f5437dde969555c062f41215fd9 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Mon, 1 Feb 2021 19:18:34 +0300 Subject: [PATCH] [#360] Use basic implement context in settlement processor Signed-off-by: Alex Vanin --- pkg/innerring/processors/settlement/calls.go | 45 ++++++++++++++++++ pkg/innerring/processors/settlement/deps.go | 10 ++++ pkg/innerring/processors/settlement/events.go | 19 ++++++++ .../processors/settlement/processor.go | 47 ++++++++++++++----- 4 files changed, 108 insertions(+), 13 deletions(-) diff --git a/pkg/innerring/processors/settlement/calls.go b/pkg/innerring/processors/settlement/calls.go index 71e3e24ae..609d9f958 100644 --- a/pkg/innerring/processors/settlement/calls.go +++ b/pkg/innerring/processors/settlement/calls.go @@ -40,3 +40,48 @@ func (p *Processor) HandleAuditEvent(e event.Event) { log.Debug("AuditEvent handling successfully scheduled") } + +func (p *Processor) HandleIncomeCollectionEvent(e event.Event) { + ev := e.(BasicIncomeCollectEvent) + epoch := ev.Epoch() + + if !p.state.IsActive() { + p.log.Info("passive mode, ignore income collection event") + + return + } + + p.log.Info("start basic income collection", + zap.Uint64("epoch", epoch)) + + p.contextMu.Lock() + defer p.contextMu.Unlock() + + if _, ok := p.incomeContexts[epoch]; ok { + p.log.Error("income context already exists", + zap.Uint64("epoch", epoch)) + + return + } + + incomeCtx, err := p.basicIncome.CreateContext(epoch) + if err != nil { + p.log.Error("can't create income context", + zap.String("error", err.Error())) + + return + } + + p.incomeContexts[epoch] = incomeCtx + + err = p.pool.Submit(func() { + incomeCtx.Collect() + }) + if err != nil { + p.log.Warn("could not add handler of basic income collection to queue", + zap.String("error", err.Error()), + ) + + return + } +} diff --git a/pkg/innerring/processors/settlement/deps.go b/pkg/innerring/processors/settlement/deps.go index d5400ffae..76ed2574a 100644 --- a/pkg/innerring/processors/settlement/deps.go +++ b/pkg/innerring/processors/settlement/deps.go @@ -1,7 +1,17 @@ package settlement +import ( + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/basic" +) + // AuditProcessor is an interface of data audit fee processor. type AuditProcessor interface { // Must process data audit conducted in epoch. ProcessAuditSettlements(epoch uint64) } + +// BasicIncomeInitializer is an interface of basic income context creator. +type BasicIncomeInitializer interface { + // Creates context that processes basic income for provided epoch. + CreateContext(epoch uint64) (*basic.IncomeSettlementContext, error) +} diff --git a/pkg/innerring/processors/settlement/events.go b/pkg/innerring/processors/settlement/events.go index 276c40417..c3ab18364 100644 --- a/pkg/innerring/processors/settlement/events.go +++ b/pkg/innerring/processors/settlement/events.go @@ -10,6 +10,11 @@ type AuditEvent struct { epoch uint64 } +type ( + BasicIncomeCollectEvent = AuditEvent + BasicIncomeDistributeEvent = AuditEvent +) + // MorphEvent implements Neo:Morph event. func (e AuditEvent) MorphEvent() {} @@ -25,3 +30,17 @@ func NewAuditEvent(epoch uint64) event.Event { func (e AuditEvent) Epoch() uint64 { return e.epoch } + +// NewBasicIncomeCollectEvent for epoch. +func NewBasicIncomeCollectEvent(epoch uint64) event.Event { + return BasicIncomeCollectEvent{ + epoch: epoch, + } +} + +// NewBasicIncomeDistributeEvent for epoch. +func NewBasicIncomeDistributeEvent(epoch uint64) event.Event { + return BasicIncomeDistributeEvent{ + epoch: epoch, + } +} diff --git a/pkg/innerring/processors/settlement/processor.go b/pkg/innerring/processors/settlement/processor.go index c15172e5f..fc7856bdd 100644 --- a/pkg/innerring/processors/settlement/processor.go +++ b/pkg/innerring/processors/settlement/processor.go @@ -2,7 +2,9 @@ package settlement import ( "fmt" + "sync" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/basic" nodeutil "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/panjf2000/ants/v2" @@ -10,19 +12,35 @@ import ( "go.uber.org/zap" ) -// Processor is an event handler for payments in the system. -type Processor struct { - log *logger.Logger +type ( + // ActiveState is a callback interface for inner ring global state + ActiveState interface { + IsActive() bool + } - pool nodeutil.WorkerPool + // Processor is an event handler for payments in the system. + Processor struct { + log *logger.Logger - auditProc AuditProcessor -} + state ActiveState -// Prm groups the required parameters of Processor's constructor. -type Prm struct { - AuditProcessor AuditProcessor -} + pool nodeutil.WorkerPool + + auditProc AuditProcessor + + basicIncome BasicIncomeInitializer + + contextMu sync.Mutex + incomeContexts map[uint64]*basic.IncomeSettlementContext + } + + // Prm groups the required parameters of Processor's constructor. + Prm struct { + AuditProcessor AuditProcessor + BasicIncome BasicIncomeInitializer + State ActiveState + } +) func panicOnPrmValue(n string, v interface{}) { panic(fmt.Sprintf("invalid parameter %s (%T):%v", n, v, v)) @@ -51,8 +69,11 @@ func New(prm Prm, opts ...Option) *Processor { ) return &Processor{ - log: o.log, - pool: pool, - auditProc: prm.AuditProcessor, + log: o.log, + state: prm.State, + pool: pool, + auditProc: prm.AuditProcessor, + basicIncome: prm.BasicIncome, + incomeContexts: make(map[uint64]*basic.IncomeSettlementContext), } }