diff --git a/cmd/neofs-ir/defaults.go b/cmd/neofs-ir/defaults.go index 3b4f4522..e527d73b 100644 --- a/cmd/neofs-ir/defaults.go +++ b/cmd/neofs-ir/defaults.go @@ -62,6 +62,7 @@ func defaultConfiguration(cfg *viper.Viper) { cfg.SetDefault("contracts.neofs", "") cfg.SetDefault("contracts.balance", "") cfg.SetDefault("contracts.container", "") + cfg.SetDefault("contracts.audit", "") // alphabet contracts cfg.SetDefault("contracts.alphabet.az", "") cfg.SetDefault("contracts.alphabet.buky", "") diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index ad4db2a9..be4bc262 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -10,6 +10,7 @@ import ( crypto "github.com/nspcc-dev/neofs-crypto" "github.com/nspcc-dev/neofs-node/pkg/innerring/invoke" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/alphabet" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/balance" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/container" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/neofs" @@ -187,6 +188,18 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } + // create audit processor + auditProcessor, err := audit.New(&audit.Params{ + Log: log, + ContainerContract: server.contracts.container, + AuditContract: server.contracts.audit, + MorphClient: server.morphClient, + IRList: server, + }) + if err != nil { + return nil, err + } + // create netmap processor netmapProcessor, err := netmap.New(&netmap.Params{ Log: log, @@ -198,6 +211,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error ActiveState: server, CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), + HandleAudit: auditProcessor.StartAuditHandler(), }) if err != nil { return nil, err @@ -290,7 +304,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error } // todo: create vivid id component - // todo: create audit scheduler return server, nil } @@ -338,6 +351,7 @@ func parseContracts(cfg *viper.Viper) (*contracts, error) { balanceContractStr := cfg.GetString("contracts.balance") nativeGasContractStr := cfg.GetString("contracts.gas") containerContractStr := cfg.GetString("contracts.container") + auditContractStr := cfg.GetString("contracts.audit") result.netmap, err = util.Uint160DecodeStringLE(netmapContractStr) if err != nil { @@ -364,6 +378,11 @@ func parseContracts(cfg *viper.Viper) (*contracts, error) { return nil, errors.Wrap(err, "ir: can't read container script-hash") } + result.audit, err = util.Uint160DecodeStringLE(auditContractStr) + if err != nil { + return nil, errors.Wrap(err, "ir: can't read audit script-hash") + } + result.alphabet, err = parseAlphabetContracts(cfg) if err != nil { return nil, err diff --git a/pkg/innerring/processors/audit/events.go b/pkg/innerring/processors/audit/events.go new file mode 100644 index 00000000..32ebe7bc --- /dev/null +++ b/pkg/innerring/processors/audit/events.go @@ -0,0 +1,19 @@ +package audit + +// Start is a event to start new round of data audit. +type Start struct { + epoch uint64 +} + +// MorphEvent implements Event interface. +func (a Start) MorphEvent() {} + +func NewAuditStartEvent(epoch uint64) Start { + return Start{ + epoch: epoch, + } +} + +func (a Start) Epoch() uint64 { + return a.epoch +} diff --git a/pkg/innerring/processors/audit/handlers.go b/pkg/innerring/processors/audit/handlers.go new file mode 100644 index 00000000..53a5a1ea --- /dev/null +++ b/pkg/innerring/processors/audit/handlers.go @@ -0,0 +1,18 @@ +package audit + +import ( + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + "go.uber.org/zap" +) + +func (ap *Processor) handleNewAuditRound(ev event.Event) { + auditEvent := ev.(Start) + ap.log.Info("new round of audit", zap.Uint64("epoch", auditEvent.epoch)) + + // send event to the worker pool + + err := ap.pool.Submit(func() { ap.processStartAudit(auditEvent.epoch) }) + if err != nil { + ap.log.Warn("previous round of audit prepare hasn't finished yet") + } +} diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go new file mode 100644 index 00000000..c1bbcdac --- /dev/null +++ b/pkg/innerring/processors/audit/process.go @@ -0,0 +1,5 @@ +package audit + +func (ap *Processor) processStartAudit(epoch uint64) { + ap.log.Info("flushing left audit results and refilling queue") +} diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go new file mode 100644 index 00000000..32ad06d8 --- /dev/null +++ b/pkg/innerring/processors/audit/processor.go @@ -0,0 +1,87 @@ +package audit + +import ( + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/morph/client" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + "github.com/panjf2000/ants/v2" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +type ( + // Indexer is a callback interface for inner ring global state. + Indexer interface { + Index() int32 + } + + // Processor of events related with data audit. + Processor struct { + log *zap.Logger + pool *ants.Pool + containerContract util.Uint160 + auditContract util.Uint160 + morphClient *client.Client + irList Indexer + } + + // Params of the processor constructor. + Params struct { + Log *zap.Logger + ContainerContract util.Uint160 + AuditContract util.Uint160 + MorphClient *client.Client + IRList Indexer + } +) + +// AuditProcessor manages audit tasks and fills queue for next epoch. This +// process must not be interrupted by new audit epoch, so we limit pool size +// for processor to one. +const ProcessorPoolSize = 1 + +// New creates audit processor instance. +func New(p *Params) (*Processor, error) { + switch { + case p.Log == nil: + return nil, errors.New("ir/audit: logger is not set") + case p.MorphClient == nil: + return nil, errors.New("ir/audit: neo:morph client is not set") + case p.IRList == nil: + return nil, errors.New("ir/audit: global state is not set") + } + + pool, err := ants.NewPool(ProcessorPoolSize, ants.WithNonblocking(true)) + if err != nil { + return nil, errors.Wrap(err, "ir/audit: can't create worker pool") + } + + return &Processor{ + log: p.Log, + pool: pool, + containerContract: p.ContainerContract, + auditContract: p.AuditContract, + morphClient: p.MorphClient, + irList: p.IRList, + }, nil +} + +// ListenerParsers for the 'event.Listener' event producer. +func (ap *Processor) ListenerParsers() []event.ParserInfo { + return nil +} + +// ListenerHandlers for the 'event.Listener' event producer. +func (ap *Processor) ListenerHandlers() []event.HandlerInfo { + return nil +} + +// TimersHandlers for the 'Timers' event producer. +func (ap *Processor) TimersHandlers() []event.HandlerInfo { + return nil +} + +// StartAuditHandler for the internal event producer. +func (ap *Processor) StartAuditHandler() event.Handler { + return ap.handleNewAuditRound +} diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index 8736f312..069b0acf 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -2,6 +2,7 @@ package netmap import ( "github.com/nspcc-dev/neofs-node/pkg/innerring/invoke" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit" "go.uber.org/zap" ) @@ -22,6 +23,7 @@ func (np *Processor) processNewEpoch(epoch uint64) { np.netmapSnapshot.update(snapshot, epoch) np.handleCleanupTick(netmapCleanupTick{epoch: epoch}) + np.handleNewAudit(audit.NewAuditStartEvent(epoch)) } // Process new epoch tick by invoking new epoch method in network map contract. diff --git a/pkg/innerring/processors/netmap/processor.go b/pkg/innerring/processors/netmap/processor.go index e406d089..7a71c871 100644 --- a/pkg/innerring/processors/netmap/processor.go +++ b/pkg/innerring/processors/netmap/processor.go @@ -40,6 +40,8 @@ type ( morphClient *client.Client netmapSnapshot cleanupTable + + handleNewAudit event.Handler } // Params of the processor constructor. @@ -53,6 +55,7 @@ type ( ActiveState ActiveState CleanupEnabled bool CleanupThreshold uint64 // in epochs + HandleAudit event.Handler } ) @@ -75,6 +78,8 @@ func New(p *Params) (*Processor, error) { return nil, errors.New("ir/netmap: global state is not set") case p.ActiveState == nil: return nil, errors.New("ir/netmap: global state is not set") + case p.HandleAudit == nil: + return nil, errors.New("ir/netmap: audit handler is not set") } p.Log.Debug("netmap worker pool", zap.Int("size", p.PoolSize)) @@ -93,6 +98,7 @@ func New(p *Params) (*Processor, error) { activeState: p.ActiveState, morphClient: p.MorphClient, netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold), + handleNewAudit: p.HandleAudit, }, nil }