From db3ccd2762c54b36e3b5cfaf1d448f4c31db7ba0 Mon Sep 17 00:00:00 2001 From: Artem Tataurov Date: Thu, 9 Mar 2023 16:19:39 +0300 Subject: [PATCH] [#128] innerring: Add GAS pouring mechanism for a configurable list of wallets Signed-off-by: Artem Tataurov --- CHANGELOG.md | 1 + cmd/frostfs-ir/defaults.go | 1 + config/example/ir.yaml | 4 +++ pkg/innerring/innerring.go | 26 ++++++++++++++ .../processors/alphabet/process_emit.go | 15 +++++++- .../processors/alphabet/processor.go | 3 ++ pkg/morph/client/client.go | 34 +++++++++++++++++++ 7 files changed, 83 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e9ba6d06..cfc4eaf43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Changelog for FrostFS Node ## [Unreleased] ### Added +- Add GAS pouring mechanism for a configurable list of wallets (#128) - Separate batching for replicated operations over the same container in pilorama (#1621) - Doc for extended headers (#2128) - New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116) diff --git a/cmd/frostfs-ir/defaults.go b/cmd/frostfs-ir/defaults.go index fa26eef25..8c313fa41 100644 --- a/cmd/frostfs-ir/defaults.go +++ b/cmd/frostfs-ir/defaults.go @@ -102,6 +102,7 @@ func defaultConfiguration(cfg *viper.Viper) { cfg.SetDefault("emit.mint.threshold", 1) cfg.SetDefault("emit.mint.value", 20000000) // 0.2 Fixed8 cfg.SetDefault("emit.gas.balance_threshold", 0) + cfg.SetDefault("emit.extra_wallets", nil) cfg.SetDefault("audit.task.exec_pool_size", 10) cfg.SetDefault("audit.task.queue_capacity", 100) diff --git a/config/example/ir.yaml b/config/example/ir.yaml index ae5433f67..3dca0017a 100644 --- a/config/example/ir.yaml +++ b/config/example/ir.yaml @@ -72,6 +72,10 @@ emit: threshold: 1 # Lifetime of records in LRU cache of all deposit receivers in FrostFS epochs gas: balance_threshold: 100000000000 # Fixed8 value of inner ring wallet balance threshold when GAS emission for deposit receivers is disabled; disabled by default + extra_wallets: # wallet addresses that are included in gas emission process in equal share with network map nodes + - "NQcfMqU6pfXFwaaBN6KHcTpT63eMtzk6eH" + - "NaSVC4xKySQBpKr1XRVYFCHjLhuYXnMBrP" + - "NT9jL5XcxcDt2iTj67o2d5xNfDxquN3pPk" workers: alphabet: 10 # Number of workers to process events from alphabet contract in parallel diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index aeb5b15b0..33cfc39a6 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -49,7 +49,9 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" + "github.com/nspcc-dev/neo-go/pkg/util" "github.com/panjf2000/ants/v2" "github.com/spf13/viper" "go.uber.org/atomic" @@ -803,8 +805,14 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan } } + parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets")) + if err != nil { + return nil, err + } + // create alphabet processor alphabetProcessor, err := alphabet.New(&alphabet.Params{ + ParsedWallets: parsedWallets, Log: log, PoolSize: cfg.GetInt("workers.alphabet"), AlphabetContracts: server.contracts.alphabet, @@ -1034,6 +1042,24 @@ func ParsePublicKeysFromStrings(pubKeys []string) (keys.PublicKeys, error) { return publicKeys, nil } +// parseWalletAddressesFromStrings returns a slice of util.Uint160 from a slice +// of strings. +func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) { + if len(wallets) == 0 { + return nil, nil + } + + var err error + extraWallets := make([]util.Uint160, len(wallets)) + for i := range wallets { + extraWallets[i], err = address.StringToUint160(wallets[i]) + if err != nil { + return nil, err + } + } + return extraWallets, nil +} + func (s *Server) initConfigFromBlockchain() error { // get current epoch epoch, err := s.netmapClient.Epoch() diff --git a/pkg/innerring/processors/alphabet/process_emit.go b/pkg/innerring/processors/alphabet/process_emit.go index a93eec528..353adb455 100644 --- a/pkg/innerring/processors/alphabet/process_emit.go +++ b/pkg/innerring/processors/alphabet/process_emit.go @@ -57,7 +57,7 @@ func (ap *Processor) processEmit() { return } - gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(ln)) + gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(ln+len(ap.parsedWallets))) for i := range nmNodes { keyBytes := nmNodes[i].PublicKey() @@ -79,4 +79,17 @@ func (ap *Processor) processEmit() { ) } } + + err = ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode) + if err != nil { + receiversLog := make([]string, len(ap.parsedWallets)) + for i, addr := range ap.parsedWallets { + receiversLog[i] = addr.StringLE() + } + ap.log.Warn("can't transfer gas to wallet", + zap.Strings("receivers", receiversLog), + zap.Int64("amount", int64(gasPerNode)), + zap.String("error", err.Error()), + ) + } } diff --git a/pkg/innerring/processors/alphabet/processor.go b/pkg/innerring/processors/alphabet/processor.go index f57d281ee..980158132 100644 --- a/pkg/innerring/processors/alphabet/processor.go +++ b/pkg/innerring/processors/alphabet/processor.go @@ -33,6 +33,7 @@ type ( // Processor of events produced for alphabet contracts in the sidechain. Processor struct { + parsedWallets []util.Uint160 log *logger.Logger pool *ants.Pool alphabetContracts Contracts @@ -44,6 +45,7 @@ type ( // Params of the processor constructor. Params struct { + ParsedWallets []util.Uint160 Log *logger.Logger PoolSize int AlphabetContracts Contracts @@ -73,6 +75,7 @@ func New(p *Params) (*Processor, error) { } return &Processor{ + parsedWallets: p.ParsedWallets, log: p.Log, pool: pool, alphabetContracts: p.AlphabetContracts, diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 4aae6dbfd..51a030e63 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -244,6 +244,40 @@ func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error return nil } +func (c *Client) BatchTransferGas(receivers []util.Uint160, amount fixedn.Fixed8) error { + c.switchLock.RLock() + defer c.switchLock.RUnlock() + + if c.inactive { + return ErrConnectionLost + } + + transferParams := make([]nep17.TransferParameters, len(receivers)) + receiversLog := make([]string, len(receivers)) + + for i, receiver := range receivers { + transferParams[i] = nep17.TransferParameters{ + From: c.accAddr, + To: receiver, + Amount: big.NewInt(int64(amount)), + Data: nil, + } + receiversLog[i] = receiver.StringLE() + } + + txHash, vub, err := c.gasToken.MultiTransfer(transferParams) + if err != nil { + return err + } + + c.logger.Debug("batch gas transfer invoke", + zap.Strings("to", receiversLog), + zap.Stringer("tx_hash", txHash.Reverse()), + zap.Uint32("vub", vub)) + + return nil +} + // Wait function blocks routing execution until there // are `n` new blocks in the chain. //