forked from TrueCloudLab/frostfs-node
[#326] ir: Implement settlement processor
Define a processor of events related to monetary transactions. Define audit-related event. Provide an interface for processing the audit payout event. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
aaeb13f7df
commit
4204a9f920
6 changed files with 184 additions and 0 deletions
42
pkg/innerring/processors/settlement/calls.go
Normal file
42
pkg/innerring/processors/settlement/calls.go
Normal file
|
@ -0,0 +1,42 @@
|
|||
package settlement
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// HandleAuditEvent catches a new AuditEvent and
|
||||
// adds AuditProcessor call to the execution queue.
|
||||
func (p *Processor) HandleAuditEvent(e event.Event) {
|
||||
ev := e.(AuditEvent)
|
||||
|
||||
epoch := ev.Epoch()
|
||||
|
||||
log := p.log.With(
|
||||
zap.Uint64("epoch", epoch),
|
||||
)
|
||||
|
||||
log.Info("new audit settlement event")
|
||||
|
||||
if epoch == 0 {
|
||||
log.Debug("ignore genesis epoch")
|
||||
return
|
||||
}
|
||||
|
||||
handler := &auditEventHandler{
|
||||
log: log,
|
||||
epoch: epoch,
|
||||
proc: p.auditProc,
|
||||
}
|
||||
|
||||
err := p.pool.Submit(handler.handle)
|
||||
if err != nil {
|
||||
log.Warn("could not add handler of AuditEvent to queue",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("AuditEvent handling successfully scheduled")
|
||||
}
|
7
pkg/innerring/processors/settlement/deps.go
Normal file
7
pkg/innerring/processors/settlement/deps.go
Normal file
|
@ -0,0 +1,7 @@
|
|||
package settlement
|
||||
|
||||
// AuditProcessor is an interface of data audit fee processor.
|
||||
type AuditProcessor interface {
|
||||
// Must process data audit conducted in epoch.
|
||||
ProcessAuditSettlements(epoch uint64)
|
||||
}
|
27
pkg/innerring/processors/settlement/events.go
Normal file
27
pkg/innerring/processors/settlement/events.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package settlement
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||
)
|
||||
|
||||
// AuditEvent is an event of the start of
|
||||
// cash settlements for data audit.
|
||||
type AuditEvent struct {
|
||||
epoch uint64
|
||||
}
|
||||
|
||||
// MorphEvent implements Neo:Morph event.
|
||||
func (e AuditEvent) MorphEvent() {}
|
||||
|
||||
// NewAuditEvent creates new AuditEvent for epoch.
|
||||
func NewAuditEvent(epoch uint64) event.Event {
|
||||
return AuditEvent{
|
||||
epoch: epoch,
|
||||
}
|
||||
}
|
||||
|
||||
// Epoch returns the number of the epoch
|
||||
// in which the event was generated.
|
||||
func (e AuditEvent) Epoch() uint64 {
|
||||
return e.epoch
|
||||
}
|
19
pkg/innerring/processors/settlement/handlers.go
Normal file
19
pkg/innerring/processors/settlement/handlers.go
Normal file
|
@ -0,0 +1,19 @@
|
|||
package settlement
|
||||
|
||||
import "github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
|
||||
type auditEventHandler struct {
|
||||
log *logger.Logger
|
||||
|
||||
epoch uint64
|
||||
|
||||
proc AuditProcessor
|
||||
}
|
||||
|
||||
func (p *auditEventHandler) handle() {
|
||||
p.log.Info("process audit settlements")
|
||||
|
||||
p.proc.ProcessAuditSettlements(p.epoch)
|
||||
|
||||
p.log.Info("audit processing finished")
|
||||
}
|
31
pkg/innerring/processors/settlement/opts.go
Normal file
31
pkg/innerring/processors/settlement/opts.go
Normal file
|
@ -0,0 +1,31 @@
|
|||
package settlement
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Option is a Processor constructor's option.
|
||||
type Option func(*options)
|
||||
|
||||
type options struct {
|
||||
poolSize int
|
||||
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func defaultOptions() *options {
|
||||
const poolSize = 10
|
||||
|
||||
return &options{
|
||||
poolSize: poolSize,
|
||||
log: zap.L(),
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns option to override the component for logging.
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(o *options) {
|
||||
o.log = l
|
||||
}
|
||||
}
|
58
pkg/innerring/processors/settlement/processor.go
Normal file
58
pkg/innerring/processors/settlement/processor.go
Normal file
|
@ -0,0 +1,58 @@
|
|||
package settlement
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
nodeutil "github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Processor is an event handler for payments in the system.
|
||||
type Processor struct {
|
||||
log *logger.Logger
|
||||
|
||||
pool nodeutil.WorkerPool
|
||||
|
||||
auditProc AuditProcessor
|
||||
}
|
||||
|
||||
// Prm groups the required parameters of Processor's constructor.
|
||||
type Prm struct {
|
||||
AuditProcessor AuditProcessor
|
||||
}
|
||||
|
||||
func panicOnPrmValue(n string, v interface{}) {
|
||||
panic(fmt.Sprintf("invalid parameter %s (%T):%v", n, v, v))
|
||||
}
|
||||
|
||||
// New creates and returns a new Processor instance.
|
||||
func New(prm Prm, opts ...Option) *Processor {
|
||||
switch {
|
||||
case prm.AuditProcessor == nil:
|
||||
panicOnPrmValue("AuditProcessor", prm.AuditProcessor)
|
||||
}
|
||||
|
||||
o := defaultOptions()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](o)
|
||||
}
|
||||
|
||||
pool, err := ants.NewPool(o.poolSize, ants.WithNonblocking(true))
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "could not create worker pool"))
|
||||
}
|
||||
|
||||
o.log.Debug("worker pool for settlement processor successfully initialized",
|
||||
zap.Int("capacity", o.poolSize),
|
||||
)
|
||||
|
||||
return &Processor{
|
||||
log: o.log,
|
||||
pool: pool,
|
||||
auditProc: prm.AuditProcessor,
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue