[] innerring: Add GAS pouring mechanism for a configurable list of wallets

Signed-off-by: Artem Tataurov <a.tataurov@yadro.com>
This commit is contained in:
Artem Tataurov 2023-03-09 16:19:39 +03:00 committed by Gitea
parent abd21f8099
commit db3ccd2762
7 changed files with 83 additions and 1 deletions
CHANGELOG.md
cmd/frostfs-ir
config/example
pkg
innerring
morph/client

View file

@ -4,6 +4,7 @@ Changelog for FrostFS Node
## [Unreleased] ## [Unreleased]
### Added ### Added
- Add GAS pouring mechanism for a configurable list of wallets (#128)
- Separate batching for replicated operations over the same container in pilorama (#1621) - Separate batching for replicated operations over the same container in pilorama (#1621)
- Doc for extended headers (#2128) - Doc for extended headers (#2128)
- New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116) - New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116)

View file

@ -102,6 +102,7 @@ func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("emit.mint.threshold", 1) cfg.SetDefault("emit.mint.threshold", 1)
cfg.SetDefault("emit.mint.value", 20000000) // 0.2 Fixed8 cfg.SetDefault("emit.mint.value", 20000000) // 0.2 Fixed8
cfg.SetDefault("emit.gas.balance_threshold", 0) cfg.SetDefault("emit.gas.balance_threshold", 0)
cfg.SetDefault("emit.extra_wallets", nil)
cfg.SetDefault("audit.task.exec_pool_size", 10) cfg.SetDefault("audit.task.exec_pool_size", 10)
cfg.SetDefault("audit.task.queue_capacity", 100) cfg.SetDefault("audit.task.queue_capacity", 100)

View file

@ -72,6 +72,10 @@ emit:
threshold: 1 # Lifetime of records in LRU cache of all deposit receivers in FrostFS epochs threshold: 1 # Lifetime of records in LRU cache of all deposit receivers in FrostFS epochs
gas: gas:
balance_threshold: 100000000000 # Fixed8 value of inner ring wallet balance threshold when GAS emission for deposit receivers is disabled; disabled by default balance_threshold: 100000000000 # Fixed8 value of inner ring wallet balance threshold when GAS emission for deposit receivers is disabled; disabled by default
extra_wallets: # wallet addresses that are included in gas emission process in equal share with network map nodes
- "NQcfMqU6pfXFwaaBN6KHcTpT63eMtzk6eH"
- "NaSVC4xKySQBpKr1XRVYFCHjLhuYXnMBrP"
- "NT9jL5XcxcDt2iTj67o2d5xNfDxquN3pPk"
workers: workers:
alphabet: 10 # Number of workers to process events from alphabet contract in parallel alphabet: 10 # Number of workers to process events from alphabet contract in parallel

View file

@ -49,7 +49,9 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/atomic" "go.uber.org/atomic"
@ -803,8 +805,14 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
} }
} }
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
if err != nil {
return nil, err
}
// create alphabet processor // create alphabet processor
alphabetProcessor, err := alphabet.New(&alphabet.Params{ alphabetProcessor, err := alphabet.New(&alphabet.Params{
ParsedWallets: parsedWallets,
Log: log, Log: log,
PoolSize: cfg.GetInt("workers.alphabet"), PoolSize: cfg.GetInt("workers.alphabet"),
AlphabetContracts: server.contracts.alphabet, AlphabetContracts: server.contracts.alphabet,
@ -1034,6 +1042,24 @@ func ParsePublicKeysFromStrings(pubKeys []string) (keys.PublicKeys, error) {
return publicKeys, nil return publicKeys, nil
} }
// parseWalletAddressesFromStrings returns a slice of util.Uint160 from a slice
// of strings.
func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) {
if len(wallets) == 0 {
return nil, nil
}
var err error
extraWallets := make([]util.Uint160, len(wallets))
for i := range wallets {
extraWallets[i], err = address.StringToUint160(wallets[i])
if err != nil {
return nil, err
}
}
return extraWallets, nil
}
func (s *Server) initConfigFromBlockchain() error { func (s *Server) initConfigFromBlockchain() error {
// get current epoch // get current epoch
epoch, err := s.netmapClient.Epoch() epoch, err := s.netmapClient.Epoch()

View file

@ -57,7 +57,7 @@ func (ap *Processor) processEmit() {
return return
} }
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(ln)) gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(ln+len(ap.parsedWallets)))
for i := range nmNodes { for i := range nmNodes {
keyBytes := nmNodes[i].PublicKey() keyBytes := nmNodes[i].PublicKey()
@ -79,4 +79,17 @@ func (ap *Processor) processEmit() {
) )
} }
} }
err = ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode)
if err != nil {
receiversLog := make([]string, len(ap.parsedWallets))
for i, addr := range ap.parsedWallets {
receiversLog[i] = addr.StringLE()
}
ap.log.Warn("can't transfer gas to wallet",
zap.Strings("receivers", receiversLog),
zap.Int64("amount", int64(gasPerNode)),
zap.String("error", err.Error()),
)
}
} }

View file

@ -33,6 +33,7 @@ type (
// Processor of events produced for alphabet contracts in the sidechain. // Processor of events produced for alphabet contracts in the sidechain.
Processor struct { Processor struct {
parsedWallets []util.Uint160
log *logger.Logger log *logger.Logger
pool *ants.Pool pool *ants.Pool
alphabetContracts Contracts alphabetContracts Contracts
@ -44,6 +45,7 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
ParsedWallets []util.Uint160
Log *logger.Logger Log *logger.Logger
PoolSize int PoolSize int
AlphabetContracts Contracts AlphabetContracts Contracts
@ -73,6 +75,7 @@ func New(p *Params) (*Processor, error) {
} }
return &Processor{ return &Processor{
parsedWallets: p.ParsedWallets,
log: p.Log, log: p.Log,
pool: pool, pool: pool,
alphabetContracts: p.AlphabetContracts, alphabetContracts: p.AlphabetContracts,

View file

@ -244,6 +244,40 @@ func (c *Client) TransferGas(receiver util.Uint160, amount fixedn.Fixed8) error
return nil return nil
} }
func (c *Client) BatchTransferGas(receivers []util.Uint160, amount fixedn.Fixed8) error {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
if c.inactive {
return ErrConnectionLost
}
transferParams := make([]nep17.TransferParameters, len(receivers))
receiversLog := make([]string, len(receivers))
for i, receiver := range receivers {
transferParams[i] = nep17.TransferParameters{
From: c.accAddr,
To: receiver,
Amount: big.NewInt(int64(amount)),
Data: nil,
}
receiversLog[i] = receiver.StringLE()
}
txHash, vub, err := c.gasToken.MultiTransfer(transferParams)
if err != nil {
return err
}
c.logger.Debug("batch gas transfer invoke",
zap.Strings("to", receiversLog),
zap.Stringer("tx_hash", txHash.Reverse()),
zap.Uint32("vub", vub))
return nil
}
// Wait function blocks routing execution until there // Wait function blocks routing execution until there
// are `n` new blocks in the chain. // are `n` new blocks in the chain.
// //