[#326] ir: Make netmap processor to generate audit settlement events

Pass handler of audit settlement event to netmap event processor. Generate
AuditEvent in during new epoch processing.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-01-28 22:51:41 +03:00 committed by Alex Vanin
parent 4204a9f920
commit d4bd726c25
4 changed files with 65 additions and 60 deletions

View file

@ -16,13 +16,13 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/container" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/container"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/neofs" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/neofs"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement"
auditSettlement "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit" auditSettlement "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit"
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers" "github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
"github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client"
auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper" auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper"
balanceWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper" balanceWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event"
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber"
audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager" audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager"
util2 "github.com/nspcc-dev/neofs-node/pkg/util" util2 "github.com/nspcc-dev/neofs-node/pkg/util"
@ -63,8 +63,6 @@ type (
predefinedValidators []keys.PublicKey predefinedValidators []keys.PublicKey
workers []func(context.Context) workers []func(context.Context)
auditSettlement *auditSettlement.Calculator
} }
contracts struct { contracts struct {
@ -227,6 +225,21 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
return nil, err return nil, err
} }
cnrClient, err := invoke.NewNoFeeContainerClient(server.morphClient, server.contracts.container)
if err != nil {
return nil, err
}
nmClient, err := invoke.NewNoFeeNetmapClient(server.morphClient, server.contracts.netmap)
if err != nil {
return nil, err
}
balClient, err := invoke.NewBalanceClient(server.morphClient, server.contracts.balance)
if err != nil {
return nil, err
}
clientCache := newClientCache(&clientCacheParams{ clientCache := newClientCache(&clientCacheParams{
Log: log, Log: log,
Key: server.key, Key: server.key,
@ -271,6 +284,34 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
return nil, err return nil, err
} }
auditCalcDeps := &auditSettlementDeps{
log: server.log,
cnrSrc: cnrClient,
auditClient: server.auditClient,
nmSrc: nmClient,
clientCache: clientCache,
balanceClient: balClient,
}
auditSettlementCalc := auditSettlement.NewCalculator(
&auditSettlement.CalculatorPrm{
ResultStorage: auditCalcDeps,
ContainerStorage: auditCalcDeps,
PlacementCalculator: auditCalcDeps,
SGStorage: auditCalcDeps,
AccountStorage: auditCalcDeps,
Exchanger: auditCalcDeps,
},
auditSettlement.WithLogger(server.log),
)
settlementProcessor := settlement.New(
settlement.Prm{
AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc),
},
settlement.WithLogger(server.log),
)
var netmapProcessor *netmap.Processor var netmapProcessor *netmap.Processor
server.epochTimer = timers.NewBlockTimer( server.epochTimer = timers.NewBlockTimer(
@ -294,6 +335,8 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
HandleAudit: auditProcessor.StartAuditHandler(), HandleAudit: auditProcessor.StartAuditHandler(),
AuditSettlementsHandler: server.onlyActiveEventHandler(settlementProcessor.HandleAuditEvent),
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -394,48 +437,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
return nil, err return nil, err
} }
cnrClient, err := invoke.NewNoFeeContainerClient(server.morphClient, server.contracts.container)
if err != nil {
return nil, err
}
nmClient, err := invoke.NewNoFeeNetmapClient(server.morphClient, server.contracts.netmap)
if err != nil {
return nil, err
}
balClient, err := invoke.NewBalanceClient(server.morphClient, server.contracts.balance)
if err != nil {
return nil, err
}
auditCalcDeps := &auditSettlementDeps{
log: server.log,
cnrSrc: cnrClient,
auditClient: server.auditClient,
nmSrc: nmClient,
clientCache: clientCache,
balanceClient: balClient,
}
server.auditSettlement = auditSettlement.NewCalculator(
&auditSettlement.CalculatorPrm{
ResultStorage: auditCalcDeps,
ContainerStorage: auditCalcDeps,
PlacementCalculator: auditCalcDeps,
SGStorage: auditCalcDeps,
AccountStorage: auditCalcDeps,
Exchanger: auditCalcDeps,
},
auditSettlement.WithLogger(server.log),
)
server.subscribeNewEpoch(func(e netmapEvent.NewEpoch) {
server.auditSettlement.Calculate(&auditSettlement.CalculatePrm{
Epoch: e.EpochNumber(),
})
})
// todo: create vivid id component // todo: create vivid id component
return server, nil return server, nil
@ -618,21 +619,6 @@ func (s *Server) tickTimers() {
} }
} }
func (s *Server) subscribeNewEpoch(f func(netmapEvent.NewEpoch)) {
hi := event.HandlerInfo{}
// TODO: replace and share
const newEpochNotification = "NewEpoch"
hi.SetType(event.TypeFromString(newEpochNotification))
hi.SetScriptHash(s.contracts.netmap)
hi.SetHandler(s.onlyActiveEventHandler(func(ev event.Event) {
f(ev.(netmapEvent.NewEpoch))
}))
s.morphListener.RegisterHandler(hi)
}
func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler { func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler {
return func(ev event.Event) { return func(ev event.Event) {
if s.IsActive() { if s.IsActive() {

View file

@ -3,6 +3,7 @@ package netmap
import ( import (
"github.com/nspcc-dev/neofs-node/pkg/innerring/invoke" "github.com/nspcc-dev/neofs-node/pkg/innerring/invoke"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -27,6 +28,7 @@ func (np *Processor) processNewEpoch(epoch uint64) {
np.netmapSnapshot.update(snapshot, epoch) np.netmapSnapshot.update(snapshot, epoch)
np.handleCleanupTick(netmapCleanupTick{epoch: epoch}) np.handleCleanupTick(netmapCleanupTick{epoch: epoch})
np.handleNewAudit(audit.NewAuditStartEvent(epoch)) np.handleNewAudit(audit.NewAuditStartEvent(epoch))
np.handleAuditSettlements(settlement.NewAuditEvent(epoch))
} }
// Process new epoch tick by invoking new epoch method in network map contract. // Process new epoch tick by invoking new epoch method in network map contract.

View file

@ -41,6 +41,8 @@ type (
netmapSnapshot cleanupTable netmapSnapshot cleanupTable
handleNewAudit event.Handler handleNewAudit event.Handler
handleAuditSettlements event.Handler
} }
// Params of the processor constructor. // Params of the processor constructor.
@ -55,6 +57,8 @@ type (
CleanupEnabled bool CleanupEnabled bool
CleanupThreshold uint64 // in epochs CleanupThreshold uint64 // in epochs
HandleAudit event.Handler HandleAudit event.Handler
AuditSettlementsHandler event.Handler
} }
) )
@ -79,6 +83,8 @@ func New(p *Params) (*Processor, error) {
return nil, errors.New("ir/netmap: global state is not set") return nil, errors.New("ir/netmap: global state is not set")
case p.HandleAudit == nil: case p.HandleAudit == nil:
return nil, errors.New("ir/netmap: audit handler is not set") return nil, errors.New("ir/netmap: audit handler is not set")
case p.AuditSettlementsHandler == nil:
return nil, errors.New("ir/netmap: audit settlement handler is not set")
} }
p.Log.Debug("netmap worker pool", zap.Int("size", p.PoolSize)) p.Log.Debug("netmap worker pool", zap.Int("size", p.PoolSize))
@ -98,6 +104,8 @@ func New(p *Params) (*Processor, error) {
morphClient: p.MorphClient, morphClient: p.MorphClient,
netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold), netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold),
handleNewAudit: p.HandleAudit, handleNewAudit: p.HandleAudit,
handleAuditSettlements: p.AuditSettlementsHandler,
}, nil }, nil
} }

View file

@ -1,6 +1,7 @@
package innerring package innerring
import ( import (
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit"
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers" "github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
) )
@ -9,3 +10,11 @@ type blockTimerWrapper timers.BlockTimer
func (t *blockTimerWrapper) ResetEpochTimer() error { func (t *blockTimerWrapper) ResetEpochTimer() error {
return (*timers.BlockTimer)(t).Reset() return (*timers.BlockTimer)(t).Reset()
} }
type auditSettlementCalculator audit.Calculator
func (s *auditSettlementCalculator) ProcessAuditSettlements(epoch uint64) {
(*audit.Calculator)(s).Calculate(&audit.CalculatePrm{
Epoch: epoch,
})
}