[#360] Use basic implement context in settlement processor

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-02-01 19:18:34 +03:00 committed by Alex Vanin
parent a624bb881d
commit 8c4bf81351
4 changed files with 108 additions and 13 deletions

View file

@ -40,3 +40,48 @@ func (p *Processor) HandleAuditEvent(e event.Event) {
log.Debug("AuditEvent handling successfully scheduled") log.Debug("AuditEvent handling successfully scheduled")
} }
func (p *Processor) HandleIncomeCollectionEvent(e event.Event) {
ev := e.(BasicIncomeCollectEvent)
epoch := ev.Epoch()
if !p.state.IsActive() {
p.log.Info("passive mode, ignore income collection event")
return
}
p.log.Info("start basic income collection",
zap.Uint64("epoch", epoch))
p.contextMu.Lock()
defer p.contextMu.Unlock()
if _, ok := p.incomeContexts[epoch]; ok {
p.log.Error("income context already exists",
zap.Uint64("epoch", epoch))
return
}
incomeCtx, err := p.basicIncome.CreateContext(epoch)
if err != nil {
p.log.Error("can't create income context",
zap.String("error", err.Error()))
return
}
p.incomeContexts[epoch] = incomeCtx
err = p.pool.Submit(func() {
incomeCtx.Collect()
})
if err != nil {
p.log.Warn("could not add handler of basic income collection to queue",
zap.String("error", err.Error()),
)
return
}
}

View file

@ -1,7 +1,17 @@
package settlement package settlement
import (
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/basic"
)
// AuditProcessor is an interface of data audit fee processor. // AuditProcessor is an interface of data audit fee processor.
type AuditProcessor interface { type AuditProcessor interface {
// Must process data audit conducted in epoch. // Must process data audit conducted in epoch.
ProcessAuditSettlements(epoch uint64) ProcessAuditSettlements(epoch uint64)
} }
// BasicIncomeInitializer is an interface of basic income context creator.
type BasicIncomeInitializer interface {
// Creates context that processes basic income for provided epoch.
CreateContext(epoch uint64) (*basic.IncomeSettlementContext, error)
}

View file

@ -10,6 +10,11 @@ type AuditEvent struct {
epoch uint64 epoch uint64
} }
type (
BasicIncomeCollectEvent = AuditEvent
BasicIncomeDistributeEvent = AuditEvent
)
// MorphEvent implements Neo:Morph event. // MorphEvent implements Neo:Morph event.
func (e AuditEvent) MorphEvent() {} func (e AuditEvent) MorphEvent() {}
@ -25,3 +30,17 @@ func NewAuditEvent(epoch uint64) event.Event {
func (e AuditEvent) Epoch() uint64 { func (e AuditEvent) Epoch() uint64 {
return e.epoch return e.epoch
} }
// NewBasicIncomeCollectEvent for epoch.
func NewBasicIncomeCollectEvent(epoch uint64) event.Event {
return BasicIncomeCollectEvent{
epoch: epoch,
}
}
// NewBasicIncomeDistributeEvent for epoch.
func NewBasicIncomeDistributeEvent(epoch uint64) event.Event {
return BasicIncomeDistributeEvent{
epoch: epoch,
}
}

View file

@ -2,7 +2,9 @@ package settlement
import ( import (
"fmt" "fmt"
"sync"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/basic"
nodeutil "github.com/nspcc-dev/neofs-node/pkg/util" nodeutil "github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
@ -10,19 +12,35 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
type (
// ActiveState is a callback interface for inner ring global state
ActiveState interface {
IsActive() bool
}
// Processor is an event handler for payments in the system. // Processor is an event handler for payments in the system.
type Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
state ActiveState
pool nodeutil.WorkerPool pool nodeutil.WorkerPool
auditProc AuditProcessor auditProc AuditProcessor
basicIncome BasicIncomeInitializer
contextMu sync.Mutex
incomeContexts map[uint64]*basic.IncomeSettlementContext
} }
// Prm groups the required parameters of Processor's constructor. // Prm groups the required parameters of Processor's constructor.
type Prm struct { Prm struct {
AuditProcessor AuditProcessor AuditProcessor AuditProcessor
BasicIncome BasicIncomeInitializer
State ActiveState
} }
)
func panicOnPrmValue(n string, v interface{}) { func panicOnPrmValue(n string, v interface{}) {
panic(fmt.Sprintf("invalid parameter %s (%T):%v", n, v, v)) panic(fmt.Sprintf("invalid parameter %s (%T):%v", n, v, v))
@ -52,7 +70,10 @@ func New(prm Prm, opts ...Option) *Processor {
return &Processor{ return &Processor{
log: o.log, log: o.log,
state: prm.State,
pool: pool, pool: pool,
auditProc: prm.AuditProcessor, auditProc: prm.AuditProcessor,
basicIncome: prm.BasicIncome,
incomeContexts: make(map[uint64]*basic.IncomeSettlementContext),
} }
} }