[#265] innerring: Generate new audit event

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-12-18 12:27:19 +03:00 committed by Alex Vanin
parent 8c4e033db3
commit 87e1252065
8 changed files with 158 additions and 1 deletions

View file

@ -62,6 +62,7 @@ func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("contracts.neofs", "")
cfg.SetDefault("contracts.balance", "")
cfg.SetDefault("contracts.container", "")
cfg.SetDefault("contracts.audit", "")
// alphabet contracts
cfg.SetDefault("contracts.alphabet.az", "")
cfg.SetDefault("contracts.alphabet.buky", "")

View file

@ -10,6 +10,7 @@ import (
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/nspcc-dev/neofs-node/pkg/innerring/invoke"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/alphabet"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/balance"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/container"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/neofs"
@ -187,6 +188,18 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
return nil, err
}
// create audit processor
auditProcessor, err := audit.New(&audit.Params{
Log: log,
ContainerContract: server.contracts.container,
AuditContract: server.contracts.audit,
MorphClient: server.morphClient,
IRList: server,
})
if err != nil {
return nil, err
}
// create netmap processor
netmapProcessor, err := netmap.New(&netmap.Params{
Log: log,
@ -198,6 +211,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
ActiveState: server,
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
HandleAudit: auditProcessor.StartAuditHandler(),
})
if err != nil {
return nil, err
@ -290,7 +304,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
}
// todo: create vivid id component
// todo: create audit scheduler
return server, nil
}
@ -338,6 +351,7 @@ func parseContracts(cfg *viper.Viper) (*contracts, error) {
balanceContractStr := cfg.GetString("contracts.balance")
nativeGasContractStr := cfg.GetString("contracts.gas")
containerContractStr := cfg.GetString("contracts.container")
auditContractStr := cfg.GetString("contracts.audit")
result.netmap, err = util.Uint160DecodeStringLE(netmapContractStr)
if err != nil {
@ -364,6 +378,11 @@ func parseContracts(cfg *viper.Viper) (*contracts, error) {
return nil, errors.Wrap(err, "ir: can't read container script-hash")
}
result.audit, err = util.Uint160DecodeStringLE(auditContractStr)
if err != nil {
return nil, errors.Wrap(err, "ir: can't read audit script-hash")
}
result.alphabet, err = parseAlphabetContracts(cfg)
if err != nil {
return nil, err

View file

@ -0,0 +1,19 @@
package audit
// Start is a event to start new round of data audit.
type Start struct {
epoch uint64
}
// MorphEvent implements Event interface.
func (a Start) MorphEvent() {}
func NewAuditStartEvent(epoch uint64) Start {
return Start{
epoch: epoch,
}
}
func (a Start) Epoch() uint64 {
return a.epoch
}

View file

@ -0,0 +1,18 @@
package audit
import (
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"go.uber.org/zap"
)
func (ap *Processor) handleNewAuditRound(ev event.Event) {
auditEvent := ev.(Start)
ap.log.Info("new round of audit", zap.Uint64("epoch", auditEvent.epoch))
// send event to the worker pool
err := ap.pool.Submit(func() { ap.processStartAudit(auditEvent.epoch) })
if err != nil {
ap.log.Warn("previous round of audit prepare hasn't finished yet")
}
}

View file

@ -0,0 +1,5 @@
package audit
func (ap *Processor) processStartAudit(epoch uint64) {
ap.log.Info("flushing left audit results and refilling queue")
}

View file

@ -0,0 +1,87 @@
package audit
import (
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/panjf2000/ants/v2"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type (
// Indexer is a callback interface for inner ring global state.
Indexer interface {
Index() int32
}
// Processor of events related with data audit.
Processor struct {
log *zap.Logger
pool *ants.Pool
containerContract util.Uint160
auditContract util.Uint160
morphClient *client.Client
irList Indexer
}
// Params of the processor constructor.
Params struct {
Log *zap.Logger
ContainerContract util.Uint160
AuditContract util.Uint160
MorphClient *client.Client
IRList Indexer
}
)
// AuditProcessor manages audit tasks and fills queue for next epoch. This
// process must not be interrupted by new audit epoch, so we limit pool size
// for processor to one.
const ProcessorPoolSize = 1
// New creates audit processor instance.
func New(p *Params) (*Processor, error) {
switch {
case p.Log == nil:
return nil, errors.New("ir/audit: logger is not set")
case p.MorphClient == nil:
return nil, errors.New("ir/audit: neo:morph client is not set")
case p.IRList == nil:
return nil, errors.New("ir/audit: global state is not set")
}
pool, err := ants.NewPool(ProcessorPoolSize, ants.WithNonblocking(true))
if err != nil {
return nil, errors.Wrap(err, "ir/audit: can't create worker pool")
}
return &Processor{
log: p.Log,
pool: pool,
containerContract: p.ContainerContract,
auditContract: p.AuditContract,
morphClient: p.MorphClient,
irList: p.IRList,
}, nil
}
// ListenerParsers for the 'event.Listener' event producer.
func (ap *Processor) ListenerParsers() []event.ParserInfo {
return nil
}
// ListenerHandlers for the 'event.Listener' event producer.
func (ap *Processor) ListenerHandlers() []event.HandlerInfo {
return nil
}
// TimersHandlers for the 'Timers' event producer.
func (ap *Processor) TimersHandlers() []event.HandlerInfo {
return nil
}
// StartAuditHandler for the internal event producer.
func (ap *Processor) StartAuditHandler() event.Handler {
return ap.handleNewAuditRound
}

View file

@ -2,6 +2,7 @@ package netmap
import (
"github.com/nspcc-dev/neofs-node/pkg/innerring/invoke"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit"
"go.uber.org/zap"
)
@ -22,6 +23,7 @@ func (np *Processor) processNewEpoch(epoch uint64) {
np.netmapSnapshot.update(snapshot, epoch)
np.handleCleanupTick(netmapCleanupTick{epoch: epoch})
np.handleNewAudit(audit.NewAuditStartEvent(epoch))
}
// Process new epoch tick by invoking new epoch method in network map contract.

View file

@ -40,6 +40,8 @@ type (
morphClient *client.Client
netmapSnapshot cleanupTable
handleNewAudit event.Handler
}
// Params of the processor constructor.
@ -53,6 +55,7 @@ type (
ActiveState ActiveState
CleanupEnabled bool
CleanupThreshold uint64 // in epochs
HandleAudit event.Handler
}
)
@ -75,6 +78,8 @@ func New(p *Params) (*Processor, error) {
return nil, errors.New("ir/netmap: global state is not set")
case p.ActiveState == nil:
return nil, errors.New("ir/netmap: global state is not set")
case p.HandleAudit == nil:
return nil, errors.New("ir/netmap: audit handler is not set")
}
p.Log.Debug("netmap worker pool", zap.Int("size", p.PoolSize))
@ -93,6 +98,7 @@ func New(p *Params) (*Processor, error) {
activeState: p.ActiveState,
morphClient: p.MorphClient,
netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold),
handleNewAudit: p.HandleAudit,
}, nil
}