From 4204a9f9203baf9d762b2c9a871253584f4b9e7f Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 28 Jan 2021 22:43:32 +0300 Subject: [PATCH] [#326] ir: Implement settlement processor Define a processor of events related to monetary transactions. Define audit-related event. Provide an interface for processing the audit payout event. Signed-off-by: Leonard Lyubich --- pkg/innerring/processors/settlement/calls.go | 42 ++++++++++++++ pkg/innerring/processors/settlement/deps.go | 7 +++ pkg/innerring/processors/settlement/events.go | 27 +++++++++ .../processors/settlement/handlers.go | 19 ++++++ pkg/innerring/processors/settlement/opts.go | 31 ++++++++++ .../processors/settlement/processor.go | 58 +++++++++++++++++++ 6 files changed, 184 insertions(+) create mode 100644 pkg/innerring/processors/settlement/calls.go create mode 100644 pkg/innerring/processors/settlement/deps.go create mode 100644 pkg/innerring/processors/settlement/events.go create mode 100644 pkg/innerring/processors/settlement/handlers.go create mode 100644 pkg/innerring/processors/settlement/opts.go create mode 100644 pkg/innerring/processors/settlement/processor.go diff --git a/pkg/innerring/processors/settlement/calls.go b/pkg/innerring/processors/settlement/calls.go new file mode 100644 index 00000000..71e3e24a --- /dev/null +++ b/pkg/innerring/processors/settlement/calls.go @@ -0,0 +1,42 @@ +package settlement + +import ( + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + "go.uber.org/zap" +) + +// HandleAuditEvent catches a new AuditEvent and +// adds AuditProcessor call to the execution queue. +func (p *Processor) HandleAuditEvent(e event.Event) { + ev := e.(AuditEvent) + + epoch := ev.Epoch() + + log := p.log.With( + zap.Uint64("epoch", epoch), + ) + + log.Info("new audit settlement event") + + if epoch == 0 { + log.Debug("ignore genesis epoch") + return + } + + handler := &auditEventHandler{ + log: log, + epoch: epoch, + proc: p.auditProc, + } + + err := p.pool.Submit(handler.handle) + if err != nil { + log.Warn("could not add handler of AuditEvent to queue", + zap.String("error", err.Error()), + ) + + return + } + + log.Debug("AuditEvent handling successfully scheduled") +} diff --git a/pkg/innerring/processors/settlement/deps.go b/pkg/innerring/processors/settlement/deps.go new file mode 100644 index 00000000..d5400ffa --- /dev/null +++ b/pkg/innerring/processors/settlement/deps.go @@ -0,0 +1,7 @@ +package settlement + +// AuditProcessor is an interface of data audit fee processor. +type AuditProcessor interface { + // Must process data audit conducted in epoch. + ProcessAuditSettlements(epoch uint64) +} diff --git a/pkg/innerring/processors/settlement/events.go b/pkg/innerring/processors/settlement/events.go new file mode 100644 index 00000000..276c4041 --- /dev/null +++ b/pkg/innerring/processors/settlement/events.go @@ -0,0 +1,27 @@ +package settlement + +import ( + "github.com/nspcc-dev/neofs-node/pkg/morph/event" +) + +// AuditEvent is an event of the start of +// cash settlements for data audit. +type AuditEvent struct { + epoch uint64 +} + +// MorphEvent implements Neo:Morph event. +func (e AuditEvent) MorphEvent() {} + +// NewAuditEvent creates new AuditEvent for epoch. +func NewAuditEvent(epoch uint64) event.Event { + return AuditEvent{ + epoch: epoch, + } +} + +// Epoch returns the number of the epoch +// in which the event was generated. +func (e AuditEvent) Epoch() uint64 { + return e.epoch +} diff --git a/pkg/innerring/processors/settlement/handlers.go b/pkg/innerring/processors/settlement/handlers.go new file mode 100644 index 00000000..f63b80a1 --- /dev/null +++ b/pkg/innerring/processors/settlement/handlers.go @@ -0,0 +1,19 @@ +package settlement + +import "github.com/nspcc-dev/neofs-node/pkg/util/logger" + +type auditEventHandler struct { + log *logger.Logger + + epoch uint64 + + proc AuditProcessor +} + +func (p *auditEventHandler) handle() { + p.log.Info("process audit settlements") + + p.proc.ProcessAuditSettlements(p.epoch) + + p.log.Info("audit processing finished") +} diff --git a/pkg/innerring/processors/settlement/opts.go b/pkg/innerring/processors/settlement/opts.go new file mode 100644 index 00000000..8196f55f --- /dev/null +++ b/pkg/innerring/processors/settlement/opts.go @@ -0,0 +1,31 @@ +package settlement + +import ( + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Option is a Processor constructor's option. +type Option func(*options) + +type options struct { + poolSize int + + log *logger.Logger +} + +func defaultOptions() *options { + const poolSize = 10 + + return &options{ + poolSize: poolSize, + log: zap.L(), + } +} + +// WithLogger returns option to override the component for logging. +func WithLogger(l *logger.Logger) Option { + return func(o *options) { + o.log = l + } +} diff --git a/pkg/innerring/processors/settlement/processor.go b/pkg/innerring/processors/settlement/processor.go new file mode 100644 index 00000000..c15172e5 --- /dev/null +++ b/pkg/innerring/processors/settlement/processor.go @@ -0,0 +1,58 @@ +package settlement + +import ( + "fmt" + + nodeutil "github.com/nspcc-dev/neofs-node/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "github.com/panjf2000/ants/v2" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// Processor is an event handler for payments in the system. +type Processor struct { + log *logger.Logger + + pool nodeutil.WorkerPool + + auditProc AuditProcessor +} + +// Prm groups the required parameters of Processor's constructor. +type Prm struct { + AuditProcessor AuditProcessor +} + +func panicOnPrmValue(n string, v interface{}) { + panic(fmt.Sprintf("invalid parameter %s (%T):%v", n, v, v)) +} + +// New creates and returns a new Processor instance. +func New(prm Prm, opts ...Option) *Processor { + switch { + case prm.AuditProcessor == nil: + panicOnPrmValue("AuditProcessor", prm.AuditProcessor) + } + + o := defaultOptions() + + for i := range opts { + opts[i](o) + } + + pool, err := ants.NewPool(o.poolSize, ants.WithNonblocking(true)) + if err != nil { + panic(errors.Wrap(err, "could not create worker pool")) + } + + o.log.Debug("worker pool for settlement processor successfully initialized", + zap.Int("capacity", o.poolSize), + ) + + return &Processor{ + log: o.log, + pool: pool, + auditProc: prm.AuditProcessor, + } +}