diff --git a/cmd/neofs-ir/defaults.go b/cmd/neofs-ir/defaults.go index 2b3fd7e11..31805b02b 100644 --- a/cmd/neofs-ir/defaults.go +++ b/cmd/neofs-ir/defaults.go @@ -82,4 +82,5 @@ func defaultConfiguration(cfg *viper.Viper) { cfg.SetDefault("workers.balance", "10") cfg.SetDefault("workers.neofs", "10") cfg.SetDefault("workers.container", "10") + cfg.SetDefault("workers.alphabet", "10") } diff --git a/pkg/innerring/invoke/alphabet.go b/pkg/innerring/invoke/alphabet.go new file mode 100644 index 000000000..3621317f3 --- /dev/null +++ b/pkg/innerring/invoke/alphabet.go @@ -0,0 +1,20 @@ +package invoke + +import ( + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/morph/client" +) + +const ( + emitMethod = "emit" +) + +// AlphabetEmit invokes emit method on alphabet contract. +func AlphabetEmit(cli *client.Client, con util.Uint160) error { + if cli == nil { + return client.ErrNilClient + } + + // there is no signature collecting, so we don't need extra fee + return cli.Invoke(con, 0, emitMethod) +} diff --git a/pkg/innerring/processors/alphabet/handlers.go b/pkg/innerring/processors/alphabet/handlers.go new file mode 100644 index 000000000..dfdb71385 --- /dev/null +++ b/pkg/innerring/processors/alphabet/handlers.go @@ -0,0 +1,21 @@ +package alphabet + +import ( + "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + "go.uber.org/zap" +) + +func (np *Processor) handleGasEmission(ev event.Event) { + _ = ev.(timers.NewAlphabetEmitTick) + np.log.Info("tick", zap.String("type", "alphabet gas emit")) + + // send event to the worker pool + + err := np.pool.Submit(func() { np.processEmit() }) + if err != nil { + // there system can be moved into controlled degradation stage + np.log.Warn("alphabet processor worker pool drained", + zap.Int("capacity", np.pool.Cap())) + } +} diff --git a/pkg/innerring/processors/alphabet/process_emit.go b/pkg/innerring/processors/alphabet/process_emit.go new file mode 100644 index 000000000..9a5985bdf --- /dev/null +++ b/pkg/innerring/processors/alphabet/process_emit.go @@ -0,0 +1,23 @@ +package alphabet + +import ( + "github.com/nspcc-dev/neofs-node/pkg/innerring/invoke" + "go.uber.org/zap" +) + +func (np *Processor) processEmit() { + index := np.irList.Index() + if index < 0 { + np.log.Info("passive mode, ignore gas emission event") + return + } else if int(index) >= len(np.alphabetContracts) { + np.log.Debug("node is out of alphabet range, ignore gas emission event", + zap.Int32("index", index)) + return + } + + err := invoke.AlphabetEmit(np.morphClient, np.alphabetContracts[index]) + if err != nil { + np.log.Warn("can't invoke alphabet emit method") + } +} diff --git a/pkg/innerring/processors/alphabet/processor.go b/pkg/innerring/processors/alphabet/processor.go new file mode 100644 index 000000000..0e92b19e0 --- /dev/null +++ b/pkg/innerring/processors/alphabet/processor.go @@ -0,0 +1,86 @@ +package alphabet + +import ( + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" + "github.com/nspcc-dev/neofs-node/pkg/morph/client" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + "github.com/panjf2000/ants/v2" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +type ( + // Indexer is a callback interface for inner ring global state. + Indexer interface { + Index() int32 + } + + // Processor of events produced for alphabet contracts in sidechain. + Processor struct { + log *zap.Logger + pool *ants.Pool + alphabetContracts [7]util.Uint160 + morphClient *client.Client + irList Indexer + } + + // Params of the processor constructor. + Params struct { + Log *zap.Logger + PoolSize int + AlphabetContracts [7]util.Uint160 + MorphClient *client.Client + IRList Indexer + } +) + +// New creates neofs mainnet contract processor instance. +func New(p *Params) (*Processor, error) { + switch { + case p.Log == nil: + return nil, errors.New("ir/alphabet: logger is not set") + case p.MorphClient == nil: + return nil, errors.New("ir/alphabet: neo:morph client is not set") + case p.IRList == nil: + return nil, errors.New("ir/alphabet: global state is not set") + } + + p.Log.Debug("alphabet worker pool", zap.Int("size", p.PoolSize)) + + pool, err := ants.NewPool(p.PoolSize, ants.WithNonblocking(true)) + if err != nil { + return nil, errors.Wrap(err, "ir/neofs: can't create worker pool") + } + + return &Processor{ + log: p.Log, + pool: pool, + alphabetContracts: p.AlphabetContracts, + morphClient: p.MorphClient, + irList: p.IRList, + }, nil +} + +// ListenerParsers for the 'event.Listener' event producer. +func (np *Processor) ListenerParsers() []event.ParserInfo { + return nil +} + +// ListenerHandlers for the 'event.Listener' event producer. +func (np *Processor) ListenerHandlers() []event.HandlerInfo { + return nil +} + +// TimersHandlers for the 'Timers' event producer. +func (np *Processor) TimersHandlers() []event.HandlerInfo { + var handlers []event.HandlerInfo + + // new epoch handler + newEpoch := event.HandlerInfo{} + newEpoch.SetType(timers.AlphabetTimer) + newEpoch.SetHandler(np.handleGasEmission) + handlers = append(handlers, newEpoch) + + return handlers +}